# HG changeset patch # User darius@inchoate.localdomain # Date 1194267952 -37800 # Node ID d147529ad2db4f4b71ed0e1b8c4729c570001f1a # Parent 9f0808b1345423535e8375c30fced702d82198ca Switch to only reading RX pkts and writing TX pkts to make things simpler. Add AT reply test case. diff -r 9f0808b13454 -r d147529ad2db zb.py --- a/zb.py Sun Nov 04 21:10:21 2007 +1030 +++ b/zb.py Mon Nov 05 23:35:52 2007 +1030 @@ -1,22 +1,46 @@ import serial, inspect +def easyord(i): + if (type(i) != type(str())): + return i + else: + return ord(i) + class PktBase(object): + PKT_MAXLEN = 2 ** 16 + def __init__(self, data = []): - self.data = data + self._data = data print "Constructing " + self.__class__.__name__ - def __repr__(self): - return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")" - def Encapsulate(self): return Packets.Encapsulate([self.PKT_TYPE] + self.data) def resize(self, dlen): """Ensure the data list can hold at least len elements (0 fill)""" - if (len(self.data) < dlen): - self.data = (self.data + [0] * dlen)[0:dlen] + if (len(self._data) < dlen): + self._data = (self._data + [0] * dlen)[0:dlen] + + def _checklist(list, min = 0, max = 255, maxlen = None): + if (maxlen != None and len(list) > maxlen): + raise ValueError("must have %d elements" % (maxlen)) + + for i in xrange(len(list)): + if (easyord(list[i]) < min or easyord(list[i]) > max): + raise ValueError("element %d (= %d) out of range must be between %d and %d inclusive" % + (i, ord(list[i]), min, max)) + _checklist = staticmethod(_checklist) + +class TXPkts(PktBase): + """Base class for all packets that go to the module""" + def setframeid(self, value): + if (value < 0 or value > 255): + raise ValueError("FrameID must be 0-255") + self._frameid = value + frameid = property(lambda s: s._frameid, setframeid) -class AT_Cmd(PktBase): + +class AT_Cmd(TXPkts): PKT_TYPE = 0x08 PKT_DESC = "AT Command" @@ -33,22 +57,25 @@ self.cmdarg = args[1] else: raise TypeError("__init__ takes 1 list of ordinals, 1 string, or 2 strings") - - self.data[0] = 1 # XXX: could be smarter about Frame ID - + self.frameid = 0 + self.cmdarg = [] + def setcmd(self, value): - self.resize(3) - self.data[1] = ord(value[0]) - self.data[2] = ord(value[1]) - cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), setcmd) + if (len(value) != 2): + raise ValueError("must have 2 elements") + self._checklist(value, ord('0'), ord('z')) + self._cmd = value + cmd = property(lambda s: s._cmd, setcmd) def setcmdarg(self, value): - self.resize(3 + len(value)) - self.data = self.data[0:3] + map(ord, value) - def delcmdarg(self): - self.data = self.data[0:3] - cmdarg = property(lambda s: map(chr, s.data[3:]), setcmdarg, delcmdarg) - + self._checklist(value, maxlen = self.PKT_MAXLEN - 3) + self._cmdarg = value + cmdarg = property(lambda s: s._cmdarg, setcmdarg) + + def getdata(self): + return([self.frameid] + map(ord, self.cmd) + map(easyord, self.cmdarg)) + data = property(getdata) + class AT_Cmd_Queue(AT_Cmd): PKT_TYPE = 0x09 PKT_DESC = "AT Command (queued)" @@ -56,10 +83,10 @@ class AT_Response(PktBase): PKT_TYPE = 0x88 PKT_DESC = "AT Command response" - frameid = property(lambda s: s.data[0], None) - cmd = property(lambda s: chr(s.data[1]) + chr(s.data[2]), None) - statusOK = property(lambda s: s.data[3] == 0, None) - payload = property(lambda s: s.data[4:], None) + frameid = property(lambda s: s._data[0], None) + cmd = property(lambda s: chr(s._data[1]) + chr(s._data[2]), None) + statusOK = property(lambda s: s._data[3] == 0, None) + payload = property(lambda s: s._data[4:], None) class Modem_Status(PktBase): PKT_TYPE = 0x8a @@ -77,15 +104,15 @@ def getsender(self): value = 0 for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): - value |= self.data[i] << j + value |= self._data[i] << j return value sender = property(getsender, None) - rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], None) + rssi = property(lambda s: -1 * s._data[s.ADDR_SIZE], None) - flags = property(lambda s: s.data[s.ADDR_SIZE + 1], None) + flags = property(lambda s: s._data[s.ADDR_SIZE + 1], None) - payload = property(lambda s: s.data[s.ADDR_SIZE + 2:], None) + payload = property(lambda s: s._data[s.ADDR_SIZE + 2:], None) class RX_64_Bit(RX_16_Bit): PKT_TYPE = 0x80 @@ -96,27 +123,17 @@ PKT_TYPE = 0x83 PKT_DESC = "RXIO Packet: 16 bit address" - def setnsamples(self, value): - self.resize(self.ADDR_SIZE + 3) - self.data[self.ADDR_SIZE + 2] = value + nsamples = property(lambda s: s._data[s.ADDR_SIZE + 2]) - def getnsamples(self): - return self.data[self.ADDR_SIZE + 2] - nsamples = property(getnsamples, setnsamples) - - def setmask(self, value): - self.resize(self.ADDR_SIZE + 5) - self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8 - self.data[self.ADDR_SIZE + 4] = value & 0xff - mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask) + mask = property(lambda s: s._data[s.ADDR_SIZE + 3] << 8 | s._data[s.ADDR_SIZE + 4]) def __str__(self): rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender, self.rssi, self.nsamples, self.mask) # Any DIO lines enabled? if (self.mask | 0x01ff): - rtn = rtn + ", DIO - 0x%03x" % (self.data[self.ADDR_SIZE + 5] << 8 | - self.data[self.ADDR_SIZE + 6]) + rtn = rtn + ", DIO - 0x%03x" % (self._data[self.ADDR_SIZE + 5] << 8 | + self._data[self.ADDR_SIZE + 6]) offs = self.ADDR_SIZE + 7 else: offs = self.ADDR_SIZE + 5 @@ -125,10 +142,9 @@ if (self.mask | 0x7e00): for i in range(6): if (self.mask & 1 << (i + 9)): - rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 | - self.data[offs + 1]) + rtn = rtn + ", ADC%d - 0x%02x" % (i, self._data[offs] << 8 | + self._data[offs + 1]) offs = offs + 2 - return rtn class RXIO_64_Bit(RXIO_16_Bit): @@ -136,18 +152,18 @@ PKT_DESC = "RXIO Packet: 64 bit address" ADDR_SIZE = 8 -class TX_16_Bit(RX_64_Bit): +class TX_16_Bit(TXPkts): PKT_TYPE = 0x01 PKT_DESC = "TX Packet: 16 bit address" ADDR_SIZE = 2 - + FLG_DISABLE_ACK = 0x01 + FLG_BCAST_PANID = 0x04 + PKT_MAX_PAYLOAD = 100 + def __init__(self, *args): if (len(args) == 1): - if (type(args[0]) == type(int())): - super(TX_16_Bit, self).__init__([]) - self.recipient = args[0] - else: - super(TX_16_Bit, self).__init__(args[0]) + super(TX_16_Bit, self).__init__() + self.recipient = args[0] elif (len(args) == 2): super(TX_16_Bit, self).__init__([]) self.recipient = args[0] @@ -155,28 +171,38 @@ else: raise TypeError("__init__ takes 1 list of ordinals or 2 strings") - self.resize(self.ADDR_SIZE + 2) - self.data[0] = 1 # XXX: could be smarter about Frame ID - self.data[3] = 0 # XXX: should be able to set flags + self.frameid = 0 + self.flags = 0 + self.payload = [] + + def setrecipient(self, value): + if (value < 0 or value > 2 ** (self.ADDR_SIZE * 8)): + raise ValueError("value out of range must be between 0 and %d" % (2 ** self.ADDR_SIZE)) - def getrecipient(self): - value = 0 - for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): - value |= self.data[i + 1] << j - return value - - def setrecipient(self, value): - self.resize(self.ADDR_SIZE + 1) - self.data[0] = 1 # XXX: could be smarter about Frame ID - for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): - self.data[i + 1] = (value & (0xff << j)) >> j - recipient = property(getrecipient, setrecipient) + self._recipient = value + recipient = property(lambda s: s._recipient, setrecipient) + + def setflags(self, value): + if (value < 0 or value > 255): + raise ValueError("Value must be between 0 and 255 inclusive") + + self._flags = value + flags = property(lambda s: s._flags, setflags) def setpayload(self, value): - self.resize(self.ADDR_SIZE + 2) - self.data[self.ADDR_SIZE + 2:] = value - payload = property(lambda s: s.data[self.ADDR_SIZE + 2:], setpayload) - + self._checklist(value, maxlen = self.PKT_MAX_PAYLOAD) + self._payload = value + payload = property(lambda s: s._payload, setpayload) + + def getdata(self): + data = [self.frameid] + for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)): + data.append((self.recipient & (0xff << j)) >> j) + data.append(self.flags) + data.extend(map(easyord, self.payload)) + return(data) + data = property(getdata) + class TX_64_Bit(TX_16_Bit): PKT_TYPE = 0x00 PKT_DESC = "TX Packet: 64 bit address" @@ -186,9 +212,9 @@ PKT_TYPE = 0x89 PKT_DESC = "TX Status" statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged'] - frameid = property(lambda s: s.data[0], None) - status = property(lambda s: s.data[1], None) - statusMsg = property(lambda s: s.statusTxt[s.data[1]], None) + frameid = property(lambda s: s._data[0], None) + status = property(lambda s: s._data[1], None) + statusMsg = property(lambda s: s.statusTxt[s._data[1]], None) class Packets(object): PKT_CLASSES = None @@ -225,7 +251,7 @@ self.packets = [] self.bufmsb = 0 - self.dataleft = 0 + self._dataleft = 0 self.fr_err = 0 # Framing error self.ck_err = 0 # Checksum error @@ -314,6 +340,9 @@ # Exception badpkttypetest = [126, 0, 3, 10, 86, 76, 83] +# Frame ID = 0, Cmd = 'VL', Status = OK, Value = 'VL Result' +atreply = [126, 0, 14, 136, 0, 86, 76, 0, 86, 76, 32, 82, 101, 115, 117, 108, 116, 148] + up = Packets(s) up.process(goodtest) up.process(badtest)