view zb.py @ 0:bdcdd8380d94

Some test routines & framework for talking to MaxStream ZigBee modules.
author darius@inchoate.localdomain
date Sun, 28 Oct 2007 20:24:28 +1030
parents
children b5d8ec0ffd21
line wrap: on
line source

import serial, struct

# Open the serial port at import time: 9600 baud, 8 data bits, no parity,
# 1 stop bit, RTS/CTS flow control off.  The 0.1 s read timeout is given to
# the constructor instead of calling s.setTimeout(), because the setter is
# not available in newer pySerial releases while the constructor argument
# is accepted by old and new ones alike.  readpkt() relies on this timeout
# to notice that no more data is arriving.
s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N',
                  stopbits=1, rtscts=0, timeout=0.1)
# Disabled experiment kept for reference.
# NOTE(review): '+++' is presumably the module's command-mode escape
# sequence -- confirm before re-enabling.
#s.write('+++')
#s.readline(eol='\r')

# Two sample frames, one per group below, as lists of single-character
# strings.  Each group is: '~' (0x7e) start byte, two length bytes
# (0x00, 0x0a = 10), ten payload bytes, and one trailing checksum byte.
testdata = [
    # frame 1 -- checksum '@' (0x40)
    '~', '\x00', '\n',
    '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x07',
    '@',
    # frame 2 -- checksum '8' (0x38)
    '~', '\x00', '\n',
    '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f',
    '8',
]

def readpkt(s):
    """Read from s until a read comes back empty; return what was read.

    s is any object with a no-argument read() method (the serial port in
    this file).  read() is called repeatedly and every non-empty result is
    collected in order.  The first empty result -- what a read with a
    timeout yields when nothing arrived -- ends the loop and is not
    included.

    Returns a list of the values returned by read(), possibly empty.
    """
    l = []
    while True:
        a = s.read()
        # Test for emptiness rather than equality with '' so that an empty
        # bytes object (b'') also terminates the loop; comparing b'' with ''
        # is never true on Python 3 and would spin forever.
        if not a:
            break
        l.append(a)

    return l

class Packet:
    """Empty placeholder class; it defines no attributes or behaviour."""

    def __init__(self):
        """Create an instance without setting any state."""

class unPacker:
    """Incremental, byte-at-a-time decoder for framed packets.

    Frame layout as parsed by process():

        0x7e | length MSB | length LSB | <length> payload bytes | checksum

    A frame is accepted when (sum of payload bytes & 0xff) + checksum
    equals 0xff.  Decoder state lives on the instance, so a frame may be
    split across several process() calls.

    Attributes:
        buffer   -- payload bytes (ints) of the frame being assembled
        state    -- 'init', 'sizemsb', 'sizelsb', 'data' or 'cksum'
        packets  -- payloads (lists of ints) of all frames accepted so far
        bufmsb   -- high byte of the length field of the current frame
        dataleft -- payload bytes still expected for the current frame
        fr_err   -- bytes skipped while waiting for the 0x7e start byte
        ck_err   -- frames discarded because of a checksum mismatch
    """

    def __init__(self):
        self.buffer = []
        self.state = 'init'
        self.packets = []

        self.bufmsb = 0
        self.dataleft = 0

        self.fr_err = 0 # Framing error
        self.ck_err = 0 # Checksum error

    def _reset(self):
        """Drop any partial frame and wait for the next start byte."""
        # Rebind rather than clear: an accepted payload list may already be
        # stored in self.packets and must not be emptied.
        self.buffer = []
        self.dataleft = 0
        self.state = 'init'

    def process(self, data):
        """Feed received bytes through the frame state machine.

        data is an iterable of bytes: a list of one-character strings (as
        returned by readpkt()), a str, a bytes object, or a list of ints.

        Each accepted payload is printed and appended to self.packets.
        Bytes seen outside a frame increment fr_err; frames with a bad
        checksum increment ck_err and are discarded.  Returns None.
        """
        for d in data:
            # Normalise to an int so that one-character strings, bytes and
            # plain ints (what iterating bytes yields on Python 3) all work.
            if isinstance(d, int):
                dint = d
            else:
                dint = ord(d)

            if (self.state == 'init'):
                if (dint != 0x7e):
                    print("Framing error, got 0x%02x, expected 0x7e" % (dint))
                    self.fr_err = self.fr_err + 1
                    continue

                self.state = 'sizemsb'
            elif (self.state == 'sizemsb'):
                self.bufmsb = dint
                self.state = 'sizelsb'
            elif (self.state == 'sizelsb'):
                self.dataleft = (self.bufmsb << 8) | dint
                # A zero-length frame has no payload bytes; going to 'data'
                # would consume the checksum as payload and never terminate.
                if (self.dataleft == 0):
                    self.state = 'cksum'
                else:
                    self.state = 'data'
            elif (self.state == 'data'):
                self.buffer.append(dint)
                self.dataleft = self.dataleft - 1
                if (self.dataleft == 0):
                    self.state = 'cksum'
            elif (self.state == 'cksum'):
                # sum() copes with an empty payload, unlike reduce() without
                # an initial value.
                pktsum = sum(self.buffer) & 0xff
                rxcksum = dint
                if (pktsum + rxcksum != 0xff):
                    self.ck_err = self.ck_err + 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    print("Got a packet - " + str(self.buffer))
                    self.packets.append(self.buffer)
                # Start afresh after a good or a bad frame; otherwise a bad
                # checksum leaves the decoder stuck here with stale payload.
                self._reset()
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self._reset()