view giant.py @ 0:1f3c12ba927d default tip

Rework code for USB interface
author Daniel O'Connor <darius@dons.net.au>
date Sun, 19 Nov 2017 18:10:23 +1030
parents
children
line wrap: on
line source

#!/usr/bin/env python

# pip install --user pyusb crc16
import crc16
import exceptions
import os
import usb.core, usb.util, usb.control # https://github.com/pyusb/pyusb

# USB details
# Borrowed from http://allican.be/blog/2017/01/28/reverse-engineering-cypress-serial-usb.html
# vendorId/productId are the IDs passed to usb.core.find() to locate the device.
vendorId = 0x0665
productId = 0x5161
# USB interface number that is checked for (and detached from) a kernel driver.
interface = 0

# CRC byte values that must be incremented by one before sending
# (see GiantIPS.compose_msg). 0x28 is '(' and 0x0d is '\r', the frame start and
# end markers checked in GiantIPS.rx_msg; 0x0a is '\n'.
badvals = [0x28, 0x0d, 0x0a]

class Timeout(Exception):
    """Raised by GiantIPS.rx_msg when its retry budget runs out before a
    complete reply (terminated by a carriage return) has been received.

    Derives from the builtin Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it and no Python-2-only ``exceptions``
    module lookup is needed.
    """

class FramingError(Exception):
    """Raised by GiantIPS.rx_msg when a reply does not start with '(' or does
    not end with a carriage return.

    Derives from the builtin Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it and no Python-2-only ``exceptions``
    module lookup is needed.
    """

class CRCError(Exception):
    """Signals a reply whose trailing CRC bytes do not match its contents.

    NOTE(review): GiantIPS.rx_msg currently only prints on a CRC mismatch and
    does not raise this; the class exists for when that check is made fatal.

    Derives from the builtin Exception (not BaseException) so that ordinary
    ``except Exception`` handlers see it and no Python-2-only ``exceptions``
    module lookup is needed.
    """

class GiantIPS(object):
    """Talk to a Giant IPS inverter over its USB interface.

    Commands are short ASCII strings (e.g. 'QPIGS'); each is sent with a
    CRC16/XMODEM trailer and a carriage return, and the reply is read back
    as '(' + payload + 2 CRC bytes + '\\r'.
    """

    # Names for the flag characters of the 'QPIWS' reply, in reply order.
    # These strings are used as dictionary keys by getAlarms().
    PIWS = ['Reserved', 'InverterFault', 'BusOver', 'BusUnder', 'BusSoftFail', 'LineFail', 'OPVShort', 'InverterVoltsLow',
                'InverterVoltsHigh', 'OverTemp', 'FanLocked', 'BattVoltsHigh', 'BattLowAlarm', 'Reserved(Overcharge)',
                'BatterySHutdown', 'Reserved(BattDerate)', 'Overload', 'EEPROM', 'InverterOverCurrent', 'SelfTest',
                'OPDCVoltsOver', 'BattOpen', 'CurrentSenseFail', 'BatteryShort', 'PowerLimit', 'PVVoltsHigh1',
                'MPPTOverload', 'MPPTOverloadWarn', 'BattTooLowChrg', 'PVVoltsHigh2', 'MPPTOverload2', 'MPPTOverloadWarn2',
                'BattTooLowChrg2', 'PVVoltsHigh3', 'MPPTOverload3', 'MPPTOverloadWarn3', 'BattTooLowChrg3']

    def __init__(self):
        """Find the inverter on USB, detach any kernel driver from its
        interface and select alternate setting 0.

        Raises:
            ValueError: no device with the expected vendor/product ID is attached.
        """
        dev = usb.core.find(idVendor = vendorId, idProduct = productId)
        if dev is None:
            # usb.core.find() returns None when nothing matches; fail with a
            # clear message instead of an AttributeError on None below.
            raise ValueError('Giant IPS USB device %04x:%04x not found' % (vendorId, productId))
        if dev.is_kernel_driver_active(interface):
            dev.detach_kernel_driver(interface)
        dev.set_interface_altsetting(interface, 0)
        self.dev = dev

    def compose_msg(self, data):
        """Return `data` framed for transmission.

        Appends the CRC16/XMODEM of `data` (high byte first) and a carriage
        return, then pads with NUL bytes up to a minimum length of 8.
        """
        crc = crc16.crc16xmodem(data)
        crclow = crc & 0xff
        crchigh = crc >> 8
        # CRC bytes that collide with the reserved values in `badvals` are
        # sent incremented by one.
        if crclow in badvals:
            crclow += 1
        if crchigh in badvals:
            crchigh += 1
        data = data + chr(crchigh) + chr(crclow) + chr(0x0d)
        while len(data) < 8:
            data = data + b'\0'
        return data

    def tx_msg(self, data):
        """Frame `data` with compose_msg() and send it in one control transfer."""
        self.dev.ctrl_transfer(0x21, 0x9, 0x200, 0, self.compose_msg(data))

    def rx_msg(self):
        """Read one reply from endpoint 0x81 and return its payload.

        Reads 8 bytes at a time (10 ms timeout each), dropping NUL bytes,
        until a carriage return has been seen or 200 reads have timed out.

        Returns:
            The reply text between the leading '(' and the two CRC bytes.

        Raises:
            Timeout: 200 reads timed out before a carriage return arrived.
            FramingError: the reply is too short to hold '(' + CRC + '\\r',
                does not start with '(' or does not end with '\\r'.
            usb.core.USBError: any USB error other than a timeout.
        """
        import errno  # local import: only needed to recognise timeouts

        # 110 is ETIMEDOUT on Linux (the value originally hard-coded); also
        # accept whatever ETIMEDOUT is on the running platform.
        timeout_errnos = (110, errno.ETIMEDOUT)

        res = ''
        tries = 200
        while tries > 0 and '\r' not in res:
            try:
                d = self.dev.read(0x81, 8, 10)
            except usb.core.USBError as e:
                if e.errno not in timeout_errnos:
                    raise
                tries -= 1
                continue
            res += ''.join([chr(i) for i in d if i != 0x00])

        if tries == 0:
            raise Timeout()
        # A valid frame needs at least '(' + 2 CRC bytes + '\r'; checking the
        # length first avoids an IndexError on res[-3] for truncated replies.
        if len(res) < 4 or res[0] != '(' or res[-1] != '\r':
            raise FramingError()
        crc = crc16.crc16xmodem(res[0:-3])
        crclow = crc & 0xff
        crchigh = crc >> 8
        if ord(res[-3]) != crchigh or ord(res[-2]) != crclow:
            # Deliberately non-fatal: a mismatch is only reported.
            # NOTE(review): compose_msg() bumps CRC bytes found in `badvals`;
            # if the device does the same on replies this comparison will
            # report false errors - confirm against the protocol.
            print('CRC error')

        return res[1:-3]

    def cmd(self, cmd):
        """Send command string `cmd` and return the payload of the reply."""
        self.tx_msg(cmd)
        return self.rx_msg()

    def getStatus(self):
        """Issue 'QPIGS' and return the whitespace-separated reply as a dict.

        Numeric fields are converted to float; the flag fields at positions
        16 and 20 are decoded into the nested 'Status' dict of booleans plus
        a 'ChargeType' string.

        Raises:
            IndexError: the reply has fewer fields than expected.
            ValueError: a numeric field is not a valid float.
        """
        d = self.cmd('QPIGS').split()
        flags = d[16]
        flags2 = d[20]

        charge_bits = flags[5:]
        if charge_bits == '110':
            charge_type = 'SCC1'
        elif charge_bits == '101':
            charge_type = 'AC'
        else:
            # NOTE(review): every other pattern (including all zeros) is
            # reported as 'Both' - confirm this is intended.
            charge_type = 'Both'

        status = {
            'SBUPrio' : flags[0] == '1',
            'ConfigChg' : flags[1] == '1',
            'BattVoltSteady' : flags[2] == '1',
            'Charging' : flags[3] == '1',
            'SCC1Charging' : flags[4] == '1',
            'ChargeType' : charge_type,
            'FloatCharge' : flags2[0] == '1',
            'Switch' : flags2[1] == '1',
        }
        return {
            'GridVolts' : float(d[0]),
            'GridFreq' : float(d[1]),
            'ACVolts' : float(d[2]),
            'ACFreq' : float(d[3]),
            'ACAppPower' : float(d[4]),
            'ACActPower' : float(d[5]),
            'LoadPct' : float(d[6]),
            'BusVolts' : float(d[7]),
            'BattVolts' : float(d[8]),
            'BattChrCurr' : float(d[9]),
            'BattCap' : float(d[10]),
            'HSTemp' : float(d[11]),
            'PVCurr1' : float(d[12]),
            'PVVolt1' : float(d[13]),
            'SCC1Volt' : float(d[14]),
            'BattDisCurr' : float(d[15]),
            'Status' : status,
            # Raw value is in units of 10 mV; convert to volts. (This used to
            # divide by 0.01, which scaled the value up by 100 instead.)
            'BattVoltOfs' : float(d[17]) / 100.0,
            'PVChrgPow1' : float(d[19]),
            }

    def getAlarms(self):
        """Issue 'QPIWS' and return a dict mapping each PIWS name to a bool.

        Each character of the reply is paired with the PIWS entry at the same
        position ('1' means True); whichever of the two is longer is truncated.
        """
        d = self.cmd('QPIWS')
        return {name: flag == '1' for name, flag in zip(self.PIWS, d)}

def main():
    """Connect to the inverter, send the 'QPI' command and print the reply."""
    inverter = GiantIPS()
    reply = inverter.cmd('QPI')
    print(reply)


if __name__ == '__main__':
    main()