# HG changeset patch # User Daniel O'Connor # Date 1313048518 -34200 # Node ID cba1c44060f56816c1162c924f8aeda6629ef852 # Parent 9bb8a9f3df6bb49812cbaea495a67ce201c88dda Copied from http://sourceforge.net/projects/pythonlabtools/ I modified them slightly to work as something is causing pack_uint to be called instead of pack_int... diff -r 9bb8a9f3df6b -r cba1c44060f5 rpc.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rpc.py Thu Aug 11 17:11:58 2011 +0930 @@ -0,0 +1,965 @@ +"A an RPC module, with optimizations for VXI-11 rpc calls" +_rcsid="$Id: rpc.py 323 2011-04-06 19:10:03Z marcus $" + +#this module is 99% from the python distribution, Demo/rpc/rpc.py, with a few modifications +#by Marcus Mendenhall to improve timeout handling, etc. It probably should follow any licensing +#intent of whoever originated it. + +# Sun RPC version 2 -- RFC1057. + +# XXX There should be separate exceptions for the various reasons why +# XXX an RPC can fail, rather than using RuntimeError for everything + +# XXX The UDP version of the protocol resends requests when it does +# XXX not receive a timely reply -- use only for idempotent calls! + +# XXX There is no provision for call timeout on TCP connections +# Now there is, on receives. 2003.02.20 Marcus Mendenhall, Vanderbilt University marcus.h.mendenhall@vanderbilt.edu + +import xdrlib as xdr +import socket +import os +from exceptions import * + +RPCVERSION = 2 + +CALL = 0 +REPLY = 1 + +AUTH_NULL = 0 +AUTH_UNIX = 1 +AUTH_SHORT = 2 +AUTH_DES = 3 + +MSG_ACCEPTED = 0 +MSG_DENIED = 1 + +SUCCESS = 0 # RPC executed successfully +PROG_UNAVAIL = 1 # remote hasn't exported program +PROG_MISMATCH = 2 # remote can't support version # +PROC_UNAVAIL = 3 # program can't support procedure +GARBAGE_ARGS = 4 # procedure can't decode params + +RPC_MISMATCH = 0 # RPC version number != 2 +AUTH_ERROR = 1 # remote can't authenticate caller + +AUTH_BADCRED = 1 # bad credentials (seal broken) +AUTH_REJECTEDCRED = 2 # client must begin new session +AUTH_BADVERF = 3 # bad verifier (seal broken) +AUTH_REJECTEDVERF = 4 # verifier expired or replayed +AUTH_TOOWEAK = 5 # rejected for security reasons + + +class Packer(xdr.Packer): + + def pack_auth(self, auth): + flavor, stuff = auth + self.pack_enum(flavor) + self.pack_opaque(stuff) + + def pack_auth_unix(self, stamp, machinename, uid, gid, gids): + self.pack_uint(stamp) + self.pack_string(machinename) + self.pack_uint(uid) + self.pack_uint(gid) + self.pack_uint(len(gids)) + for i in gids: + self.pack_uint(i) + + def pack_callheader(self, xid, prog, vers, proc, cred, verf): + self.pack_uint(xid) + self.pack_enum(CALL) + self.pack_uint(RPCVERSION) + self.pack_uint(prog) + self.pack_uint(vers) + self.pack_uint(proc) + self.pack_auth(cred) + self.pack_auth(verf) + # Caller must add procedure-specific part of call + + def pack_replyheader(self, xid, verf): + self.pack_uint(xid) + self.pack_enum(REPLY) + self.pack_uint(MSG_ACCEPTED) + self.pack_auth(verf) + self.pack_enum(SUCCESS) + # Caller must add procedure-specific part of reply + + +# Exceptions +BadRPCFormat = 'rpc.BadRPCFormat' +BadRPCVersion = 'rpc.BadRPCVersion' +GarbageArgs = 'rpc.GarbageArgs' + +class Unpacker(xdr.Unpacker): + + def unpack_auth(self): + flavor = self.unpack_enum() + stuff = self.unpack_opaque() + return (flavor, stuff) + + def unpack_callheader(self): + xid = self.unpack_uint(xid) + temp = self.unpack_enum() + if temp <> CALL: + raise BadRPCFormat, 'no CALL but ' + `temp` + temp = self.unpack_uint() + if temp <> RPCVERSION: + raise BadRPCVerspion, 'bad RPC version ' + `temp` + prog = self.unpack_uint() + vers = self.unpack_uint() + proc = self.unpack_uint() + cred = self.unpack_auth() + verf = self.unpack_auth() + return xid, prog, vers, proc, cred, verf + # Caller must add procedure-specific part of call + + def unpack_replyheader(self): + xid = self.unpack_uint() + mtype = self.unpack_enum() + if mtype <> REPLY: + raise RuntimeError, 'no REPLY but ' + `mtype` + stat = self.unpack_enum() + if stat == MSG_DENIED: + stat = self.unpack_enum() + if stat == RPC_MISMATCH: + low = self.unpack_uint() + high = self.unpack_uint() + raise RuntimeError, \ + 'MSG_DENIED: RPC_MISMATCH: ' + `low, high` + if stat == AUTH_ERROR: + stat = self.unpack_uint() + raise RuntimeError, \ + 'MSG_DENIED: AUTH_ERROR: ' + `stat` + raise RuntimeError, 'MSG_DENIED: ' + `stat` + if stat <> MSG_ACCEPTED: + raise RuntimeError, \ + 'Neither MSG_DENIED nor MSG_ACCEPTED: ' + `stat` + verf = self.unpack_auth() + stat = self.unpack_enum() + if stat == PROG_UNAVAIL: + raise RuntimeError, 'call failed: PROG_UNAVAIL' + if stat == PROG_MISMATCH: + low = self.unpack_uint() + high = self.unpack_uint() + raise RuntimeError, \ + 'call failed: PROG_MISMATCH: ' + `low, high` + if stat == PROC_UNAVAIL: + raise RuntimeError, 'call failed: PROC_UNAVAIL' + if stat == GARBAGE_ARGS: + raise RuntimeError, 'call failed: GARBAGE_ARGS' + if stat <> SUCCESS: + raise RuntimeError, 'call failed: ' + `stat` + return xid, verf + # Caller must get procedure-specific part of reply + + +# Subroutines to create opaque authentication objects + +def make_auth_null(): + return '' + +def make_auth_unix(seed, host, uid, gid, groups): + p = Packer() + p.pack_auth_unix(seed, host, uid, gid, groups) + return p.get_buf() + +def make_auth_unix_default(): + try: + from os import getuid, getgid + uid = getuid() + gid = getgid() + except ImportError: + uid = gid = 0 + import time + return make_auth_unix(int(time.time()-unix_epoch()), \ + socket.gethostname(), uid, gid, []) + +_unix_epoch = -1 +def unix_epoch(): + """Very painful calculation of when the Unix Epoch is. + + This is defined as the return value of time.time() on Jan 1st, + 1970, 00:00:00 GMT. + + On a Unix system, this should always return 0.0. On a Mac, the + calculations are needed -- and hard because of integer overflow + and other limitations. + + """ + global _unix_epoch + if _unix_epoch >= 0: return _unix_epoch + import time + now = time.time() + localt = time.localtime(now) # (y, m, d, hh, mm, ss, ..., ..., ...) + gmt = time.gmtime(now) + offset = time.mktime(localt) - time.mktime(gmt) + y, m, d, hh, mm, ss = 1970, 1, 1, 0, 0, 0 + offset, ss = divmod(ss + offset, 60) + offset, mm = divmod(mm + offset, 60) + offset, hh = divmod(hh + offset, 24) + d = d + offset + _unix_epoch = time.mktime((y, m, d, hh, mm, ss, 0, 0, 0)) + print "Unix epoch:", time.ctime(_unix_epoch) + return _unix_epoch + + +# Common base class for clients + +class Client: + + def __init__(self, host, prog, vers, port): + self.host = host + self.prog = prog + self.vers = vers + self.port = port + self.makesocket() # Assigns to self.sock + self.bindsocket() + self.connsocket() + self.lastxid = 0 # XXX should be more random? + self.addpackers() + self.cred = None + self.verf = None + + def close(self): + self.sock.close() + + def makesocket(self): + # This MUST be overridden + raise RuntimeError, 'makesocket not defined' + + def connsocket(self): + # Override this if you don't want/need a connection + self.sock.connect((self.host, self.port)) + + def bindsocket(self): + # Override this to bind to a different port (e.g. reserved) + self.sock.bind(('', 0)) + + def addpackers(self): + # Override this to use derived classes from Packer/Unpacker + self.packer = Packer() + self.unpacker = Unpacker('') + + def make_call(self, proc, args, pack_func, unpack_func): + #print "make_call() args = " + str(args) + # Don't normally override this (but see Broadcast) + if pack_func is None and args is not None: + raise TypeError, 'non-null args with null pack_func' + #print "packed args" + self.start_call(proc) + if pack_func: + pack_func(args) + self.do_call() + if unpack_func: + result = unpack_func() + else: + result = None + #print "result = " + str(result) + self.unpacker.done() + return result + + def start_call(self, proc): + # Don't override this + self.lastxid = xid = self.lastxid + 1 + cred = self.mkcred() + verf = self.mkverf() + p = self.packer + p.reset() + p.pack_callheader(xid, self.prog, self.vers, proc, cred, verf) + + def do_call(self): + # This MUST be overridden + raise RuntimeError, 'do_call not defined' + + def mkcred(self): + # Override this to use more powerful credentials + if self.cred == None: + self.cred = (AUTH_NULL, make_auth_null()) + return self.cred + + def mkverf(self): + # Override this to use a more powerful verifier + if self.verf == None: + self.verf = (AUTH_NULL, make_auth_null()) + return self.verf + + def call_0(self): # Procedure 0 is always like this + return self.make_call(0, None, None, None) + + +# Record-Marking standard support + +try: + from select import select as _select +except: + _select=None + +def sendfrag_with_timeout(sock, last, frag, timeout_seconds=None): + x = len(frag) + if last: x = x | 0x80000000L + header = (chr(int(x>>24 & 0xff)) + chr(int(x>>16 & 0xff)) + \ + chr(int(x>>8 & 0xff)) + chr(int(x & 0xff))) + block=header+frag + n=len(block) + nsent=0 + while(nsent 114: + raise socket.error, (errno, msg) + raise RuntimeError, 'can\'t assign reserved port' + + +# Client using TCP to a specific port + +class RawTCPClient(Client): + + select_timeout_seconds=None + + def makesocket(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def do_call(self): + call = self.packer.get_buf() + sendrecord(self.sock, call, self.select_timeout_seconds) + reply = recvrecord(self.sock, self.select_timeout_seconds) + u = self.unpacker + u.reset(reply) + xid, verf = u.unpack_replyheader() + if xid <> self.lastxid: + # Can't really happen since this is TCP... + raise RuntimeError, 'wrong xid in reply ' + `xid` + \ + ' instead of ' + `self.lastxid` + + +# Client using UDP to a specific port + +class RawUDPClient(Client): + + def makesocket(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + def do_call(self): + call = self.packer.get_buf() + self.sock.send(call) + try: + from select import select + except ImportError: + print 'WARNING: select not found, RPC may hang' + select = None + BUFSIZE = 8192 # Max UDP buffer size + timeout = 1 + count = 5 + while 1: + r, w, x = [self.sock], [], [] + if select: + r, w, x = select(r, w, x, timeout) + if self.sock not in r: + count = count - 1 + if count < 0: raise RuntimeError, 'timeout' + if timeout < 25: timeout = timeout *2 +## print 'RESEND', timeout, count + self.sock.send(call) + continue + reply = self.sock.recv(BUFSIZE) + u = self.unpacker + u.reset(reply) + xid, verf = u.unpack_replyheader() + if xid <> self.lastxid: +## print 'BAD xid' + continue + break + + +# Client using UDP broadcast to a specific port + +class RawBroadcastUDPClient(RawUDPClient): + + def __init__(self, bcastaddr, prog, vers, port): + RawUDPClient.__init__(self, bcastaddr, prog, vers, port) + self.reply_handler = None + self.timeout = 30 + + def connsocket(self): + # Don't connect -- use sendto + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + def set_reply_handler(self, reply_handler): + self.reply_handler = reply_handler + + def set_timeout(self, timeout): + self.timeout = timeout # Use None for infinite timeout + + def make_call(self, proc, args, pack_func, unpack_func): + if pack_func is None and args is not None: + raise TypeError, 'non-null args with null pack_func' + self.start_call(proc) + if pack_func: + pack_func(args) + call = self.packer.get_buf() + self.sock.sendto(call, (self.host, self.port)) + try: + from select import select + except ImportError: + print 'WARNING: select not found, broadcast will hang' + select = None + BUFSIZE = 8192 # Max UDP buffer size (for reply) + replies = [] + if unpack_func is None: + def dummy(): pass + unpack_func = dummy + while 1: + r, w, x = [self.sock], [], [] + if select: + if self.timeout is None: + r, w, x = select(r, w, x) + else: + r, w, x = select(r, w, x, self.timeout) + if self.sock not in r: + break + reply, fromaddr = self.sock.recvfrom(BUFSIZE) + u = self.unpacker + u.reset(reply) + xid, verf = u.unpack_replyheader() + if xid <> self.lastxid: +## print 'BAD xid' + continue + reply = unpack_func() + self.unpacker.done() + replies.append((reply, fromaddr)) + if self.reply_handler: + self.reply_handler(reply, fromaddr) + return replies + + +# Port mapper interface + +# Program number, version and (fixed!) port number +PMAP_PROG = 100000 +PMAP_VERS = 2 +PMAP_PORT = 111 + +# Procedure numbers +PMAPPROC_NULL = 0 # (void) -> void +PMAPPROC_SET = 1 # (mapping) -> bool +PMAPPROC_UNSET = 2 # (mapping) -> bool +PMAPPROC_GETPORT = 3 # (mapping) -> unsigned int +PMAPPROC_DUMP = 4 # (void) -> pmaplist +PMAPPROC_CALLIT = 5 # (call_args) -> call_result + +# A mapping is (prog, vers, prot, port) and prot is one of: + +IPPROTO_TCP = 6 +IPPROTO_UDP = 17 + +# A pmaplist is a variable-length list of mappings, as follows: +# either (1, mapping, pmaplist) or (0). + +# A call_args is (prog, vers, proc, args) where args is opaque; +# a call_result is (port, res) where res is opaque. + + +class PortMapperPacker(Packer): + + def pack_mapping(self, mapping): + prog, vers, prot, port = mapping + self.pack_uint(prog) + self.pack_uint(vers) + self.pack_uint(prot) + self.pack_uint(port) + + def pack_pmaplist(self, list): + self.pack_list(list, self.pack_mapping) + + def pack_call_args(self, ca): + prog, vers, proc, args = ca + self.pack_uint(prog) + self.pack_uint(vers) + self.pack_uint(proc) + self.pack_opaque(args) + + +class PortMapperUnpacker(Unpacker): + + def unpack_mapping(self): + prog = self.unpack_uint() + vers = self.unpack_uint() + prot = self.unpack_uint() + port = self.unpack_uint() + return prog, vers, prot, port + + def unpack_pmaplist(self): + return self.unpack_list(self.unpack_mapping) + + def unpack_call_result(self): + port = self.unpack_uint() + res = self.unpack_opaque() + return port, res + + +class PartialPortMapperClient: + + def addpackers(self): + self.packer = PortMapperPacker() + self.unpacker = PortMapperUnpacker('') + + def Set(self, mapping): + return self.make_call(PMAPPROC_SET, mapping, \ + self.packer.pack_mapping, \ + self.unpacker.unpack_uint) + + def Unset(self, mapping): + return self.make_call(PMAPPROC_UNSET, mapping, \ + self.packer.pack_mapping, \ + self.unpacker.unpack_uint) + + def Getport(self, mapping): + return self.make_call(PMAPPROC_GETPORT, mapping, \ + self.packer.pack_mapping, \ + self.unpacker.unpack_uint) + + def Dump(self): + return self.make_call(PMAPPROC_DUMP, None, \ + None, \ + self.unpacker.unpack_pmaplist) + + def Callit(self, ca): + return self.make_call(PMAPPROC_CALLIT, ca, \ + self.packer.pack_call_args, \ + self.unpacker.unpack_call_result) + + +class TCPPortMapperClient(PartialPortMapperClient, RawTCPClient): + + def __init__(self, host, port=PMAP_PORT, timeout_seconds=None): + RawTCPClient.__init__(self, \ + host, PMAP_PROG, PMAP_VERS, port) + self.select_timeout_seconds=timeout_seconds + + +class UDPPortMapperClient(PartialPortMapperClient, RawUDPClient): + + def __init__(self, host, port=PMAP_PORT): + RawUDPClient.__init__(self, \ + host, PMAP_PROG, PMAP_VERS, port) + +class BroadcastUDPPortMapperClient(PartialPortMapperClient, \ + RawBroadcastUDPClient): + + def __init__(self, bcastaddr): + RawBroadcastUDPClient.__init__(self, \ + bcastaddr, PMAP_PROG, PMAP_VERS, PMAP_PORT) + + +# Generic clients that find their server through the Port mapper + +class TCPClient(RawTCPClient): + + def __init__(self, host, prog, vers, portmap_proxy_host=None, portmap_proxy_port=PMAP_PORT, timeout_seconds=None): + + self.select_timeout_seconds=timeout_seconds + if portmap_proxy_host is None: + portmap_proxy_host=host #use a proxy to get around firewalled portmappers + pmap = TCPPortMapperClient(portmap_proxy_host, portmap_proxy_port,timeout_seconds) + port = pmap.Getport((prog, vers, IPPROTO_TCP, 0)) + pmap.close() + if port == 0: + raise RuntimeError, 'program not registered' + RawTCPClient.__init__(self, host, prog, vers, port) + + +class UDPClient(RawUDPClient): + + def __init__(self, host, prog, vers, portmap_proxy_host=None, portmap_proxy_port=PMAP_PORT): + if portmap_proxy_host is None: + portmap_proxy_host=host #use a proxy to get around firewalled portmappers + pmap = UDPPortMapperClient(portmap_proxy_host, portmap_proxy_port) + port = pmap.Getport((prog, vers, IPPROTO_UDP, 0)) + pmap.close() + if port == 0: + raise RuntimeError, 'program not registered' + RawUDPClient.__init__(self, host, prog, vers, port) + + +class BroadcastUDPClient(Client): + + def __init__(self, bcastaddr, prog, vers): + self.pmap = BroadcastUDPPortMapperClient(bcastaddr) + self.pmap.set_reply_handler(self.my_reply_handler) + self.prog = prog + self.vers = vers + self.user_reply_handler = None + self.addpackers() + + def close(self): + self.pmap.close() + + def set_reply_handler(self, reply_handler): + self.user_reply_handler = reply_handler + + def set_timeout(self, timeout): + self.pmap.set_timeout(timeout) + + def my_reply_handler(self, reply, fromaddr): + port, res = reply + self.unpacker.reset(res) + result = self.unpack_func() + self.unpacker.done() + self.replies.append((result, fromaddr)) + if self.user_reply_handler is not None: + self.user_reply_handler(result, fromaddr) + + def make_call(self, proc, args, pack_func, unpack_func): + self.packer.reset() + if pack_func: + pack_func(args) + if unpack_func is None: + def dummy(): pass + self.unpack_func = dummy + else: + self.unpack_func = unpack_func + self.replies = [] + packed_args = self.packer.get_buf() + dummy_replies = self.pmap.Callit( \ + (self.prog, self.vers, proc, packed_args)) + return self.replies + + +# Server classes + +# These are not symmetric to the Client classes +# XXX No attempt is made to provide authorization hooks yet + +class Server: + + def __init__(self, host, prog, vers, port): + self.host = host # Should normally be '' for default interface + self.prog = prog + self.vers = vers + self.port = port # Should normally be 0 for random port + self.makesocket() # Assigns to self.sock and self.prot + self.bindsocket() + self.host, self.port = self.sock.getsockname() + self.addpackers() + + def handle(self, call): + # Don't use unpack_header but parse the header piecewise + # XXX I have no idea if I am using the right error responses! + self.unpacker.reset(call) + self.packer.reset() + xid = self.unpacker.unpack_uint() + self.packer.pack_uint(xid) + temp = self.unpacker.unpack_enum() + if temp <> CALL: + return None # Not worthy of a reply + self.packer.pack_uint(REPLY) + temp = self.unpacker.unpack_uint() + if temp <> RPCVERSION: + self.packer.pack_uint(MSG_DENIED) + self.packer.pack_uint(RPC_MISMATCH) + self.packer.pack_uint(RPCVERSION) + self.packer.pack_uint(RPCVERSION) + return self.packer.get_buf() + self.packer.pack_uint(MSG_ACCEPTED) + self.packer.pack_auth((AUTH_NULL, make_auth_null())) + prog = self.unpacker.unpack_uint() + if prog <> self.prog: + self.packer.pack_uint(PROG_UNAVAIL) + return self.packer.get_buf() + vers = self.unpacker.unpack_uint() + if vers <> self.vers: + self.packer.pack_uint(PROG_MISMATCH) + self.packer.pack_uint(self.vers) + self.packer.pack_uint(self.vers) + return self.packer.get_buf() + proc = self.unpacker.unpack_uint() + methname = 'handle_' + `proc` + try: + meth = getattr(self, methname) + except AttributeError: + self.packer.pack_uint(PROC_UNAVAIL) + return self.packer.get_buf() + cred = self.unpacker.unpack_auth() + verf = self.unpacker.unpack_auth() + try: + meth() # Unpack args, call turn_around(), pack reply + except (EOFError, GarbageArgs): + # Too few or too many arguments + self.packer.reset() + self.packer.pack_uint(xid) + self.packer.pack_uint(REPLY) + self.packer.pack_uint(MSG_ACCEPTED) + self.packer.pack_auth((AUTH_NULL, make_auth_null())) + self.packer.pack_uint(GARBAGE_ARGS) + return self.packer.get_buf() + + def turn_around(self): + try: + self.unpacker.done() + except RuntimeError: + raise GarbageArgs + self.packer.pack_uint(SUCCESS) + + def handle_0(self): # Handle NULL message + self.turn_around() + + def makesocket(self): + # This MUST be overridden + raise RuntimeError, 'makesocket not defined' + + def bindsocket(self): + # Override this to bind to a different port (e.g. reserved) + self.sock.bind((self.host, self.port)) + + def addpackers(self): + # Override this to use derived classes from Packer/Unpacker + self.packer = Packer() + self.unpacker = Unpacker('') + + +class TCPServer(Server): + + def register(self): + mapping = self.prog, self.vers, self.prot, self.port + p = TCPPortMapperClient(self.host) + if not p.Set(mapping): + raise RuntimeError, 'register failed' + + def unregister(self): + mapping = self.prog, self.vers, self.prot, self.port + p = TCPPortMapperClient(self.host) + if not p.Unset(mapping): + raise RuntimeError, 'unregister failed' + + def makesocket(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.prot = IPPROTO_TCP + + def loop(self): + self.sock.listen(0) + while 1: + self.session(self.sock.accept()) + + def session(self, connection): + sock, (host, port) = connection + while 1: + try: + call = recvrecord(sock) + except EOFError: + break + except socket.error, msg: + print 'socket error:', msg + break + reply = self.handle(call) + if reply is not None: + sendrecord(sock, reply) + + def forkingloop(self): + # Like loop but uses forksession() + self.sock.listen(0) + while 1: + self.forksession(self.sock.accept()) + + def forksession(self, connection): + # Like session but forks off a subprocess + import os + # Wait for deceased children + try: + while 1: + pid, sts = os.waitpid(0, 1) + except os.error: + pass + pid = None + try: + pid = os.fork() + if pid: # Parent + connection[0].close() + return + # Child + self.session(connection) + finally: + # Make sure we don't fall through in the parent + if pid == 0: + os._exit(0) + + +class UDPServer(Server): + + def register(self): + mapping = self.prog, self.vers, self.prot, self.port + p = UDPPortMapperClient(self.host) + if not p.Set(mapping): + raise RuntimeError, 'register failed' + + def unregister(self): + mapping = self.prog, self.vers, self.prot, self.port + p = UDPPortMapperClient(self.host) + if not p.Unset(mapping): + raise RuntimeError, 'unregister failed' + + def makesocket(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.prot = IPPROTO_UDP + + def loop(self): + while 1: + self.session() + + def session(self): + call, host_port = self.sock.recvfrom(8192) + reply = self.handle(call) + if reply <> None: + self.sock.sendto(reply, host_port) + + +# Simple test program -- dump local portmapper status + +def test(): + pmap = UDPPortMapperClient('') + list = pmap.Dump() + list.sort() + for prog, vers, prot, port in list: + print prog, vers, + if prot == IPPROTO_TCP: print 'tcp', + elif prot == IPPROTO_UDP: print 'udp', + else: print prot, + print port + + +# Test program for broadcast operation -- dump everybody's portmapper status + +def testbcast(): + import sys + if sys.argv[1:]: + bcastaddr = sys.argv[1] + else: + bcastaddr = '' + def rh(reply, fromaddr): + host, port = fromaddr + print host + '\t' + `reply` + pmap = BroadcastUDPPortMapperClient(bcastaddr) + pmap.set_reply_handler(rh) + pmap.set_timeout(5) + replies = pmap.Getport((100002, 1, IPPROTO_UDP, 0)) + + +# Test program for server, with corresponding client +# On machine A: python -c 'import rpc; rpc.testsvr()' +# On machine B: python -c 'import rpc; rpc.testclt()' A +# (A may be == B) + +def testsvr(): + # Simple test class -- proc 1 doubles its string argument as reply + class S(UDPServer): + def handle_1(self): + arg = self.unpacker.unpack_string() + self.turn_around() + print 'RPC function 1 called, arg', `arg` + self.packer.pack_string(arg + arg) + # + s = S('', 0x20000000, 1, 0) + try: + s.unregister() + except RuntimeError, msg: + print 'RuntimeError:', msg, '(ignored)' + s.register() + print 'Service started...' + try: + s.loop() + finally: + s.unregister() + print 'Service interrupted.' + + +def testclt(): + import sys + if sys.argv[1:]: host = sys.argv[1] + else: host = '' + # Client for above server + class C(UDPClient): + def call_1(self, arg): + return self.make_call(1, arg, \ + self.packer.pack_string, \ + self.unpacker.unpack_string) + c = C(host, 0x20000000, 1) + print 'making call...' + reply = c.call_1('hello, world, ') + print 'call returned', `reply` + +def testclt2(): + import sys + host = '127.0.0.1' + # Client for above server + class C(UDPClient): + def call_1(self, arg): + return self.make_call(1, arg, \ + self.packer.pack_string, \ + self.unpacker.unpack_string) + c = C(host, 0x20000000, 1) + print 'making call...' + reply = c.call_1('hello, world, ') + print 'call returned', `reply` + diff -r 9bb8a9f3df6b -r cba1c44060f5 vxi_11.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/vxi_11.py Thu Aug 11 17:11:58 2011 +0930 @@ -0,0 +1,672 @@ +"The basic infrastructure for maintaining a vxi-11 protocol connection to a remote device" +_rcsid="$Id: vxi_11.py 323 2011-04-06 19:10:03Z marcus $" + +import rpc +from rpc import TCPClient, RawTCPClient +import exceptions +import struct +import traceback +import time +import weakref +import sys +import select + +#try: +# import threading +# threads=1 +#except: +# threads=0 +threads=False + +connection_dict={} + +def close_all_connections(): + "disconnect and close out all vxi_11 connections created here, even if their object references have been lost" + for wobj in connection_dict.keys(): + name, wconn=connection_dict[wobj] + conn=wconn() #dereference weak ref + if conn is not None: + try: + conn.disconnect() + except: + conn.log_exception("***vxi_11.close_all_connections exception: ") + + else: + del connection_dict[wobj] #how did this happen? + +class Junk_OneWayAbortClient(RawTCPClient): + """OneWayAbortClient allows one to handle the strange, one-way abort rpc from an Agilent E5810. + Really, it doesn't even do a one-way transmission... it loses aborts, so this is history """ + + def do_call(self): + call = self.packer.get_buf() + rpc.sendrecord(self.sock, call) + self.unpacker.reset('\0\0\0\0') #put a valid return value into the unpacker + +class VXI_11_Error(IOError): + vxi_11_errors={ + 0:"No error", 1:"Syntax error", 3:"Device not accessible", + 4:"Invalid link identifier", 5:"Parameter error", 6:"Channel not established", + 8:"Operation not supported", 9:"Out of resources", 11:"Device locked by another link", + 12:"No lock held by this link", 15:"IO Timeout", 17:"IO Error", 21:"Invalid Address", + 23:"Abort", 29:"Channel already established" , + "eof": "Cut off packet received in rpc.recvfrag()", + "sync":"stream sync lost", + "notconnected": "Device not connected"} + + def identify_vxi_11_error(self, error): + if self.vxi_11_errors.has_key(error): + return `error`+": "+self.vxi_11_errors[error] + else: + return `error`+": Unknown error code" + + + def __init__(self, code, **other_info): + IOError.__init__(self, self.identify_vxi_11_error(code)) + self.code=code + self.other_info=other_info + + def __repr__(self): + if self.other_info: + return str(self)+": "+str(self.other_info) + else: + return str(self) + +class VXI_11_Device_Not_Connected(VXI_11_Error): + def __init__(self): + VXI_11_Error.__init__(self,'notconnected') + +class VXI_11_Device_Not_Locked(VXI_11_Error): + pass + +class VXI_11_Transient_Error(VXI_11_Error): #exceptions having to do with multiple use which might get better + pass + +class VXI_11_Timeout(VXI_11_Transient_Error): + pass + +class VXI_11_Locked_Elsewhere(VXI_11_Transient_Error): + pass + +class VXI_11_Stream_Sync_Lost(VXI_11_Transient_Error): + def __init__(self, code, bytes): + VXI_11_Transient_Error.__init__(self, code) + self.other_info="bytes vacuumed = %d" % bytes + + +class VXI_11_RPC_EOF(VXI_11_Transient_Error): + pass + +_VXI_11_enumerated_exceptions={ #common, correctable exceptions + 15:VXI_11_Timeout, + 11:VXI_11_Locked_Elsewhere, + 12:VXI_11_Device_Not_Locked +} + +class vxi_11_connection: + """vxi_11_connection implements handling of devices compliant with vxi11.1-vxi11.3 protocols, with which + the user should have some familiarity""" + + debug_info=0 + debug_error=1 + debug_warning=2 + debug_all=3 + + debug_level=debug_info + + OneWayAbort=0 #by default, this class uses two-way aborts, per official vxi-11 standard + + def _list_packer(self, args): + l=map(None, self.pack_type_list, args) # combine lists + for packer, data in l: + #print "packing " + str(data) + " with " + str(packer) + packer(data) + + def _list_unpacker(self): + res = [] + for f in self.unpack_type_list: + a = f() + #print "Unpacked " + str(a) + " with " + str(f) + res.append(a) + return res + #return [func() for func in self.unpack_type_list] + + def _link_xdr_defs(self, channel): + "self.link_xdr_defs() creates dictionaries of functions for packing and unpacking the various data types" + p=channel.packer + u=channel.unpacker + + xdr_packer_defs={ + "write": (p.pack_int, p.pack_int, p.pack_int, p.pack_int, p.pack_opaque), + "read": (p.pack_int, p.pack_int, p.pack_int, p.pack_int, p.pack_int, p.pack_int), + "create_link": (p.pack_int, p.pack_bool, p.pack_uint, p.pack_string), + "generic": (p.pack_int, p.pack_int, p.pack_int, p.pack_int), + "lock": (p.pack_int, p.pack_int, p.pack_int), + "id": (p.pack_int,) + } + + xdr_unpacker_defs={ + "write": (u.unpack_int, u.unpack_int), + "read": (u.unpack_int, u.unpack_int, u.unpack_opaque), + # *something* is transforming pack_int to pack_uint which (obviously) doesn't like getting negative numbers + # Since LID is basically opaque unpack it as a uint to paper over the vileness + "create_link": (u.unpack_uint, u.unpack_int, u.unpack_uint, u.unpack_uint), + "read_stb":(u.unpack_int, u.unpack_int), + "error": (u.unpack_int,) + } + + return xdr_packer_defs, xdr_unpacker_defs + + def _setup_core_packing(self, pack, unpack): + #print "setting up packing with " + pack + self.pack_type_list, self.unpack_type_list=self._core_packers[pack],self._core_unpackers[unpack] + #print "pack_type_list now " + str(self.pack_type_list) + #print "unpack_type_list now " + str(self.unpack_type_list) + + def post_init(self): + pass + + def simple_log_error(self, message, level=debug_error, file=None): + if level <= self.debug_level: + if file is None: + file=sys.stderr + print >> file, self.device_name, message + + def fancy_log_error(self, message, level=debug_error, file=None): + if level <= self.debug_level: + message=str(message).strip() + level_str=("**INFO*****", "**ERROR****", "**WARNING**", "**DEBUG****")[level] + if file is None: + file=sys.stderr + print >> file, time.asctime().strip(), '\t', level_str, '\t', self.shortname, '\t', \ + message.replace('\n','\n\t** ').replace('\r','\n\t** ') + + def log_error(self, message, level=debug_error, file=None): + "override log_error() for sending messages to special places or formatting differently" + self.fancy_log_error(message, level, file) + + def log_traceback(self, main_message='', file=None): + exlist=traceback.format_exception(*sys.exc_info()) + s=main_message+'\n' + for i in exlist: + s=s+i + + self.log_error(s, self.debug_error, file) + + def log_info(self, message, file=None): + self.log_error(message, self.debug_info, file) + + def log_warning(self, message, file=None): + self.log_error(message, self.debug_warning, file) + + def log_debug(self, message, file=None): + self.log_error(message, self.debug_all, file) + + def log_exception(self, main_message='', file=None): + self.log_error(main_message+traceback.format_exception_only(*(sys.exc_info()[:2]))[0], self.debug_error, file) + + def __init__(self, host='127.0.0.1', device="inst0", timeout=1000, raise_on_err=None, device_name="Network Device", shortname=None, + portmap_proxy_host=None, portmap_proxy_port=rpc.PMAP_PORT, use_vxi_locking=True): + + self.raise_on_err=raise_on_err + self.lid=None + self.timeout=timeout + self.device_name=device_name + self.device_sicl_name=device + self.host=host + self.portmap_proxy_host=portmap_proxy_host + self.portmap_proxy_port=portmap_proxy_port + self.core=None + self.abortChannel=None + self.mux=None #default is no multiplexer active + self.use_vxi_locking=use_vxi_locking + + if shortname is None: + self.shortname=device_name.strip().replace(' ','').replace('\t','') + else: + self.shortname=shortname.strip().replace(' ','').replace('\t','') + + if threads: + self.threadlock=threading.RLock() + + try: + self.reconnect() + + except VXI_11_Transient_Error: + self.log_exception("Initial connect failed... retry later") + + def setup_mux(self, mux=None, global_name=None): + self.mux=mux + self.global_mux_name=global_name + + def command(self, id, pack, unpack, arglist, ignore_connect=0): + #print "command() id = " + str(id) + ", pack = " + pack + if not (ignore_connect or self.connected): + raise VXI_11_Device_Not_Connected + + #command has been made atomic, so that things like get_status_byte can be done + #in a multi-threaded environment without needed a full vxi-11 lock to make it safe + if threads: + self.threadlock.acquire() #make this atomic + + self._setup_core_packing(pack, unpack) + + try: + try: + result= self.core.make_call(id, arglist, self._list_packer, self._list_unpacker) + except (RuntimeError, EOFError): + #RuntimeError is thrown by recvfrag if the xid is off... it means we lost data in the pipe + #EOFError is thrown if the packet isn't full length, as usually happens when ther is garbage in the pipe read as a length + #so vacuum out the socket, and raise a transient error + rlist=1 + ntotal=0 + while(rlist): + rlist, wlist, xlist=select.select([self.core.sock],[],[], 1.0) + if rlist: + ntotal+=len(self.core.sock.recv(10000) )#get some data from it + raise VXI_11_Stream_Sync_Lost("sync", ntotal) + finally: + if threads: + self.threadlock.release() #let go + + err=result[0] + + if err and self.raise_on_err: + e=_VXI_11_enumerated_exceptions #common, correctable exceptions + if e.has_key(err): + raise e[err](err) #raise these exceptions explicitly + else: + raise VXI_11_Error(err) #raise generic VXI_11 exception + + return result + + def do_timeouts(self, timeout, lock_timeout, channel=None): + + if channel is None: + channel=self.core + + flags=0 + if timeout is None: + timeout=self.timeout + + if not lock_timeout and hasattr(self,"default_lock_timeout"): + lock_timeout=self.default_lock_timeout + + if lock_timeout: + flags |= 1 # append waitlock bit + + if channel: + channel.select_timeout_seconds=0.5+1.5*max(timeout, lock_timeout)/1000.0 #convert ms to sec, and be generous on hard timeout + + return flags, timeout, lock_timeout + + def reconnect(self): #recreate a broken connection + """reconnect() creates or recreates our main connection. Useful in __init__ and in complete communications breakdowns. + If it throws a VXI_11_Transient_Error, the connection exists, but the check_idn() handshake or post_init() failed.""" + + self.connected=0 + + if self.core: + self.core.close() #if this is a reconnect, break old connection the hard way + if self.abortChannel: + self.abortChannel.close() + + self.core=rpc.TCPClient(self.host, 395183, 1, + portmap_proxy_host=self.portmap_proxy_host, + portmap_proxy_port=self.portmap_proxy_port) + + self._core_packers, self._core_unpackers=self._link_xdr_defs(self.core) #construct xdr data type definitions for the core + #print "_core_packers now " + str(self._core_packers) + + err, self.lid, self.abortPort, self.maxRecvSize=self.command( + 10, "create_link","create_link", (0, 0, self.timeout, self.device_sicl_name), ignore_connect=1) #execute create_link + if err: #at this stage, we always raise exceptions since there isn't any way to bail out or retry reasonably + raise VXI_11_Error(err) + + self.maxRecvSize=min(self.maxRecvSize, 1048576) #never transfer more than 1MB at a shot + + if self.OneWayAbort: + #self.abort_channel=OneWayAbortClient(self.host, 395184, 1, self.abortPort) + self.abort_channel=rpc.RawUDPClient(self.host, 395184, 1, self.abortPort) + else: + self.abort_channel=RawTCPClient(self.host, 395184, 1, self.abortPort) + + connection_dict[self.lid]=(self.device_name, weakref.ref(self)) + + self.locklevel=0 + + self.connected=1 + + self.check_idn() + self.post_init() + + + def abort(self): + + self.abort_channel.select_timeout_seconds=self.timeout/1000.0 #convert to seconds + try: + err=self.abort_channel.make_call(1, self.lid, self.abort_channel.packer.pack_int, self.abort_channel.unpacker.unpack_int) #abort + except EOFError: + raise VXI_11_RPC_EOF("eof") + + if err and self.raise_on_err: + raise VXI_11_Error( err) + return err + + def disconnect(self): + if self.connected: + try: + err, =self.command(23, "id", "error", (self.lid,)) #execute destroy_link + except: + self.log_traceback() #if we can't close nicely, we'll close anyway + + self.connected=0 + del connection_dict[self.lid] + self.lid=None + self.core.close() + self.abort_channel.close() + del self.core, self.abort_channel + self.core=None + self.abortChannel=None + + def __del__(self): + if self.lid is not None: + self.raise_on_err=0 #no exceptions here from simple errors + try: + self.abort() + except VXI_11_Error: + pass + try: + self.disconnect() + except VXI_11_Error: + pass + + + + def write(self, data, timeout=None, lock_timeout=0): + """err, bytes_sent=write(data [, timeout] [,lock_timeout]) sends data to device. See do_timeouts() for + semantics of timeout and lock_timeout""" + + flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout) + base=0 + end=len(data) + while baseself.maxRecvSize: + xfer=self.maxRecvSize + else: + xfer=n + flags |= 8 #write end on last byte + + err, count=self.command(11, "write", "write", (self.lid, timeout, lock_timeout, flags, data[base:base+xfer])) + if err: break + base+=count + return err, base + + def read(self, timeout=None, lock_timeout=0, count=None, termChar=None): + """err, reason, result=read([timeout] [,lock_timeout] [,count] [,termChar]) reads up to count bytes from the device, + ending on count, EOI or termChar (if specified). See do_timeouts() for semantics of the timeouts. \n + the returned reason is an inclusive OR of 3 bits (per the VXI-11 specs section B.6.4.device_read): + Bit 2 = END/EOI received, + bit 1 = Terminating Character received, + bit 0 = full requested byte count received. + """ + flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout) + + if termChar is not None: + flags |= 128 # append termchrset bit + act_term=termChar + else: + act_term=0 + + accumdata="" + reason=0 + err=0 + accumlen=0 + + while ( (not err) and (not (reason & 6) ) and + ( (count is None) or (accumlen < count)) ): #wait for END or CHR reason flag or count + + readcount=self.maxRecvSize + if count is not None: + readcount=min(readcount, count-accumlen) + err, reason, data = self.command(12, "read","read", (self.lid, readcount, timeout, lock_timeout, flags, act_term)) + accumdata+=data + accumlen+=len(data) + #print err, reason, len(data), len(accumdata) + + return err, reason, accumdata + + def generic(self, code, timeout, lock_timeout): + flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout) + + err, = self.command(code, "generic", "error", (self.lid, flags, timeout, lock_timeout)) + + return err + + def trigger(self, timeout=None, lock_timeout=0): + return self.generic(14, timeout, lock_timeout) + + def clear(self, timeout=None, lock_timeout=0): + return self.generic(15, timeout, lock_timeout) + + def remote(self, timeout=None, lock_timeout=0): + return self.generic(16, timeout, lock_timeout) + + def local(self, timeout=None, lock_timeout=0): + return self.generic(17, timeout, lock_timeout) + + def read_status_byte(self, timeout=None, lock_timeout=0): + flags, timeout, lock_timeout=self.do_timeouts(timeout, lock_timeout) + + err, status = self.command(13, "generic","read_stb", (self.lid, flags, timeout, lock_timeout)) + + return err, status + + def lock(self, lock_timeout=0): + """lock() acquires a lock on a device and the threadlock. If it fails it leaves the connection cleanly unlocked. + If self.use_vxi_locking is false, it acquires only a thread lock locally, and does not really lock the vxi-11 device. + This is useful if only one process is talking to a given device, and saves time.""" + err=0 + if threads: + self.threadlock.acquire() + + if self.use_vxi_locking and self.locklevel==0: + flags, timeout, lock_timeout=self.do_timeouts(0, lock_timeout) + try: + if self.mux: self.mux.lock_connection(self.global_mux_name) + try: + err, = self.command(18, "lock","error", (self.lid, flags, lock_timeout)) + except: + if self.mux: self.mux.unlock_connection(self.global_mux_name) + raise + except: + if threads: + self.threadlock.release() + raise + + if err: + if threads: + self.threadlock.release() + else: + self.locklevel+=1 + return err + + def is_locked(self): + return self.locklevel > 0 + + def unlock(self, priority=0): + """unlock(priority=0) unwinds one level of locking, and if the level is zero, really unlocks the device. + Calls to lock() and unlock() should be matched. If there is a danger that they are not, due to bad + exception handling, unlock_completely() should be used as a final cleanup for a series of operations. + Setting priority to non-zero will bias the apparent last-used time in a multiplexer (if one is used), + so setting priority to -10 will effectively mark this channel least-recently-used, while setting it to + +2 will post-date the last-used time 2 seconds, so for the next 2 seconds, the device will be hard to kick + out of the channel cache (but not impossible). + """ + + self.locklevel-=1 + assert self.locklevel>=0, "Too many unlocks on device: "+self.device_name + + err=0 + try: + if self.use_vxi_locking and self.locklevel==0: + try: + err, = self.command(19, "id", "error", (self.lid, )) + finally: + if self.mux: + self.mux.unlock_connection(self.global_mux_name, priority) #this cannot fail, no try needed (??) + elif priority and self.mux: + #even on a non-final unlock, a request for changed priority is always remembered + self.mux.adjust_priority(self.global_mux_name, priority) + finally: + if threads: + self.threadlock.release() + + return err + + def unlock_completely(self, priority=0): + "unlock_completely() forces an unwind of any locks all the way back to zero for error cleanup. Only exceptions thrown are fatal." + if threads: + self.threadlock.acquire() #make sure we have the threadlock before we try a (possibly failing) full lock + try: + self.lock() #just to be safe, we should already hold one level of lock! + except VXI_11_Locked_Elsewhere: + pass #this is often called on error cleanup when we don't already have a lock, and we don't really care if we can't get it + except VXI_11_Error: + self.log_exception("Unexpected trouble locking in unlock_completely(): ") + + if threads: + self.threadlock._RLock__count += (1-self.threadlock._RLock__count) + #unwind to single lock the fast way, and make sure this variable really existed, to shield against internal changes + self.locklevel=1 #unwind our own counter, too + try: + self.unlock(priority) + except VXI_11_Device_Not_Locked: + pass #if we couldn't lock above, we will probably get another exception here, and don't care + except VXI_11_Transient_Error: + self.log_exception("Unexpected trouble unlocking in unlock_completely(): ") + except VXI_11_Error: + self.log_exception("Unexpected trouble unlocking in unlock_completely(): ") + raise + + def transaction(self, data, count=None, lock_timeout=0): + """err, reason, result=transaction(data, [, count] [,lock_timeout]) sends data and waits for a response. + It is guaranteed to leave the lock level at its original value on exit, + unless KeyboardInterrupt breaks the normal flow. If count isn't provided, there is no limit to how much data will be accepted. + See do_timeouts() for semantics on lock_timeout.""" + + self.lock(lock_timeout) + reason=None + result=None + try: + err, write_count = self.write(data) + + if not err: + err, reason, result = self.read(count=count) + finally: + self.unlock() + + return err, reason, result + + def check_idn(self): + 'check_idn() executes "*idn?" and aborts if the result does not start with self.idn_head' + if hasattr(self,"idn"): + return #already done + if hasattr(self,"idn_head") and self.idn_head is not None: + + self.lock() + try: + self.clear() + err, reason, idn = self.transaction("*idn?") + finally: + self.unlock() + + check=idn.find(self.idn_head) + self.idn=idn.strip() #save for future reference info + if check: + self.disconnect() + assert check==0, "Wrong device type! expecting: "+self.idn_head+"... got: "+self.idn + else: + self.idn="Device *idn? not checked!" + +import copy + +class device_thread: + + if threads: + Thread=threading.Thread #by default, use canonical threads + + def __init__(self, connection, main_sleep=1.0, name="Device"): + self.running=0 + self.main_sleep=main_sleep + self.__thread=None + self.__name=copy.copy(name) #make a new copy to avoid a possible circular reference + self.__wait_event=threading.Event() + self.set_connection(connection) + + def set_connection(self, connection): + #keep only a weak reference, so the thread cannot prevent the device from being deleted + #such deletion creates an error when the thread tries to run, but that's OK + #this allows the device_thread to be used as a clean mix-in class to a vxi_11 connection + self.__weak_connection=weakref.ref(connection) + + def connection(self): + return self.__weak_connection() #dereference weak reference + + def handle_lock_error(self): + "handle_lock_error can be overridden to count failures and do something if there are too many" + self.connection().log_exception(self.name+": Error while locking device") + + def onepass(self): + connection=self.connection() + + try: + connection.lock() + except VXI_11_Transient_Error: + self.handle_lock_error() + return + + try: + self.get_data() + except: + connection.log_traceback('Uncaught exception in get_data()') + try: + connection.clear() + except: + connection.log_exception('failed to clear connection after error') + self.run=0 + + connection.unlock() + + def monitor(self): + self.connection().log_info("Monitor loop entered") + while(self.run): + try: + self.onepass() + self.__wait_event.wait(self.main_sleep) #wait until timeout or we are cancelled + except KeyboardInterrupt: + self.connection().log_error("Keyboard Interrupt... terminating") + self.run=0 + except: + self.connection().log_traceback() + self.run=0 + + self.running=0 + self.connection().unlock_completely() + + def run_thread(self): + if not self.running: #if it's already running, just keep it up. + self.run=1 + self.__thread=self.Thread(target=self.monitor, name=self.__name) + self.__wait_event.clear() #make sure we don't fall through immediately + self.__thread.start() + self.running=1 + + def get_monitor_thread(self): + return self.__thread + + def stop_thread(self): + if self.running: + self.run=0 + self.__wait_event.set() #cancel any waiting