view zb.py @ 1:b5d8ec0ffd21

Lots of changes: - Create a class for each packet type (needs more fleshing out). - Make sure we reset state when we get a checksum error. - Add some more test cases. - OOify things a bit more.
author darius@inchoate.localdomain
date Tue, 30 Oct 2007 17:26:40 +1030
parents bdcdd8380d94
children 7bf4d4265339
line wrap: on
line source

import serial

class PktBase(object):
    """Base class for decoded packets.

    Subclasses are expected to define PKT_TYPE (the integer packet type
    identifier) and PKT_DESC (a human readable description); this base class
    defines neither, so __repr__ only works on subclasses.
    """

    def __init__(self, data):
        """Store the packet payload.

        data -- the payload that follows the type byte. When the instance is
                created by Packets.Build this is a list of integer byte values.
        """
        self.data = data
        # Debug trace. The single-argument call form prints the same text under
        # the Python 2 print statement and the Python 3 print function.
        print("Constructing " + self.__class__.__name__)

    def __repr__(self):
        """Return 'ClassName([type, payload bytes...])'."""
        # Use the bare class name rather than str(cls), which would embed
        # "<class '...'>" in the result, and copy the payload into a list so a
        # tuple or other sequence payload does not raise TypeError.
        return "%s(%r)" % (self.__class__.__name__,
                           [self.PKT_TYPE] + list(self.data))

    #def __str__(self):
    #    return "%s: %s" % (self.PKT_DESC, str(self.data))

class AT_Cmd(PktBase):
    """Packet with type byte 0x08, described as "AT Command"."""
    PKT_DESC = "AT Command"
    PKT_TYPE = 0x08

class AT_Cmd_Queue(PktBase):
    """Packet with type byte 0x09, described as "AT Command (queued)"."""
    PKT_DESC = "AT Command (queued)"
    PKT_TYPE = 0x09

class AT_Response(PktBase):
    """Packet with type byte 0x88, described as "AT Command response"."""
    PKT_DESC = "AT Command response"
    PKT_TYPE = 0x88

class Modem_Status(PktBase):
    """Packet with type byte 0x8a, described as "Modem Status"."""
    PKT_DESC = "Modem Status"
    PKT_TYPE = 0x8a

class RX_64_Bit(PktBase):
    """Packet with type byte 0x80, described as "RX Packet: 64 bit address"."""
    PKT_DESC = "RX Packet: 64 bit address"
    PKT_TYPE = 0x80

class RX_16_Bit(PktBase):
    """Packet with type byte 0x81, described as "RX Packet: 16 bit address"."""
    PKT_DESC = "RX Packet: 16 bit address"
    PKT_TYPE = 0x81

class RXIO_64_Bit(PktBase):
    """Packet with type byte 0x82, described as "RXIO Packet: 64 bit address"."""
    PKT_DESC = "RXIO Packet: 64 bit address"
    PKT_TYPE = 0x82

class RXIO_16_Bit(PktBase):
    """Packet with type byte 0x83: I/O sample data from a 16 bit address.

    Payload layout as read by __str__ (byte offsets into self.data):
      0-1  sender address, most significant byte first
      2    signal strength, reported as a negative dBm figure
      3    flags/options byte (not reported)
      4    number of samples
      5-6  channel mask: bits 0-8 enable DIO lines, bits 9-14 enable ADC0-5
      7..  sample data: one 16 bit DIO word if any DIO line is enabled,
           followed by one 16 bit value per enabled ADC line
    """
    PKT_TYPE = 0x83
    PKT_DESC = "RXIO Packet: 16 bit address"

    # Channel mask bits that enable digital and analog lines respectively.
    _DIO_MASK = 0x01ff
    _ADC_MASK = 0x7e00

    def __str__(self):
        """Return a one line human readable summary of the sample packet.

        Raises IndexError if the payload is shorter than the channel mask
        implies.

        NOTE: only the first sample set is decoded, even when the sample
        count is greater than one.
        """
        sender = self.data[0] << 8 | self.data[1]
        rssi = -1 * self.data[2]
        nsamples = self.data[4]
        mask = self.data[5] << 8 | self.data[6]
        rtn = "0x%04x (%ddBm) -> %d samples, mask 0x%04x" % (sender, rssi, nsamples, mask)

        offs = 7
        # Any DIO lines enabled?  This must be a bitwise AND: the previous
        # OR was always true, so two bytes were consumed as DIO data even
        # when the packet carried none, shifting every ADC value.
        if mask & self._DIO_MASK:
            rtn = rtn + ", DIO - 0x%03x" % (self.data[offs] << 8 | self.data[offs + 1])
            offs = offs + 2

        # Any ADC lines enabled?  Values appear in ascending channel order.
        if mask & self._ADC_MASK:
            for i in range(6):
                if mask & (1 << (i + 9)):
                    rtn = rtn + ", ADC%d - 0x%02x" % (i, self.data[offs] << 8 |
                                                      self.data[offs + 1])
                    offs = offs + 2

        return rtn

class TX_64_Bit(PktBase):
    """Packet with type byte 0x00, described as "TX Packet: 64 bit address"."""
    PKT_DESC = "TX Packet: 64 bit address"
    PKT_TYPE = 0x00

class TX_16_Bit(PktBase):
    """Packet with type byte 0x01, described as "TX Packet: 16 bit address"."""
    # This was 0x00, the same value as TX_64_Bit, which made this class
    # unreachable from Packets.Build (the first matching class wins).
    # 0x01 is the XBee 802.15.4 API identifier for a 16 bit address TX
    # request -- NOTE(review): confirm against the module's API reference.
    PKT_TYPE = 0x01
    PKT_DESC = "TX Packet: 16 bit address"

class TX_Status(PktBase):
    """Packet with type byte 0x89, described as "TX Status"."""
    PKT_DESC = "TX Status"
    PKT_TYPE = 0x89

class Packets(object):
    """Framer and incremental de-framer for packets on a byte stream.

    Wire format handled here:
      0x7e start delimiter, 16 bit payload length (most significant byte
      first), the payload, then one checksum byte equal to
      0xff - (sum(payload) & 0xff).

    The first payload byte selects the packet class (see PKT_CLASSES).
    Decoded packets are appended to self.pktq.
    """
    PKT_CLASSES = [AT_Cmd, AT_Cmd_Queue, AT_Response, Modem_Status, RX_64_Bit,
                   RX_16_Bit, RXIO_64_Bit, RXIO_16_Bit, TX_64_Bit, TX_16_Bit,
                   TX_Status]

    @staticmethod
    def Build(data):
        """Construct the packet object matching the type byte in data[0].

        data -- list of integer byte values: type byte followed by payload.

        Returns an instance of the first class in PKT_CLASSES whose PKT_TYPE
        equals data[0], or None if data is empty or the type is unknown.
        """
        if not data:
            return None
        for p in Packets.PKT_CLASSES:
            if p.PKT_TYPE == data[0]:
                return p(data[1:])
        return None

    @staticmethod
    def Encapsulate(data):
        """Wrap payload bytes in a frame ready for transmission.

        data -- sequence of integer byte values (type byte plus payload).

        Returns a list of one-character strings: start delimiter, length
        MSB, length LSB, the payload and the checksum.
        """
        payload = list(data)
        # sum() copes with an empty payload, unlike reduce() without an
        # initial value.
        pktsum = sum(payload) & 0xff
        size = len(payload)
        pkt = [0x7e, size >> 8, size & 0xff] + payload + [0xff - pktsum]
        return [chr(b) for b in pkt]

    def __init__(self):
        """Create a de-framer in its initial state with empty queues."""
        self.buffer = []        # Payload bytes of the frame being received
        self.state = 'init'     # Current state of the receive state machine
        self.packets = []

        self.bufmsb = 0         # Length MSB, held until the LSB arrives
        self.dataleft = 0       # Payload bytes still expected

        self.fr_err = 0 # Framing error
        self.ck_err = 0 # Checksum error
        self.rx_cnt = 0 # Packet count

        self.pktq = []          # Decoded packets awaiting collection

    def getdata(self, s):
        """Read from s until a read returns nothing, then process the data.

        s -- object whose read() returns the next chunk of input and an
             empty value ('' or b'') when nothing more is available, e.g.
             a serial port with a timeout set.

        Returns the number of packets added to self.pktq.
        """
        l = []
        while True:
            a = s.read()
            # Truthiness rather than == '' so that an empty bytes object
            # also ends the loop instead of spinning forever.
            if not a:
                break
            l.append(a)

        return self.process(l)

    def process(self, data):
        """Feed received bytes through the frame parser.

        data -- iterable of single bytes, each either a one-character
                string or an integer in the range 0-255.

        Parser state is kept between calls, so a frame may be split across
        several calls.  Frames with a bad checksum are counted in ck_err and
        dropped; bytes seen while hunting for the start delimiter are
        counted in fr_err.  Frames with a good checksum are counted in
        rx_cnt; those whose type is recognised are decoded and appended to
        self.pktq.

        Returns the number of packets appended to self.pktq by this call.
        """
        pktcount = 0
        for d in data:
            dord = d if isinstance(d, int) else ord(d)
            if self.state == 'init':
                if dord != 0x7e:
                    print("Framing error, got 0x%02x, expected 0x7e" % dord)
                    self.fr_err = self.fr_err + 1
                    continue

                self.state = 'sizemsb'
            elif self.state == 'sizemsb':
                self.bufmsb = dord
                self.state = 'sizelsb'
            elif self.state == 'sizelsb':
                self.dataleft = self.bufmsb << 8 | dord
                # A zero length frame has no payload bytes to wait for.
                # Entering 'data' here would drive dataleft negative and
                # the parser would never leave that state.
                if self.dataleft == 0:
                    self.state = 'cksum'
                else:
                    self.state = 'data'
            elif self.state == 'data':
                self.buffer.append(dord)
                self.dataleft = self.dataleft - 1
                if self.dataleft == 0:
                    self.state = 'cksum'
            elif self.state == 'cksum':
                frame = self.buffer
                pktsum = sum(frame) & 0xff
                rxcksum = dord
                # Reset for the next frame whatever the outcome.
                self.buffer = []
                self.state = 'init'
                if pktsum + rxcksum != 0xff:
                    self.ck_err = self.ck_err + 1
                    print("Checksum error, got 0x%02x, expected 0x%02x" %
                          (rxcksum, 0xff - pktsum))
                    continue

                print("Got a packet - " + str(frame))
                self.rx_cnt = self.rx_cnt + 1
                p = Packets.Build(frame)
                if p is None:
                    # Do not queue None: callers pop pktq expecting packets.
                    print("Ignoring packet with unknown or missing type - " +
                          str(frame))
                    continue
                self.pktq.append(p)
                pktcount = pktcount + 1
            else:
                print("Invalid state %s! Resetting" % (self.state))
                self.state = 'init'

        return pktcount

#for c in dir():
#    if (issubclass(c, PktBase)):
#        print ..

# Open the serial port at module level.  The read timeout is passed to the
# constructor because Serial.setTimeout() no longer exists in pySerial 3.x;
# the timeout keyword is accepted by old and new releases alike.
s = serial.Serial(port='/dev/cuad0', baudrate=9600, bytesize=8, parity='N',
                  stopbits=1, rtscts=0, timeout=0.1)
#s.write('+++')
#s.readline(eol='\r')

# Canned frames for exercising Packets.process() without hardware.
# goodtest: valid type 0x83 frame with DIO data only.
goodtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0f', '8']
# badtest: goodtest with one payload byte changed, so the checksum fails.
badtest = ['~', '\x00', '\n', '\x83', '\x00', '\x01', '$', '\x00', '\x01', '\x00', '\x0f', '\x00', '\x0e', '8']
# adctest: valid type 0x83 frame whose mask also enables ADC0.
adctest = ['~', '\x00', '\x0c', '\x83', '\x00', '\x05', '$', '\x00', '\x01', '\x02', '\x0e', '\x00', '\x0e', '\x03', '\xff', '2' ]
up = Packets()
up.process(goodtest)
#up.process(badtest)
# Take the packet decoded from goodtest off the queue.
p = up.pktq.pop(0)