From 21b46a76aa9eee0536bb38b36f111a98ee96b50d Mon Sep 17 00:00:00 2001 From: Tobias Weber Date: Fri, 9 Mar 2018 11:38:01 +0100 Subject: [PATCH] Early version of libtrbnet python wrapper. Read/Write access to registers tested. Other functions still need more work. --- libtrbnet/PyTRB.py | 292 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 292 insertions(+) create mode 100644 libtrbnet/PyTRB.py diff --git a/libtrbnet/PyTRB.py b/libtrbnet/PyTRB.py new file mode 100644 index 0000000..00a23c2 --- /dev/null +++ b/libtrbnet/PyTRB.py @@ -0,0 +1,292 @@ +# -*- coding: utf-8 -*- +import ctypes +import os + +# TODO: use warnings to indicate access to wrong register or no data + +# dictionary with trb error strings. Copied from trberror.h +# TODO: better way to access this directly in libtrbnet +trb_errno_dict = {0: 'TRB_NONE', 1: 'TRB_TX_BUSY', 2: 'TRB_FIFO_NOT_EMPTY', + 3: 'TRB_FIFO_TIMEOUT', 4: 'TRB_FIFO_HEADERS', 5: 'TRB_FIFO_SEQUENZ', + 6: 'TRB_FIFO_INVALID_MODE', 7: 'TRB_FIFO_INCOMPLETE_PACKAGE', + 8: 'TRB_FIFO_INVALID_HEADER', 9: 'TRB_FIFO_MISSING_TERM_HEADER', + 10: 'TRB_FAILED_WAIT_IS_VALID', 11: 'TRB_FAILED_WAIT_IS_NOT_VALID', + 12: 'TRB_USER_BUFFER_OVF', 13: 'TRB_INVALID_CHANNEL', + 14: 'TRB_INVALID_PKG_NUMBER', 15: 'TRB_STATUS_ERROR', + 16: 'TRB_INVALID_ADDRESS', 17: 'TRB_INVALID_LENGTH', + 18: 'TRB_ENDPOINT_NOT_REACHED', 19: 'TRB_DMA_UNAVAILABLE', + 20: 'TRB_DMA_TIMEOUT', 21: 'TRB_READMEM_INVALID_SIZE', + 22: 'TRB_HDR_DLEN', 23: 'TRB_PEXOR_OPEN', 24: 'TRB_SEMAPHORE', + 25: 'TRB_FIFO_SHARED_MEM', 26: 'TRB_STATUS_WARNING', + 27: 'TRB_RPC_ERROR', 28: 'TRB_PEXOR_DATA_ERROR', + 29: 'TRB_PEXOR_DEVICE_ERROR', 30: 'TRB_PEXOR_DEVICE_TRB_TIMEOUT', + 31: 'TRB_PEXOR_DEVICE_POLLING_TIMEOUT', 32: 'TRB_PEXOR_DEVICE_DMA_EMPTY', + 33: 'TRB_PEXOR_DEVICE_INVALID_DMA_SIZE', 34: 'TRB_PEXOR_DEVICE_LOST_CREDENTIAL', + 35: 'TRB_PEXOR_DEVICE_FIFO_TRANSFER', 36: 'TRB_TRB3_CMD_NOT_SUPPORTED', + 37: 'TRB_TRB3_SOCKET_ERROR', 38: 'TRB_TRB3_SOCKET_TIMEOUT', + 39: 'TRB_TRB3_INCOMPLETE_UDP', 40: 'TRB_TRB3_DATA_ERROR', + 41: 'TRB_TRB3_INVALID_UDP_HEADER'} + + +class PyTRB(object): + ''' + Wrapper class for trbnet access using python + ''' + + def __init__(self, trb_server, daq_server, path_to_lib='libtrbnet.so'): + ''' + Default Constructor. Sets enviromental variable and initialises ports. + The trbnet daemon has to be running. + + Keyword arguments: + trbserver -- string for TRB3_SERVER enviromental variable + daq_server -- string for DAQOPSERVER enviromental variable + path_to_lib -- full path to libtrbnet.so + ''' + self.trblib = ctypes.cdll.LoadLibrary(path_to_lib) + self.declare_types() + os.environ['TRB3_SERVER'] = trb_server + os.environ['DAQOPSERVER'] = daq_server + status = self.trblib.init_ports() + if status < 0: + raise Exception('Error initialising ports.') + + def __del__(self): + ''' + Destructor. + ''' + self.trblib.close_ports() + + def trb_errno(self): + ''' + Returns trb_errno flag value + ''' + return ctypes.c_int.in_dll(self.trblib, 'trb_errno').value + + def declare_types(self): + ''' + Declare argument and return types of libtrbnet calls via ctypes + ''' + self.trblib.trb_register_read.argtypes = [ctypes.c_uint16, ctypes.c_uint16, + ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint] + self.trblib.trb_register_read.restype = ctypes.c_int + self.trblib.trb_register_write.argtypes = [ctypes.c_uint16, ctypes.c_uint16, + ctypes.c_uint32] + self.trblib.trb_register_write.restype = ctypes.c_int + self.trblib.trb_register_read_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8, ctypes.c_uint16, + ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint] + self.trblib.trb_register_read_mem.restype = ctypes.c_int + self.trblib.trb_registertime_read_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8, ctypes.c_uint16, + ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint] + self.trblib.trb_registertime_read_mem.restype = ctypes.c_int + self.trblib.trb_read_uid.argtypes = [ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint] + self.trblib.trb_read_uid.restype = ctypes.c_int + self.trblib.trb_set_address.argtypes = [ctypes.c_uint64, ctypes.c_uint8, ctypes.c_uint16] + self.trblib.trb_set_address.restype = ctypes.c_int + self.trblib.network_reset.restype = ctypes.c_int + self.trblib.com_reset.restype = ctypes.c_int + self.trblib.trb_fifo_flush.argtypes = [ctypes.c_uint8] + self.trblib.trb_fifo_flush.restype = ctypes.c_int + self.trblib.trb_send_trigger.argtypes = [ctypes.c_uint8, ctypes.c_uint16, + ctypes.c_uint8, ctypes.c_uint16] + self.trblib.trb_send_trigger.restype = ctypes.c_int + self.trblib.trb_register_setbit.argtypes = [ctypes.c_uint16, ctypes.c_uint16, + ctypes.c_uint32] + self.trblib.trb_register_setbit.restype = ctypes.c_int + self.trblib.trb_register_clearbit.argtypes = [ctypes.c_uint16, ctypes.c_uint16, + ctypes.c_uint32] + self.trblib.trb_register_clearbit.restype = ctypes.c_int + self.trblib.trb_register_loadbit.argtypes = [ctypes.c_uint16, ctypes.c_uint16, + ctypes.c_uint32, ctypes.c_uint32] + self.trblib.trb_register_loadbit.restypes = ctypes.c_int + self.trblib.trb_ipu_data_read.argtypes = [ctypes.c_uint8, ctypes.c_uint8, ctypes.c_uint8, + ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32), + ctypes.c_uint] + self.trblib.trb_ipu_data_read.restypes = ctypes.c_int + self.trblib.trb_nettrace.argtypes = [ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32), + ctypes.c_uint] + self.trblib.trb_nettrace.restypes = ctypes.c_int + + + def trb_register_read(self, trb_address, reg_address): + ''' + Read value from trb register. + + Keyword arguments: + trb_address -- trb endpoint address + reg_address -- register address + + Returns: + python list [0] TRB-Address of the sender, [1] register value + ''' + data_array = (ctypes.c_uint32 * 2)() + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + status = self.trblib.trb_register_read(trb_address, reg_address, data_array, 2) + if status == -1: + raise Exception('Error while reading trb register, ' + trb_errno_dict[self.trb_errno()]) + print(self.trb_errno()) + return [data_array[i] for i in range(status)] + + def trb_register_write(self, trb_address, reg_address, value): + ''' + Write trb register + + Keyword Arguments: + trb_address -- trb endpoint address + reg_address -- register address + value -- value to write to register + ''' + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + value = ctypes.c_uint(value) + status = self.trblib.trb_register_write(trb_address, reg_address, value) + if status == -1: + raise Exception('Error while writing trb register, ' + trb_errno_dict[self.trb_errno()]) + + def trb_register_read_mem(self, trb_address, reg_address, option, size): + ''' + Perform several trb register reads + + Keyword Arguments: + trb_address -- trb endpoint address + reg_address -- register address + option -- read option, 0 = read same register several times 1 = read adjacent registers + size -- number of reads + + Returns: + python list [0] TRB-Address of the sender, [1:] register values + ''' + data_array = (ctypes.c_uint32 * (size + 2))() + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + option = ctypes.c_uint8(option) + status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2)) + if status == -1: + raise Exception('Error while reading trb register memory, ' + + trb_errno_dict[self.trb_errno()]) + return [data_array[i] for i in range(status)] + + def trb_read_uid(self, trb_address): + ''' + Read unique id of endpoint + + Keyword Arguments: + trb_address -- address of endpoint + + Returns: + python list + [0]: UID High-32Bit Word + [1]: UID Low-32Bit Word + [2]: Endpoint Number + [3]: TRB-Address of the sender + ''' + data_array = (ctypes.c_uint32 * 4)() + trb_address = ctypes.c_uint16(trb_address) + status = self.trblib.trb_read_uid(trb_address, data_array, 4) + if status == -1: + raise Exception('Error reading trb uid, ' + trb_errno_dict[self.trb_errno()]) + return [data_array[i] for i in range(status)] + + def trb_set_address(self, uid, endpoint, trb_address): + ''' + Set trb net address + + Keyword Arguments: + uid -- the unique ID of the endpoint + endpoint -- number of the trb endpoint + trb_address -- new trb address + ''' + uid = ctypes.c_uint64(uid) + endpoint = ctypes.c_uint8(endpoint) + trb_address = ctypes.c_uint16(trb_address) + status = self.trblib.trb_set_address(uid, endpoint, trb_address) + if status == -1: + raise Exception('error setting trb address, ' + trb_errno_dict[self.trb_errno()]) + +# rarely used funtions without documentation in trbnet.h +# meaning of arguments and returned data unknown + def network_reset(self): + '''TRB network reset''' + return self.trblib.network_reset() + + def com_reset(self): + '''communication reset''' + return self.trblib.com_reset() + + def trb_fifo_flush(self, channel): + '''flush trb fifo + + Keyword arguments: + channel: trb channel (ipu, slowcontrol etc)''' + channel = ctypes.c_uint8(channel) + return self.trblib.trb_fifo_flush(channel) + + def trb_send_trigger(self, trigtype, info, random, number): + '''send trigger to trb + + Keyword arguments: + trigtype: trigger type (status, calibration) + info: trigger information + random: random trigger number + number: number of triggers + ''' + trigtype = ctypes.c_uint8(trigtype) + info = ctypes.c_uint32(info) + random = ctypes.c_uint8(random) + number = ctypes.c_uint16(number) + return self.trblib.trb_send_trigger(trigtype, info, random, number) + + def trb_register_setbit(self, trb_address, reg_address, bitmask): + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + bitmask = ctypes.c_uint32(bitmask) + return self.trblib.trb_register_setbit(trb_address, reg_address, + bitmask) + + def trb_register_clearbit(self, trb_address, reg_address, bitmask): + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + bitmask = ctypes.c_uint32(bitmask) + return self.trblib.trb_register_clearbit(trb_address, reg_address, + bitmask) + + def trb_register_loadbit(self, trb_address, reg_address, bitmask, bitvalue): + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + bitmask = ctypes.c_uint32(bitmask) + bitvalue = ctypes.c_uint32(bitvalue) + return self.trblib.trb_register_loadbit(trb_address, reg_address, + bitmask, bitvalue) + + def trb_registertime_read_mem(self, trb_address, reg_address, option, size): + data_array = (ctypes.c_uint32 * (size + 2))() + trb_address = ctypes.c_uint16(trb_address) + reg_address = ctypes.c_uint16(reg_address) + option = ctypes.c_uint8(option) + status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2)) + if status == -1: + raise Exception('Error while reading trb register memory, ' + + trb_errno_dict[self.trb_errno()]) + return [data_array[i] for i in range(status)] + + def trb_ipu_data_read(self, trg_type, trg_info, trg_random, trg_number, size): + trg_type = ctypes.c_uint8(trg_type) + trg_info = ctypes.c_uint8(trg_info) + trg_random = ctypes.c_uint8(trg_random) + trg_number = ctypes.c_uint16(trg_number) + data_array = (ctypes.c_uint32 * (size + 2))() + status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(size + 2)) + if status == -1: + raise Exception('Error while reading trb ipu data, ' + + trb_errno_dict[self.trb_errno()]) + return [data_array[i] for i in range(status)] + + def trb_nettrace(self, trb_address, size): + trb_address = ctypes.c_uint16(trb_address) + data_array = (ctypes.c_uint32 * (size + 2))() + status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(size + 2)) + if status == -1: + raise Exception('Error while doing net trace, ' + + trb_errno_dict[self.trb_errno()]) + return [data_array[i] for i in range(status)] -- 2.43.0