view zb.py @ 10:4c91fdfc862e

- Further changes to make things cleaner (i.e. RX packets are "read only" etc). - Add a helper routine to poll the serial port and print any packets seen. - Make opening the serial port optional.
author darius@inchoate.localdomain
date Wed, 07 Nov 2007 17:10:08 +1030
parents d147529ad2db
children 75f785a09e2e
line wrap: on
line source

import serial, inspect, time

def easyord(i):
    """Return ord(i) when i is a str, otherwise return i unchanged.

    Lets callers treat lists of characters and lists of integers alike.
    """
    if type(i) is str:
        return ord(i)
    return i

class PktBase(object):
    """Common base class for every packet type in this module.

    Concrete subclasses supply PKT_TYPE (the frame type byte) and PKT_DESC.
    """
    # Upper bound on packet length, used by subclasses when validating
    PKT_MAXLEN = 2 ** 16

    def __init__(self, data = None):
        """Create a packet, optionally holding raw frame bytes in data.

        data is what Packets.Build() hands to the constructor: the frame
        contents after the packet type byte.  A fresh empty list is used
        when it is omitted so instances never share a list.
        """
        print("Constructing " + self.__class__.__name__)
        if data is None:
            data = []
        self._data = data

    def Encapsulate(self):
        """Return this packet framed for transmission.

        Relies on the subclass providing PKT_TYPE and a data attribute.
        """
        return Packets.Encapsulate([self.PKT_TYPE] + self.data)

    def resize(self, dlen):
        """Ensure the data list holds at least dlen elements (0 fill)"""
        missing = dlen - len(self._data)
        if missing > 0:
            self._data = self._data + [0] * missing

    def _checklist(list, min = 0, max = 255, maxlen = None):
        """Validate a sequence of ordinals and/or single characters.

        Raises ValueError if list has more than maxlen elements (when
        maxlen is given) or if any element lies outside min..max inclusive.
        """
        if maxlen is not None and len(list) > maxlen:
            raise ValueError("must have at most %d elements" % (maxlen))

        for i, item in enumerate(list):
            value = easyord(item)
            if value < min or value > max:
                # Report the converted value: calling ord() on an int
                # element would raise TypeError and hide the range error
                raise ValueError("element %d (= %d) out of range must be between %d and %d inclusive" %
                                 (i, value, min, max))
    _checklist = staticmethod(_checklist)

class TXPkts(PktBase):
    """Base class for all packets that go to the module"""

    def setframeid(self, value):
        """Store the frame ID, rejecting anything outside 0-255."""
        too_small = value < 0
        if (too_small or value > 255):
            raise ValueError("FrameID must be 0-255")
        self._frameid = value

    # Reads return the stored value; writes are validated by setframeid
    frameid = property(lambda self: self._frameid, setframeid)
    
    
class AT_Cmd(TXPkts):
    """AT command packet: a two character command plus optional argument."""
    PKT_TYPE = 0x08
    PKT_DESC = "AT Command"

    def __init__(self, cmd = None, cmdarg = None):
        """Create an AT command.

        cmd    -- two element command name (validated by setcmd), optional
        cmdarg -- list of ordinals/characters for the argument, optional
        """
        super(AT_Cmd, self).__init__()
        # Establish defaults first so that caller supplied values are not
        # clobbered afterwards
        self.frameid = 0
        self.cmdarg = []

        if (cmd is not None):
            self.cmd = cmd
        if (cmdarg is not None):
            self.cmdarg = cmdarg

    def setcmd(self, value):
        """Set the command; must be 2 elements each within '0'..'z'."""
        if (len(value) != 2):
            raise ValueError("must have 2 elements")
        self._checklist(value, ord('0'), ord('z'))
        self._cmd = value
    cmd = property(lambda s: s._cmd, setcmd)

    def setcmdarg(self, value):
        """Set the argument; elements must be 0-255 and fit in a packet."""
        self._checklist(value, maxlen = self.PKT_MAXLEN - 3)
        self._cmdarg = value
    cmdarg = property(lambda s: s._cmdarg, setcmdarg)

    def getdata(self):
        """Return the packet body (frame ID, command, argument) as ordinals."""
        # easyord is used for cmd as well because setcmd accepts either
        # characters or ordinals
        return([self.frameid] +
               [easyord(c) for c in self.cmd] +
               [easyord(c) for c in self.cmdarg])
    data = property(getdata)

class AT_Cmd_Queue(AT_Cmd):
    """Variant of AT_Cmd that differs only in its packet type and description."""
    PKT_DESC = "AT Command (queued)"
    PKT_TYPE = 0x09

class AT_Response(PktBase):
    """Read-only view of an AT command response.

    All accessors index into self._data, which is expected to hold the
    frame contents that follow the packet type byte.
    """
    PKT_TYPE = 0x88
    PKT_DESC = "AT Command response"

    @property
    def frameid(self):
        """Frame ID (element 0)."""
        return self._data[0]

    @property
    def cmd(self):
        """Two character command name (elements 1 and 2)."""
        return chr(self._data[1]) + chr(self._data[2])

    @property
    def statusOK(self):
        """True when the status element (3) is zero."""
        return self._data[3] == 0

    @property
    def payload(self):
        """Everything after the status element."""
        return self._data[4:]

class Modem_Status(PktBase):
    """Modem status packet; only its type and description are defined."""
    PKT_DESC = "Modem Status"
    PKT_TYPE = 0x8a

class RX_16_Bit(PktBase):
    """Read-only received data packet with an ADDR_SIZE byte sender address.

    _data layout: sender address (ADDR_SIZE elements, most significant
    first), RSSI, flags, then the payload.
    """
    PKT_TYPE = 0x81
    PKT_DESC = "RX Packet: 16 bit address"
    ADDR_SIZE = 2

    def __init__(self, data = None):
        """Wrap data, the frame contents following the packet type byte."""
        super(RX_16_Bit, self).__init__()
        # Avoid a mutable default: it would be shared by every instance
        # created without an argument
        if data is None:
            data = []
        self._data = data

    def __str__(self):
        return "0x%0*x (%ddBm) -> %s" % (self.ADDR_SIZE * 2, self.sender,
                                         self.rssi, str(self.payload))

    def getsender(self):
        """Return the sender address as an integer."""
        value = 0
        # Element 0 is the most significant byte of the address
        for i in range(self.ADDR_SIZE):
            value = (value << 8) | self._data[i]
        return value
    sender = property(getsender, None)

    # The stored element is negated to give the value printed as dBm
    rssi = property(lambda s: -1 * s._data[s.ADDR_SIZE], None)

    flags = property(lambda s: s._data[s.ADDR_SIZE + 1], None)

    payload = property(lambda s: s._data[s.ADDR_SIZE + 2:], None)
                       
class RX_64_Bit(RX_16_Bit):
    """Received data packet whose sender address occupies 8 elements."""
    ADDR_SIZE = 8
    PKT_TYPE = 0x80
    PKT_DESC = "RX Packet: 64 bit address"

class RXIO_16_Bit(RX_16_Bit):
    """Received I/O sample packet.

    After the address, RSSI and flags come the sample count, a 16 bit
    channel mask, and then the sample data.  Only the first sample is
    decoded by __str__.
    """
    PKT_TYPE = 0x83
    PKT_DESC = "RXIO Packet: 16 bit address"

    # Mask bits covering the digital (DIO) and analogue (ADC) channels
    DIO_MASK = 0x01ff
    ADC_MASK = 0x7e00

    nsamples = property(lambda s: s._data[s.ADDR_SIZE + 2])

    mask = property(lambda s: s._data[s.ADDR_SIZE + 3] << 8 | s._data[s.ADDR_SIZE + 4])

    def __str__(self):
        rtn = "0x%0*x (%ddBm) -> %d samples, mask 0x%04x" % (self.ADDR_SIZE * 2, self.sender,
                                                             self.rssi, self.nsamples, self.mask)
        # Any DIO lines enabled?  This must be a bitwise AND: an OR with a
        # non-zero constant is always true, which would consume two bytes
        # of DIO data even when none were sent
        if (self.mask & self.DIO_MASK):
            rtn = rtn + ", DIO - 0x%03x" % (self._data[self.ADDR_SIZE + 5] << 8 |
                                            self._data[self.ADDR_SIZE + 6])
            offs = self.ADDR_SIZE + 7
        else:
            offs = self.ADDR_SIZE + 5

        # Any ADC lines enabled?
        if (self.mask & self.ADC_MASK):
            for i in range(6):
                # ADC channel i is mask bit i + 9; each reading is 2 bytes
                if (self.mask & 1 << (i + 9)):
                    rtn = rtn + ", ADC%d - 0x%02x" % (i, self._data[offs] << 8 |
                                                      self._data[offs + 1])
                    offs = offs + 2
        return rtn

class RXIO_64_Bit(RXIO_16_Bit):
    """Received I/O sample packet whose sender address occupies 8 elements."""
    ADDR_SIZE = 8
    PKT_TYPE = 0x82
    PKT_DESC = "RXIO Packet: 64 bit address"
    
class TX_16_Bit(TXPkts):
    """Data packet to transmit to an ADDR_SIZE byte recipient address."""
    PKT_TYPE = 0x01
    PKT_DESC = "TX Packet: 16 bit address"
    ADDR_SIZE = 2
    FLG_DISABLE_ACK = 0x01
    FLG_BCAST_PANID = 0x04
    PKT_MAX_PAYLOAD = 100

    def __init__(self, *args):
        """Create a packet from (recipient) or (recipient, payload).

        recipient -- integer address
        payload   -- list of ordinals/characters, defaults to empty

        Raises TypeError for any other number of arguments.
        """
        if (len(args) != 1 and len(args) != 2):
            raise TypeError("__init__ takes a recipient and an optional payload")

        super(TX_16_Bit, self).__init__()
        # Defaults go first so the caller's payload is not overwritten
        self.frameid = 0
        self.flags = 0
        self.payload = []

        self.recipient = args[0]
        if (len(args) == 2):
            self.payload = args[1]

    def setrecipient(self, value):
        """Set the recipient; must fit in ADDR_SIZE bytes."""
        maxaddr = 2 ** (self.ADDR_SIZE * 8) - 1
        if (value < 0 or value > maxaddr):
            raise ValueError("value out of range must be between 0 and %d" % (maxaddr))

        self._recipient = value
    recipient = property(lambda s: s._recipient, setrecipient)

    def setflags(self, value):
        """Set the option flags byte (see the FLG_* constants)."""
        if (value < 0 or value > 255):
            raise ValueError("Value must be between 0 and 255 inclusive")

        self._flags = value
    flags = property(lambda s: s._flags, setflags)

    def setpayload(self, value):
        """Set the payload; at most PKT_MAX_PAYLOAD elements of 0-255."""
        self._checklist(value, maxlen = self.PKT_MAX_PAYLOAD)
        self._payload = value
    payload = property(lambda s: s._payload, setpayload)

    def getdata(self):
        """Return the packet body as ordinals: frame ID, address, flags, payload."""
        data = [self.frameid]
        # Most significant address byte first, matching the byte order
        # RX_16_Bit.getsender uses when decoding addresses
        for shift in range((self.ADDR_SIZE - 1) * 8, -1, -8):
            data.append((self.recipient >> shift) & 0xff)
        data.append(self.flags)
        data.extend([easyord(c) for c in self.payload])
        return(data)
    data = property(getdata)

class TX_64_Bit(TX_16_Bit):
    """Transmit data packet whose recipient address occupies 8 bytes."""
    ADDR_SIZE = 8
    PKT_TYPE = 0x00
    PKT_DESC = "TX Packet: 64 bit address"

class TX_Status(PktBase):
    """Read-only view of a transmit status packet.

    Accessors index into self._data, which is expected to hold the frame
    contents that follow the packet type byte.
    """
    PKT_TYPE = 0x89
    PKT_DESC = "TX Status"
    # Text for each status code, indexed by the code itself
    statusTxt = ['OK', 'No Ack', 'CCA failure', 'Purged']

    @property
    def frameid(self):
        """Frame ID (element 0)."""
        return self._data[0]

    @property
    def status(self):
        """Numeric status code (element 1)."""
        return self._data[1]

    @property
    def statusMsg(self):
        """Text from statusTxt for the status code."""
        return self.statusTxt[self._data[1]]
    
class Packets(object):
    """Frames outgoing packets and parses incoming bytes into packet objects.

    A frame is: 0x7e, length MSB, length LSB, data..., checksum, where
    the low 8 bits of the sum of the data plus the checksum equal 0xff.
    """
    # Cache of packet classes, filled in on the first call to Build()
    PKT_CLASSES = None

    def Build(cls, data):
        """Return a packet object for data (type byte followed by contents).

        Raises ValueError if no class has a PKT_TYPE equal to data[0].
        """
        if (cls.PKT_CLASSES is None):
            m = inspect.getmodule(cls)
            # All subclasses of PktBase (but not PktBase itself) in this
            # module.  Intermediate bases without a PKT_TYPE are skipped,
            # otherwise the lookup below fails with AttributeError
            cls.PKT_CLASSES = [c for c in vars(m).values()
                               if inspect.isclass(c)
                               and issubclass(c, m.PktBase)
                               and c is not m.PktBase
                               and hasattr(c, 'PKT_TYPE')]

        for p in cls.PKT_CLASSES:
            if (p.PKT_TYPE == data[0]):
                return(p(data[1:]))

        raise ValueError("Unknown packet type 0x%02x" % (data[0]))
    Build = classmethod(Build)

    def Encapsulate(data):
        """Return data (a list of ordinals) framed as a list of characters."""
        pktsum = sum(data) & 0xff
        pkt = [0x7e, len(data) >> 8, len(data) & 0xff] + data + [0xff - pktsum]
        return([chr(b) for b in pkt])

    Encapsulate = staticmethod(Encapsulate)

    def __init__(self, s = None):
        """Create a parser; s is the serial port object, or None for no port."""
        print(str(inspect.getmodule(self)))
        self.buffer = []
        self.state = 'init'
        self.packets = []

        self.bufmsb = 0
        self.dataleft = 0

        self.fr_err = 0 # Framing error
        self.ck_err = 0 # Checksum error
        self.rx_cnt = 0 # Packet count

        self.pktq = []
        self.s = s

    def writedata(self, data):
        """Write data (e.g. the result of Encapsulate) to the serial port."""
        self.s.write("".join(map(str, data)))

    def getdata(self):
        """Read everything available from the port and parse it.

        Returns the number of packets decoded, as process() does.
        """
        l = []
        while (1):
            a = self.s.read()
            # An empty read means nothing more is available
            if (not a):
                break
            l.append(ord(a))

        return self.process(l)

    def process(self, data):
        """Feed a list of ordinals through the frame state machine.

        Complete, valid packets are appended to self.pktq.  State persists
        between calls so a frame may span several calls.  Returns the
        number of packets decoded by this call.  A ValueError from Build()
        (unknown packet type) propagates to the caller.
        """
        pktcount = 0
        for d in data:
            if (self.state == 'init'):
                if (d != 0x7e):
                    print("Framing error, got 0x%02x, expected 0x7e" % (d))
                    self.fr_err += 1
                    continue

                self.state = 'sizemsb'
            elif (self.state == 'sizemsb'):
                self.bufmsb = d
                self.state = 'sizelsb'
            elif (self.state == 'sizelsb'):
                self.dataleft = self.bufmsb << 8 | d
                if (self.dataleft == 0):
                    # No type byte can follow; entering 'data' would count
                    # down past zero and never leave that state
                    print("Framing error, zero length packet")
                    self.fr_err += 1
                    self.state = 'init'
                else:
                    self.state = 'data'
            elif (self.state == 'data'):
                self.buffer.append(d)
                self.dataleft = self.dataleft - 1
                if (self.dataleft == 0):
                    self.state = 'cksum'
            elif (self.state == 'cksum'):
                # Detach the frame before doing anything that may raise so
                # a failure cannot leave stale bytes for the next frame
                frame = self.buffer
                self.buffer = []
                self.state = 'init'
                pktsum = sum(frame) & 0xff
                rxcksum = d
                if (pktsum + rxcksum != 0xff):
                    self.ck_err += 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                else:
                    print("Got a packet - " + str(frame))
                    p = Packets.Build(frame)
                    self.pktq.append(p)
                    pktcount += 1
                    self.rx_cnt += 1
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.buffer = []
                self.state = 'init'

        return pktcount

def polldev():
    """Poll the serial port forever, printing each packet once.

    Uses the module level Packets instance up.  Printed packets are
    removed from up.pktq so they are not printed again on the next pass.
    Sleeps briefly whenever a poll yields no packets.
    """
    while (1):
        foo = up.getdata()
        while (up.pktq):
            print(up.pktq.pop(0))

        if (foo == 0):
            time.sleep(0.1)

#for c in dir():
#    if (issubclass(c, PktBase)):
#        print ..

# Opening the serial port is optional: if it fails, s is set to None and the
# parser can still be exercised with the canned frames below.
try:
    s = serial.Serial(port='/dev/cuaU0', baudrate=9600, bytesize=8, parity='N', \
                      stopbits=1, rtscts=0)
    # A timeout of 0 makes reads non-blocking
    s.timeout = 0
    #s.write('+++')
    #s.readline(eol='\r')
except serial.serialutil.SerialException, e:
    print "Can't open serial port - " + str(e)
    s = None
    
# Canned frames for Packets.process(): 0x7e, 2 byte length, data, checksum.
# The comment above each one describes the expected outcome.

# Prints as: 0x0001 (-36dBm) -> 1 samples, mask 0x000f, DIO - 0x00f
goodtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 15, 56]

# Checksum error (goodtest with one data byte altered)
badtest = [126, 0, 10, 131, 0, 1, 36, 0, 1, 0, 15, 0, 14, 56]

# Prints as: 0x0005 (-36dBm) -> 1 samples, mask 0x020e, DIO - 0x00e, ADC0 - 0x3ff
adctest = [126, 0, 12, 131, 0, 5, 36, 0, 1, 2, 14, 0, 14, 3, 255, 50]

# Valid checksum but packet type 0x0a matches no class, so Packets.Build()
# is expected to raise an exception
badpkttypetest = [126, 0, 3, 10, 86, 76, 83]

# Frame ID = 0, Cmd = 'VL', Status = OK, Value = 'VL Result'
atreply = [126, 0, 14, 136, 0, 86, 76, 0, 86, 76, 32, 82, 101, 115, 117, 108, 116, 148]

# Module level parser instance (also used by polldev), followed by a quick
# self test: badtest fails its checksum, so only the two good packets are
# queued and printed.
up = Packets(s)
up.process(goodtest)
up.process(badtest)
up.process(adctest)
print up.pktq.pop(0)
print up.pktq.pop(0)