# HG changeset patch # User darius@inchoate.localdomain # Date 1193727400 -37800 # Node ID b5d8ec0ffd21bdceb78e2aeced54ce6b92ab152f # Parent bdcdd8380d94b6240c76a5e3bdda9012eab689dc Lots of changes.. - Create a class for each packet type, needs more fleshing out. - Make sure we reset state when we get a checksum error. - Add some more test cases. - OOify things a bit more. diff -r bdcdd8380d94 -r b5d8ec0ffd21 zb.py --- a/zb.py Sun Oct 28 20:24:28 2007 +1030 +++ b/zb.py Tue Oct 30 17:26:40 2007 +1030 @@ -1,4 +1,175 @@ -import serial, struct +import serial + +class PktBase(object): + def __init__(self, data): + self.data = data + print "Constructing " + self.__class__.__name__ + + def __repr__(self): + return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")" + + #def __str__(self): + # return "%s: %s" % (self.PKT_DESC, str(self.data)) + +class AT_Cmd(PktBase): + PKT_TYPE = 0x08 + PKT_DESC = "AT Command" + +class AT_Cmd_Queue(PktBase): + PKT_TYPE = 0x09 + PKT_DESC = "AT Command (queued)" + +class AT_Response(PktBase): + PKT_TYPE = 0x88 + PKT_DESC = "AT Command response" + +class Modem_Status(PktBase): + PKT_TYPE = 0x8a + PKT_DESC = "Modem Status" + +class RX_64_Bit(PktBase): + PKT_TYPE = 0x80 + PKT_DESC = "RX Packet: 64 bit address" + +class RX_16_Bit(PktBase): + PKT_TYPE = 0x81 + PKT_DESC = "RX Packet: 16 bit address" + +class RXIO_64_Bit(PktBase): + PKT_TYPE = 0x82 + PKT_DESC = "RXIO Packet: 64 bit address" + +class RXIO_16_Bit(PktBase): + PKT_TYPE = 0x83 + PKT_DESC = "RXIO Packet: 16 bit address" + + def __str__(self): + sender = self.data[0] << 8 | self.data[1] + rssi = -1 * self.data[2] + flags = self.data[3] + nsamples = self.data[4] + mask = self.data[5] << 8 | self.data[6] + rtn = "0x%04x (%ddBm) -> %d samples, mask 0x%04x" % (sender, rssi, nsamples, mask) + # Any DIO lines enabled? + if (mask | 0x01ff): + rtn = rtn + ", DIO - 0x%03x" % (self.data[7] << 8 | self.data[8]) + offs = 9 + else: + offs = 7 + + # Any ADC lines enabled? + if (mask | 0x7e00): + for i in range(6): + if (mask & 1 << (i + 9)): + rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | + self.data[offs + 1]) + offs = offs + 2 + + return rtn + +class TX_64_Bit(PktBase): + PKT_TYPE = 0x00 + PKT_DESC = "TX Packet: 64 bit address" + +class TX_16_Bit(PktBase): + PKT_TYPE = 0x00 + PKT_DESC = "TX Packet: 16 bit address" + +class TX_Status(PktBase): + PKT_TYPE = 0x89 + PKT_DESC = "TX Status" + +class Packets(object): + PKT_CLASSES = [AT_Cmd, AT_Cmd_Queue, AT_Response, Modem_Status, RX_64_Bit, + RX_16_Bit, RXIO_64_Bit, RXIO_16_Bit, TX_64_Bit, TX_16_Bit, + TX_Status] + + def Build(data): + for p in Packets.PKT_CLASSES: + if (p.PKT_TYPE == data[0]): + return(p(data[1:])) + + Build = staticmethod(Build) + + def Encapsulate(data): + pktsum = reduce(lambda x, y: x + y, data) & 0xff + pkt = [0x7e] + [len(data) >> 8] + [len(data) & 0xff] + data + [0xff - pktsum] + return(map(chr, pkt)) + + Encapsulate = staticmethod(Encapsulate) + + def __init__(self): + self.buffer = [] + self.state = 'init' + self.packets = [] + + self.bufmsb = 0 + self.dataleft = 0 + + self.fr_err = 0 # Framing error + self.ck_err = 0 # Checksum error + self.rx_cnt = 0 # Packet count + + self.pktq = [] + + def getdata(self, s): + l = [] + while (1): + a = s.read() + if (a == ''): + break + l.append(a) + + return self.process(l) + + def process(self, data): + pktcount = 0 + for d in data: + dord = ord(d) + if (self.state == 'init'): + if (d != '\x7e'): + print "Framing error, got 0x%02x, expected 0x7f" % (dord) + self.fr_err = self.fr_err + 1 + continue + + self.state = 'sizemsb' + elif (self.state == 'sizemsb'): + self.bufmsb = dord + self.state = 'sizelsb' + elif (self.state == 'sizelsb'): + self.dataleft = self.bufmsb << 8 | \ + dord + self.state = 'data' + elif (self.state == 'data'): + self.buffer.append(dord) + self.dataleft = self.dataleft - 1 + if (self.dataleft == 0): + self.state = 'cksum' + elif (self.state == 'cksum'): + pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff + rxcksum = dord + self.state = 'init' + if (pktsum + rxcksum != 0xff): + self.buffer = [] + self.ck_err = self.ck_err + 1 + print "Checksum error, got 0x%02x, expected 0x%02x" % \ + (rxcksum, 0xff - pktsum) + else: + print "Got a packet - " + str(self.buffer) + p = Packets.Build(self.buffer) + self.pktq.append(p) + self.buffer = [] + pktcount = pktcount + 1 + self.rx_cnt = self.rx_cnt + 1 + else: + print "Invalid state %s! Resetting" % (self.state) + self.state = 'init' + + return pktcount + +#for c in dir(): +# if (issubclass(c, PktBase)): +# print .. s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N', \ stopbits=1, rtscts=0) @@ -6,67 +177,11 @@ #s.write('+++') #s.readline(eol='\r') -testdata = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x07', '@', '~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8'] - -def readpkt(s): - l = [] - while (1): - a = s.read() - if (a == ''): - break - l.append(a) - - return l - -class Packet: - def __init__(self): - pass - -class unPacker: - def __init__(self): - self.buffer = [] - self.state = 'init' - self.packets = [] +goodtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8'] +badtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0e', '8'] +adctest = ['~', '\x00', '\x0c', '\x83', '\x00', '\x05', '$', '\x00', '\x01', '\x02', '\x0e', '\x00', '\x0e', '\x03', '\xff', '2' ] +up = Packets() +up.process(goodtest) +#up.process(badtest) +p = up.pktq.pop(0) - self.bufmsb = 0 - self.dataleft = 0 - - self.fr_err = 0 # Framing error - self.ck_err = 0 # Checksum error - - def process(self, data): - for d in data: - dint = struct.unpack('B', d)[0] - if (self.state == 'init'): - if (d != '\x7e'): - print "Framing error, got 0x%02x, expected 0x7f" % (dint) - self.fr_err = self.fr_err + 1 - continue - - self.state = 'sizemsb' - elif (self.state == 'sizemsb'): - self.bufmsb = dint - self.state = 'sizelsb' - elif (self.state == 'sizelsb'): - self.dataleft = self.bufmsb << 8 | \ - dint - self.state = 'data' - elif (self.state == 'data'): - self.buffer.append(dint) - self.dataleft = self.dataleft - 1 - if (self.dataleft == 0): - self.state = 'cksum' - elif (self.state == 'cksum'): - pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff - rxcksum = dint - if (pktsum + rxcksum != 0xff): - self.ck_err = self.ck_err + 1 - print "Checksum error, got 0x%02x, expected 0x%02x" % \ - (rxcksum, 0xff - pktsum) - else: - print "Got a packet - " + str(self.buffer) - self.state = 'init' - self.buffer = [] - else: - print "Invalid state %s! Resetting" % (self.state) - self.state = 'init'