changeset 4:ed7abe6f59c2

Many fixes (again) - Worked out how to auto-discover packet type classes. - Refactor receive class to make it usable for 16 & 64 bit modes. - Use property() to map to data values. - Add another test case, document what the others look like.
author darius@inchoate.localdomain
date Wed, 31 Oct 2007 20:01:54 +1030
parents 7bf4d4265339
children 5d5963d542bc
files zb.py
diffstat 1 files changed, 127 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/zb.py	Tue Oct 30 21:29:00 2007 +1030
+++ b/zb.py	Wed Oct 31 20:01:54 2007 +1030
@@ -1,21 +1,50 @@
-import serial
+import serial, inspect
 
 class PktBase(object):
-    def __init__(self, data):
+    def __init__(self, data = []):
         self.data = data
         print "Constructing " + self.__class__.__name__
 
     def __repr__(self):
         return str(self.__class__) + "(" + str([self.PKT_TYPE] + self.data) + ")"
 
-    #def __str__(self):
-    #    return "%s: %s" % (self.PKT_DESC, str(self.data))
+    def Encapsulate(self):
+        return Packets.Encapsulate([self.PKT_TYPE] + self.data)
 
+    def resize(self, dlen):
+        """Ensure the data list can hold at least len elements (0 fill)"""
+        if (len(self.data) < dlen):
+            self.data = (self.data + [0] * dlen)[0:dlen]
+    
 class AT_Cmd(PktBase):
     PKT_TYPE = 0x08
     PKT_DESC = "AT Command"
 
-class AT_Cmd_Queue(PktBase):
+    def __init__(self, *args):
+        if (len(args) == 1):
+            super(AT_Cmd, self).__init__(args[0])
+        elif (len(args) == 2):
+            super(AT_Cmd, self).__init__([])
+            self.cmd = args[0]
+            if (args[1] != None):
+                self.cmdarg = args[1]
+        else:
+            raise TypeError("__init__ takes 1 list of ordinals or 2 strings")
+
+    def setcmd(self, value):
+        self.resize(2)
+        self.data[0] = ord(value[0])
+        self.data[1] = ord(value[1])
+    cmd = property(lambda s: chr(s.data[0]) + chr(s.data[1]), setcmd)
+
+    def setcmdarg(self, value):
+        self.resize(2 + len(value))
+        self.data = self.data[0:2] + map(ord, value)
+    def delcmdarg(self):
+        self.data = self.data[0:2]
+    cmdarg = property(lambda s: map(chr, s.data[2:]), setcmdarg, delcmdarg)
+        
+class AT_Cmd_Queue(AT_Cmd):
     PKT_TYPE = 0x09
     PKT_DESC = "AT Command (queued)"
 
@@ -27,69 +56,111 @@
     PKT_TYPE = 0x8a
     PKT_DESC = "Modem Status"
 
-class RX_64_Bit(PktBase):
-    PKT_TYPE = 0x80
-    PKT_DESC = "RX Packet: 64 bit address"
-
 class RX_16_Bit(PktBase):
     PKT_TYPE = 0x81
     PKT_DESC = "RX Packet: 16 bit address"
+    ADDR_SIZE = 2
+    
+    def setsender(self, value):
+        self.resize(self.ADDR_SIZE)
+        for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)):
+            self.data[i] = (value & (0xff << j)) >> j
 
-class RXIO_64_Bit(PktBase):
-    PKT_TYPE = 0x82
-    PKT_DESC = "RXIO Packet: 64 bit address"
+    def getsender(self):
+        value = 0
+        for i, j in zip(reversed(range(self.ADDR_SIZE)), range(0, self.ADDR_SIZE * 8, 8)):
+            value |= self.data[i] << j
+        return value
+    sender = property(getsender, setsender)
 
-class RXIO_16_Bit(PktBase):
+    def setrssi(self, value):
+        self.resize(self.ADDR_SIZE + 1)
+        self.data[self.ADDR_SIZE] = -1 * value
+    rssi = property(lambda s: -1 * s.data[s.ADDR_SIZE], setrssi)
+    
+    def setflags(self, value):
+        self.resize(self.ADDR_SIZE + 2)
+        self.data[self.ADDR_SIZE + 1] = value
+    flags = property(lambda s: s.data[s.ADDR_SIZE + 1], setflags)
+
+class RX_64_Bit(RX_16_Bit):
+    PKT_TYPE = 0x80
+    PKT_DESC = "RX Packet: 64 bit address"
+    ADDR_SIZE = 8
+
+class RXIO_16_Bit(RX_16_Bit):
     PKT_TYPE = 0x83
     PKT_DESC = "RXIO Packet: 16 bit address"
 
+    def setnsamples(self, value):
+        self.resize(self.ADDR_SIZE + 3)
+        self.data[self.ADDR_SIZE + 2] = value
+    nsamples = property(lambda s: s.data[s.ADDR_SIZE + 2], setnsamples)
+
+    def setmask(self, value):
+        self.resize(self.ADDR_SIZE + 5)
+        self.data[self.ADDR_SIZE + 3] = (value & 0xff00) >> 8
+        self.data[self.ADDR_SIZE + 4] = value & 0xff
+    mask = property(lambda s: s.data[s.ADDR_SIZE + 3] << 8 | s.data[s.ADDR_SIZE + 4], setmask)
+
     def __str__(self):
-        sender = self.data[0] << 8 | self.data[1]
-        rssi = -1 * self.data[2]
-        flags = self.data[3]
-        nsamples = self.data[4]
-        mask = self.data[5] << 8 | self.data[6]
-        rtn = "0x%04x (%ddBm) -> %d samples, mask 0x%04x" % (sender, rssi, nsamples, mask)
+        rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender,
+                                                             self.rssi, self.nsamples, self.mask)
         # Any DIO lines enabled?
-        if (mask | 0x01ff):
-            rtn = rtn + ", DIO - 0x%03x" % (self.data[7] << 8 | self.data[8])
-            offs = 9
+        if (self.mask | 0x01ff):
+            rtn = rtn + ", DIO - 0x%03x" % (self.data[self.ADDR_SIZE + 5] << 8 |
+                                            self.data[self.ADDR_SIZE + 6])
+            offs = self.ADDR_SIZE + 7
         else:
-            offs = 7
+            offs = self.ADDR_SIZE + 5
             
         # Any ADC lines enabled?
-        if (mask | 0x7e00):
+        if (self.mask | 0x7e00):
             for i in range(6):
-                if (mask & 1 << (i + 9)):
+                if (self.mask & 1 << (i + 9)):
                     rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 |
                                                       self.data[offs + 1])
                     offs = offs + 2
 
         return rtn
 
+class RXIO_64_Bit(RX_16_Bit):
+    PKT_TYPE = 0x82
+    PKT_DESC = "RXIO Packet: 64 bit address"
+    ADDR_SIZE = 8
+    
+class TX_16_Bit(PktBase):
+    PKT_TYPE = 0x00
+    PKT_DESC = "TX Packet: 16 bit address"
+
 class TX_64_Bit(PktBase):
     PKT_TYPE = 0x00
     PKT_DESC = "TX Packet: 64 bit address"
 
-class TX_16_Bit(PktBase):
-    PKT_TYPE = 0x00
-    PKT_DESC = "TX Packet: 16 bit address"
-
 class TX_Status(PktBase):
     PKT_TYPE = 0x89
     PKT_DESC = "TX Status"
 
 class Packets(object):
-    PKT_CLASSES = [AT_Cmd, AT_Cmd_Queue, AT_Response, Modem_Status, RX_64_Bit,
-                   RX_16_Bit, RXIO_64_Bit, RXIO_16_Bit, TX_64_Bit, TX_16_Bit,
-                   TX_Status]
+    PKT_CLASSES = None
     
-    def Build(data):
-        for p in Packets.PKT_CLASSES:
+    def Build(self, data):
+        if (self.PKT_CLASSES == None):
+            m = inspect.getmodule(self)
+            # Generate list of objects from their names
+            mobjs = map(lambda n: m.__dict__[n], m.__dict__)
+            # Find all the classes
+            pktclasses = filter(inspect.isclass, mobjs)
+            # Find all subclasses of PktBase (but not PktBase itself)
+            pktclasses = filter(lambda s: issubclass(s, m.PktBase) and s != m.PktBase, pktclasses)
+            self.PKT_CLASSES = pktclasses
+            
+        for p in self.PKT_CLASSES:
             if (p.PKT_TYPE == data[0]):
                 return(p(data[1:]))
-    
-    Build = staticmethod(Build)
+
+        raise ValueError("Unknown packet type 0x%02x" % (data[0]))
+    Build = classmethod(Build)
 
     def Encapsulate(data):
         pktsum = reduce(lambda x, y: x + y, data) & 0xff
@@ -99,6 +170,7 @@
     Encapsulate = staticmethod(Encapsulate)
 
     def __init__(self):
+        print str(inspect.getmodule(self))
         self.buffer = []
         self.state = 'init'
         self.packets = []
@@ -125,33 +197,32 @@
     def process(self, data):
         pktcount = 0
         for d in data:
-            dord = ord(d)
             if (self.state == 'init'):
                 if (d != '\x7e'):
-                    print "Framing error, got 0x%02x, expected 0x7f" % (dord)
-                    self.fr_err = self.fr_err + 1
+                    print "Framing error, got 0x%02x, expected 0x7f" % (ord(d))
+                    self.fr_err += 1
                     continue
                 
                 self.state = 'sizemsb'
             elif (self.state == 'sizemsb'):
-                self.bufmsb = dord
+                self.bufmsb = ord(d)
                 self.state = 'sizelsb'
             elif (self.state == 'sizelsb'):
                 self.dataleft = self.bufmsb << 8 | \
-                                dord
+                                ord(d)
                 self.state = 'data'
             elif (self.state == 'data'):
-                self.buffer.append(dord)
+                self.buffer.append(ord(d))
                 self.dataleft = self.dataleft - 1
                 if (self.dataleft == 0):
                     self.state = 'cksum'
             elif (self.state == 'cksum'):
                 pktsum = reduce(lambda x, y: x + y, self.buffer) & 0xff
-                rxcksum = dord
+                rxcksum = ord(d)
                 self.state = 'init'
                 if (pktsum + rxcksum != 0xff):
                     self.buffer = []
-                    self.ck_err = self.ck_err + 1
+                    self.ck_err += 1
                     print "Checksum error, got 0x%02x, expected 0x%02x" % \
                           (rxcksum, 0xff - pktsum)
                 else:
@@ -159,8 +230,8 @@
                     p = Packets.Build(self.buffer)
                     self.pktq.append(p)
                     self.buffer = []
-                    pktcount = pktcount + 1
-                    self.rx_cnt = self.rx_cnt + 1
+                    pktcount += 1
+                    self.rx_cnt += 1
             else:
                 print "Invalid state %s! Resetting" % (self.state)
                 self.state = 'init'
@@ -177,9 +248,19 @@
 #s.write('+++')
 #s.readline(eol='\r')
 
+
+# 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f
 goodtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8']
+
+# Checksum error
 badtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0e', '8']
+
+#0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff
 adctest = ['~', '\x00', '\x0c', '\x83', '\x00', '\x05', '$', '\x00', '\x01', '\x02', '\x0e', '\x00', '\x0e', '\x03', '\xff', '2' ]
+
+# Exception
+badpkttypetest = ['~', '\x00', '\x03', '\x0a', 'V', 'L', 'S']
+
 up = Packets()
 up.process(goodtest)
 up.process(badtest)