view usb488.py @ 55:ad5942d22f78

Use BaseException rather than strings. Add ask method. Wrap tag to a byte.
author Daniel O'Connor <doconnor@gsoft.com.au>
date Tue, 08 Dec 2020 13:59:05 +1030
parents 876d951bbcc0
children 91b476ebc0f2
line wrap: on
line source

#!/usr/bin/env python

# Copyright (c) 2009
#      Daniel O'Connor <darius@dons.net.au>.  All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

#
# Spec/info..
#
# http://www.usb.org/developers/devclass_docs/USBTMC_1_006a.zip
# http://svn.openmoko.org/developers/werner/ahrt/host/tmc/README
# http://www.home.agilent.com/agilent/redirector.jspx?action=ref&cname=AGILENT_EDITORIAL&ckey=1189335&lc=eng&cc=US&nfr=-35560.0.00
# linux-2.6.29.3/drivers/usb/class/usbtmc.c
# http://sdpha2.ucsd.edu/Lab_Equip_Manuals/usbtmc_usb488_subclass_1_00.pdf
#

import usb

#
# The usual SCPI commands are wrapped before being sent.
#
# Write:
# Offset	Field		Size	Value	Description
# 0		MsgID		1	0x01	DEV_DEP_MSG_OUT
# 1		bTag		1	0x01	Varies with each transfer
# 2		bTagInverse	1	0xfe	Inverse of previous field
# 3		Reserved	1	0x00
# 4		TransferSize	4	0x06
# 5		..			0x00
# 6		..			0x00
# 7		..			0x00
# 8		bmTransferAttr	1	0x01	1 == end of msg
# 9		Reserved	1	0x00
# 10		Reserved	1	0x00
# 11		Reserved	1	0x00
# 12		Msg itself	1	0x2a	'*'
# 13				1	0x49	'I'
# 14				1	0x44	'D'
# 15				1	0x4e	'N'
# 16				1	0x3f	'?'
# 17				1	0x0a	'\n'
# 18-19		Alignment	2	0x0000	Bring into 4 byte alignment
#
#
# Send a read request:
# Offset	Field		Size	Value	Description
# 0		MsgID		1	0x02	REQUEST_DEV_DEP_MSG_IN
# 1		bTag		1	0x02	Varies with each transfer
# 2		bTagInverse	1	0xfd	Inverse of previous field
# 3		Reserved	1	0x00
# 4		TransferSize	4	0x64
# 5		..			0x00
# 6		..			0x00
# 7		..			0x00
# 8		bmTransferAttr	1	0x00
# 9		Term char	1	0x00
# 10		Reserved	1	0x00
# 11		Reserved	1	0x00

# No libusb versions of these available
USB_CLASS_APP_SPECIFIC = 254
USB_SUBCLASS_TMC = 3
USB_PROTOCOL_488 = 1

# USB488 message IDs
DEV_DEP_MSG_OUT = 1
REQUEST_DEV_DEP_MSG_IN = 2
DEV_DEP_MSG_IN = 2

class USB488Device(object):
    def __init__(self, vendor = None, product = None, serial = None, path = None):
        """Search for a USB488 class device, if specified vendor,
        product, serial and path will refine the search"""

        busses = usb.busses()

        #
        # Search for the device we want
        #
        found = False
        for bus in busses:
            for dev in bus.devices:
                # Skip ones that don't match
                if vendor != None and dev.idVendor != vendor:
                    continue
                if product != None and dev.idProduct != product:
                    continue
                if serial != None and dev.idSerialNumber != serial:
                    continue
                if path != None and dev.filename != path:
                    continue

                # The libusb examples say you can check for device
                # class and then open, however in that case you can't
                # find the endpoint number which seems pretty useless
                # unless you want to hard code everything.
                for confidx in xrange(len(dev.configurations)):
                    for iface in dev.configurations[confidx].interfaces:
                        for altif in iface:
                            # Check if this is a USB488 capable interface
                            if altif.interfaceClass == USB_CLASS_APP_SPECIFIC and \
                                   altif.interfaceSubClass == USB_SUBCLASS_TMC and \
                                   altif.interfaceProtocol == USB_PROTOCOL_488:
                                found = True
                                break
                        if found:
                            break
                    if found:
                        break
                if found:
                    break
            if found:
                break
        if not found:
            raise BaseException("Could not find a suitable USB device")
    
        # Open the device and claim the USB interface that supports the spec
        handle = dev.open()
        handle.setConfiguration(dev.configurations[confidx].value)
        handle.claimInterface(altif.interfaceNumber)
        handle.setAltInterface(altif.alternateSetting)
        self.dev = dev
        self.handle = handle

        # Get some info for humans
        self.vendname = handle.getString(dev.iManufacturer, 1024)
        self.prodname = handle.getString(dev.iProduct, 1024)
        self.serial = handle.getString(dev.iSerialNumber, 1024)

        # Determine the endpoints for each operation type
        self.intrep = self.bulkinep = self.bulkoutep = None
    
        for ep in altif.endpoints:
            if ep.type == usb.ENDPOINT_TYPE_INTERRUPT and \
                   ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN:
                self.intrep = ep.address

            if ep.type == usb.ENDPOINT_TYPE_BULK:
                if ep.address & usb.ENDPOINT_IN == usb.ENDPOINT_IN:
                    self.bulkinep = ep.address
                else:
                    self.bulkoutep = ep.address
                    self.maxPacket = ep.maxPacketSize

        # Required for 488.2 devices, optional otherwise
        if self.intrep == None:
            print "Can't find interrupt endpoint"

        # Data from the scope (mandatory)
        if self.bulkinep == None:
            raise BaseException("Can't find bulk-in endpoint")

        # Data to the scope (mandatory)
        if self.bulkoutep == None:
            raise BaseException("Can't find bulk-out endpoint")

        self.tag = 1

    def __str__(self):
        rtn = "Mfg: %s Prod: %s" % (self.vendname, self.prodname)
        if self.serial != "":
            rtn += " S/N: " + self.serial

        return rtn
    
    def incrtag(self):
        self.tag = (self.tag + 1) % 255
        if self.tag == 0:
            self.tag += 1

    def write(self, data):
        """Send data (string) to the instrument"""

        orddata = map(ord, data)
        # The device needs a \n at the end, enfore this
        if orddata[-1] != '\n':
            orddata += [ord('\n')]
        datalen = len(orddata)

        # Build the packet
        pkt = [ DEV_DEP_MSG_OUT, self.tag, ~self.tag & 0xff, 0x00,
                datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff,
                datalen >> 24 & 0xff, 1, 0, 0, 0 ]

        # Add the data
        pkt = pkt + orddata

        # Align to 4 bytes
        alignlen = ((len(pkt) / 4) + 1) * 4
        pkt = pkt + [0] * (alignlen - len(pkt))

        # Bump the tag
        self.incrtag()
            
        # Split it up into maxPacket sized chunks and send..
        while len(pkt) > 0:
            chunk = pkt[0:self.maxPacket]
            pkt = pkt[self.maxPacket:]

            #print "Sending %s bytes of data: %s" % (len(chunk), chunk)
            wrote = self.handle.bulkWrite(self.bulkoutep, chunk)
            if wrote != len(chunk):
                raise BaseException("Short write, got %d, expected %d" % (wrote, len(chunk)))

    def read(self, timeout = None):
        """Read data from the device, waits for up to timeout seconds for each USB transaction"""

        if timeout == None:
            timeout = 0.1
            
        # Mangle into milliseconds
        _timeout = int(timeout * 1000.0)

        # Maximum we accept at once
        # Was 2^31 - 1 but that seems to make things take too long to
        # read (perhaps libusb tries to malloc it..)
        datalen = 10240
        data = []
        
        while True:
            # Ask the device to send us something
            pkt = [ REQUEST_DEV_DEP_MSG_IN, self.tag, ~self.tag & 0xff, 0x00,
                    datalen & 0xff, datalen >> 8 & 0xff, datalen >> 16 & 0xff,
                    datalen >> 24 & 0xff, 0, 0, 0, 0]

            # Expected tag
            exptag = self.tag
            
            # Bump tag
            self.incrtag()

            # Send it
            #print "Sending " + str(pkt)
            wrote = self.handle.bulkWrite(self.bulkoutep, pkt, _timeout)
            if wrote != len(pkt):
                print "Short write, got %d, expected %d" % (wrote, len(pkt))

            #print "Reading.."
            read = self.handle.bulkRead(self.bulkinep, datalen, _timeout)
            #print "Read %s bytes: %s" % (len(read), str(read))

            if read[0] != DEV_DEP_MSG_IN:
                raise BaseException("Unexpected Msg ID, got %s expected %d" % (read[0], DEV_DEP_MSG_IN))
            if read[1] != exptag:
                raise BaseException("Unexpected tag, got %d expected %d" % (read[1], exptag))
            if read[2] != ~exptag & 0xff:
                raise BaseException("Unexpected tag inverse, got %d expected %d" % (read[1], ~exptag & 0xff))

            actualdata = read[4] | read[5] << 8 | read[6] << 16 | read[7] << 24
            #print "Computed datalen is %d" % (actualdata)
            data += read[12:12 + actualdata]
            if read[8] & 0x01:
                #print "Breaking out due to EOM"
                break

        # Stringify result for easier consumption
        result = reduce(lambda x, y: x+y, map(chr, data))
        # Trim off \n if present
        if result[-1] == '\n':
            result = result[0:-1]

        return result

    def ask(self, s, timeout = None):
        self.write(s)
        return self.read(timeout = None)

    def isConnected(self):
        """Check if the device is present"""

        # libusb makes it very hard (at least on FreeBSD) to determine if we're still connected.
        # This is a reasonable proxy..
        try:
            self.handle.getString(self.dev.iManufacturer, 100)
        except USBError, e:
            return False

        return True