]> jspc29.x-matter.uni-frankfurt.de Git - trbnettools.git/commitdiff
Early version of libtrbnet python wrapper. Read/Write access to registers tested...
authorTobias Weber <toweber86@gmail.com>
Fri, 9 Mar 2018 10:38:01 +0000 (11:38 +0100)
committerTobias Weber <toweber86@gmail.com>
Fri, 9 Mar 2018 10:38:01 +0000 (11:38 +0100)
libtrbnet/PyTRB.py [new file with mode: 0644]

diff --git a/libtrbnet/PyTRB.py b/libtrbnet/PyTRB.py
new file mode 100644 (file)
index 0000000..00a23c2
--- /dev/null
@@ -0,0 +1,292 @@
+# -*- coding: utf-8 -*-
+import ctypes
+import os
+
+# TODO: use warnings to indicate access to wrong register or no data
+
+# dictionary with trb error strings. Copied from trberror.h
+# TODO: better way to access this directly in libtrbnet
+trb_errno_dict = {0: 'TRB_NONE', 1: 'TRB_TX_BUSY', 2: 'TRB_FIFO_NOT_EMPTY',
+                  3: 'TRB_FIFO_TIMEOUT', 4: 'TRB_FIFO_HEADERS', 5: 'TRB_FIFO_SEQUENZ',
+                  6: 'TRB_FIFO_INVALID_MODE', 7: 'TRB_FIFO_INCOMPLETE_PACKAGE',
+                  8: 'TRB_FIFO_INVALID_HEADER', 9: 'TRB_FIFO_MISSING_TERM_HEADER',
+                  10: 'TRB_FAILED_WAIT_IS_VALID', 11: 'TRB_FAILED_WAIT_IS_NOT_VALID',
+                  12: 'TRB_USER_BUFFER_OVF', 13: 'TRB_INVALID_CHANNEL',
+                  14: 'TRB_INVALID_PKG_NUMBER', 15: 'TRB_STATUS_ERROR',
+                  16: 'TRB_INVALID_ADDRESS', 17: 'TRB_INVALID_LENGTH',
+                  18: 'TRB_ENDPOINT_NOT_REACHED', 19: 'TRB_DMA_UNAVAILABLE',
+                  20: 'TRB_DMA_TIMEOUT', 21: 'TRB_READMEM_INVALID_SIZE',
+                  22: 'TRB_HDR_DLEN', 23: 'TRB_PEXOR_OPEN', 24: 'TRB_SEMAPHORE',
+                  25: 'TRB_FIFO_SHARED_MEM', 26: 'TRB_STATUS_WARNING',
+                  27: 'TRB_RPC_ERROR', 28: 'TRB_PEXOR_DATA_ERROR',
+                  29: 'TRB_PEXOR_DEVICE_ERROR', 30: 'TRB_PEXOR_DEVICE_TRB_TIMEOUT',
+                  31: 'TRB_PEXOR_DEVICE_POLLING_TIMEOUT', 32: 'TRB_PEXOR_DEVICE_DMA_EMPTY',
+                  33: 'TRB_PEXOR_DEVICE_INVALID_DMA_SIZE', 34: 'TRB_PEXOR_DEVICE_LOST_CREDENTIAL',
+                  35: 'TRB_PEXOR_DEVICE_FIFO_TRANSFER', 36: 'TRB_TRB3_CMD_NOT_SUPPORTED',
+                  37: 'TRB_TRB3_SOCKET_ERROR', 38: 'TRB_TRB3_SOCKET_TIMEOUT',
+                  39: 'TRB_TRB3_INCOMPLETE_UDP', 40: 'TRB_TRB3_DATA_ERROR',
+                  41: 'TRB_TRB3_INVALID_UDP_HEADER'}
+
+
+class PyTRB(object):
+    '''
+    Wrapper class for trbnet access using python
+    '''
+
+    def __init__(self, trb_server, daq_server, path_to_lib='libtrbnet.so'):
+        '''
+        Default Constructor. Sets enviromental variable and initialises ports.
+        The trbnet daemon has to be running.
+
+        Keyword arguments:
+        trbserver -- string for TRB3_SERVER enviromental variable
+        daq_server -- string for DAQOPSERVER enviromental variable
+        path_to_lib -- full path to libtrbnet.so
+        '''
+        self.trblib = ctypes.cdll.LoadLibrary(path_to_lib)
+        self.declare_types()
+        os.environ['TRB3_SERVER'] = trb_server
+        os.environ['DAQOPSERVER'] = daq_server
+        status = self.trblib.init_ports()
+        if status < 0:
+            raise Exception('Error initialising ports.')
+
+    def __del__(self):
+        '''
+        Destructor.
+        '''
+        self.trblib.close_ports()
+
+    def trb_errno(self):
+        '''
+        Returns trb_errno flag value
+        '''
+        return ctypes.c_int.in_dll(self.trblib, 'trb_errno').value
+
+    def declare_types(self):
+        '''
+        Declare argument and return types of libtrbnet calls via ctypes
+        '''
+        self.trblib.trb_register_read.argtypes = [ctypes.c_uint16, ctypes.c_uint16,
+                                                  ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint]
+        self.trblib.trb_register_read.restype = ctypes.c_int
+        self.trblib.trb_register_write.argtypes = [ctypes.c_uint16, ctypes.c_uint16,
+                                                   ctypes.c_uint32]
+        self.trblib.trb_register_write.restype = ctypes.c_int
+        self.trblib.trb_register_read_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8, ctypes.c_uint16,
+                                                      ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint]
+        self.trblib.trb_register_read_mem.restype = ctypes.c_int
+        self.trblib.trb_registertime_read_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8, ctypes.c_uint16,
+                                                          ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint]
+        self.trblib.trb_registertime_read_mem.restype = ctypes.c_int
+        self.trblib.trb_read_uid.argtypes = [ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint]
+        self.trblib.trb_read_uid.restype = ctypes.c_int
+        self.trblib.trb_set_address.argtypes = [ctypes.c_uint64, ctypes.c_uint8, ctypes.c_uint16]
+        self.trblib.trb_set_address.restype = ctypes.c_int
+        self.trblib.network_reset.restype = ctypes.c_int
+        self.trblib.com_reset.restype = ctypes.c_int
+        self.trblib.trb_fifo_flush.argtypes = [ctypes.c_uint8]
+        self.trblib.trb_fifo_flush.restype = ctypes.c_int
+        self.trblib.trb_send_trigger.argtypes = [ctypes.c_uint8, ctypes.c_uint16,
+                                                 ctypes.c_uint8, ctypes.c_uint16]
+        self.trblib.trb_send_trigger.restype = ctypes.c_int
+        self.trblib.trb_register_setbit.argtypes = [ctypes.c_uint16, ctypes.c_uint16,
+                                                    ctypes.c_uint32]
+        self.trblib.trb_register_setbit.restype = ctypes.c_int
+        self.trblib.trb_register_clearbit.argtypes = [ctypes.c_uint16, ctypes.c_uint16,
+                                                      ctypes.c_uint32]
+        self.trblib.trb_register_clearbit.restype = ctypes.c_int
+        self.trblib.trb_register_loadbit.argtypes = [ctypes.c_uint16, ctypes.c_uint16,
+                                                     ctypes.c_uint32, ctypes.c_uint32]
+        self.trblib.trb_register_loadbit.restypes = ctypes.c_int
+        self.trblib.trb_ipu_data_read.argtypes = [ctypes.c_uint8, ctypes.c_uint8, ctypes.c_uint8,
+                                                  ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32),
+                                                  ctypes.c_uint]
+        self.trblib.trb_ipu_data_read.restypes = ctypes.c_int
+        self.trblib.trb_nettrace.argtypes = [ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32),
+                                                  ctypes.c_uint]
+        self.trblib.trb_nettrace.restypes = ctypes.c_int
+        
+
+    def trb_register_read(self, trb_address, reg_address):
+        '''
+        Read value from trb register.
+
+        Keyword arguments:
+        trb_address -- trb endpoint address
+        reg_address -- register address
+
+        Returns:
+        python list [0] TRB-Address of the sender, [1] register value
+        '''
+        data_array = (ctypes.c_uint32 * 2)()
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        status = self.trblib.trb_register_read(trb_address, reg_address, data_array, 2)
+        if status == -1:
+            raise Exception('Error while reading trb register, ' + trb_errno_dict[self.trb_errno()])
+        print(self.trb_errno())
+        return [data_array[i] for i in range(status)]
+
+    def trb_register_write(self, trb_address, reg_address, value):
+        '''
+        Write trb register
+
+        Keyword Arguments:
+        trb_address -- trb endpoint address
+        reg_address -- register address
+        value -- value to write to register
+        '''
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        value = ctypes.c_uint(value)
+        status = self.trblib.trb_register_write(trb_address, reg_address, value)
+        if status == -1:
+            raise Exception('Error while writing trb register, ' + trb_errno_dict[self.trb_errno()])
+
+    def trb_register_read_mem(self, trb_address, reg_address, option, size):
+        '''
+        Perform several trb register reads
+
+        Keyword Arguments:
+        trb_address -- trb endpoint address
+        reg_address -- register address
+        option -- read option, 0 = read same register several times 1 = read adjacent registers
+        size -- number of reads
+
+        Returns:
+        python list [0] TRB-Address of the sender, [1:] register values
+        '''
+        data_array = (ctypes.c_uint32 * (size + 2))()
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        option = ctypes.c_uint8(option)
+        status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2))
+        if status == -1:
+            raise Exception('Error while reading trb register memory, ' +
+                            trb_errno_dict[self.trb_errno()])
+        return [data_array[i] for i in range(status)]
+
+    def trb_read_uid(self, trb_address):
+        '''
+        Read unique id of endpoint
+
+        Keyword Arguments:
+        trb_address -- address of endpoint
+
+        Returns:
+        python list
+        [0]:  UID High-32Bit Word
+        [1]: UID Low-32Bit Word
+        [2]:  Endpoint Number
+        [3]: TRB-Address of the sender
+        '''
+        data_array = (ctypes.c_uint32 * 4)()
+        trb_address = ctypes.c_uint16(trb_address)
+        status = self.trblib.trb_read_uid(trb_address, data_array, 4)
+        if status == -1:
+            raise Exception('Error reading trb uid, ' + trb_errno_dict[self.trb_errno()])
+        return [data_array[i] for i in range(status)]
+
+    def trb_set_address(self, uid, endpoint, trb_address):
+        '''
+        Set trb net address
+
+        Keyword Arguments:
+        uid -- the unique ID of the endpoint
+        endpoint -- number of the trb endpoint
+        trb_address -- new trb address
+        '''
+        uid = ctypes.c_uint64(uid)
+        endpoint = ctypes.c_uint8(endpoint)
+        trb_address = ctypes.c_uint16(trb_address)
+        status = self.trblib.trb_set_address(uid, endpoint, trb_address)
+        if status == -1:
+            raise Exception('error setting trb address, ' + trb_errno_dict[self.trb_errno()])
+
+#  rarely used funtions without documentation in trbnet.h
+#  meaning of arguments and returned data unknown
+    def network_reset(self):
+        '''TRB network reset'''
+        return self.trblib.network_reset()
+
+    def com_reset(self):
+        '''communication reset'''
+        return self.trblib.com_reset()
+
+    def trb_fifo_flush(self, channel):
+        '''flush trb fifo
+
+        Keyword arguments:
+        channel: trb channel (ipu, slowcontrol etc)'''
+        channel = ctypes.c_uint8(channel)
+        return self.trblib.trb_fifo_flush(channel)
+
+    def trb_send_trigger(self, trigtype, info, random, number):
+        '''send trigger to trb
+
+        Keyword arguments:
+        trigtype: trigger type (status, calibration)
+        info: trigger information
+        random: random trigger number
+        number: number of triggers
+        '''
+        trigtype = ctypes.c_uint8(trigtype)
+        info = ctypes.c_uint32(info)
+        random = ctypes.c_uint8(random)
+        number = ctypes.c_uint16(number)
+        return self.trblib.trb_send_trigger(trigtype, info, random, number)
+
+    def trb_register_setbit(self, trb_address, reg_address, bitmask):
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        bitmask = ctypes.c_uint32(bitmask)
+        return self.trblib.trb_register_setbit(trb_address, reg_address,
+                                               bitmask)
+
+    def trb_register_clearbit(self, trb_address, reg_address, bitmask):
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        bitmask = ctypes.c_uint32(bitmask)
+        return self.trblib.trb_register_clearbit(trb_address, reg_address,
+                                                 bitmask)
+
+    def trb_register_loadbit(self, trb_address, reg_address, bitmask, bitvalue):
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        bitmask = ctypes.c_uint32(bitmask)
+        bitvalue = ctypes.c_uint32(bitvalue)
+        return self.trblib.trb_register_loadbit(trb_address, reg_address,
+                                                bitmask, bitvalue)
+
+    def trb_registertime_read_mem(self, trb_address, reg_address, option, size):
+        data_array = (ctypes.c_uint32 * (size + 2))()
+        trb_address = ctypes.c_uint16(trb_address)
+        reg_address = ctypes.c_uint16(reg_address)
+        option = ctypes.c_uint8(option)
+        status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2))
+        if status == -1:
+            raise Exception('Error while reading trb register memory, ' +
+                            trb_errno_dict[self.trb_errno()])
+        return [data_array[i] for i in range(status)]
+
+    def trb_ipu_data_read(self, trg_type, trg_info, trg_random, trg_number, size):
+        trg_type = ctypes.c_uint8(trg_type)
+        trg_info = ctypes.c_uint8(trg_info)
+        trg_random = ctypes.c_uint8(trg_random)
+        trg_number = ctypes.c_uint16(trg_number)
+        data_array = (ctypes.c_uint32 * (size + 2))()
+        status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(size + 2))
+        if status == -1:
+            raise Exception('Error while reading trb ipu data, ' +
+                            trb_errno_dict[self.trb_errno()])
+        return [data_array[i] for i in range(status)]
+
+    def trb_nettrace(self, trb_address, size):
+        trb_address = ctypes.c_uint16(trb_address)
+        data_array = (ctypes.c_uint32 * (size + 2))()
+        status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(size + 2))
+        if status == -1:
+            raise Exception('Error while doing net trace, ' +
+                            trb_errno_dict[self.trb_errno()])
+        return [data_array[i] for i in range(status)]