From: Philipp Klaus Date: Fri, 1 Feb 2019 09:46:14 +0000 (+0100) Subject: New Python binding for TrbNet X-Git-Url: https://jspc29.x-matter.uni-frankfurt.de/git/?a=commitdiff_plain;h=609fdb410e2c5e26252671f5b8d0325091832a74;p=trbnettools.git New Python binding for TrbNet The new Python binding for TrbNet is based on the code originally submitted by Tobias Weber as "PyTRB.py". The upgraded code is: * a proper Python package called 'trbnet' * divided into low- and high-level classes trbnet.core.lowlevel._TrbNet() and trbnet.core.highlevel.TrbNet() * also uploaded to the Python Package Index PyPI * in this commit in the version 1.0.2 The code was for some time developed on Github in the repository https://github.com/pklaus/pytrbnet and is now being synced to trbnettools under the subfolder libtrbnet_python. --- diff --git a/libtrbnet_python/README.md b/libtrbnet_python/README.md new file mode 100644 index 0000000..49171ed --- /dev/null +++ b/libtrbnet_python/README.md @@ -0,0 +1,17 @@ +# PyTrbNet + +This is a Python package wrapping libtrbnet.so. +It allows accessing trbnet registers from Python. + +The package comes with two additional features: + +* XmlDb support. Allows to query registers by their + name as specified in the corresponding xml file. +* PCASpy-based EPICS IOC providing TrbNet register + status information to the EPICS detector control + system. + +### Resources + +* [The TRB Website](http://trb.gsi.de) +* [TrbNet Manual](http://jspc29.x-matter.uni-frankfurt.de/docu/trbnetdocu.pdf) diff --git a/libtrbnet_python/setup.py b/libtrbnet_python/setup.py new file mode 100644 index 0000000..2ac7c1c --- /dev/null +++ b/libtrbnet_python/setup.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + +try: + import pypandoc + LDESC = open('README.md', 'r').read() + LDESC = pypandoc.convert_text(LDESC, 'rst', format='md') +except (ImportError, IOError, RuntimeError) as e: + print("Could not create long description:") + print(str(e)) + LDESC = '' + +setup(name='trbnet', + version = '1.0.2', + description = 'Interface to TrbNet (wrapping libtrbnet.so with ctypes)', + long_description = LDESC, + author = 'Philipp Klaus', + author_email = 'klaus@physik.uni-frankfurt.de', + url = 'https://github.com/pklaus/pytrbnet', + license = 'GPL', + packages = [ + 'trbnet', + 'trbnet.core', + 'trbnet.xmldb', + 'trbnet.util', + 'trbnet.epics', + ], + entry_points = { + 'console_scripts': [ + 'trbcmd.py = trbnet.util.trbcmd:cli', + ], + }, + include_package_data = False, + zip_safe = True, + platforms = 'Linux', + install_requires = [ + "lxml", + "click", + "enum34", # for support of enum.IntEnum on Python < 3.4 + ], + keywords = 'TrbNet PyTrbNet FPGA Low-latency network wrapper', + classifiers = [ + 'Development Status :: 4 - Beta', + 'Operating System :: OS Independent', + 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Topic :: Scientific/Engineering :: Physics', + 'Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator', + 'Topic :: System :: Hardware :: Hardware Drivers', + ] +) diff --git a/libtrbnet_python/trbnet/__init__.py b/libtrbnet_python/trbnet/__init__.py new file mode 100644 index 0000000..a9f45c3 --- /dev/null +++ b/libtrbnet_python/trbnet/__init__.py @@ -0,0 +1,3 @@ +from .core.lowlevel import _TrbNet +from .core.highlevel import TrbNet +from .core.error import TrbException, TrbError diff --git a/libtrbnet_python/trbnet/core/__init__.py b/libtrbnet_python/trbnet/core/__init__.py new file mode 100644 index 0000000..fd2d3d1 --- /dev/null +++ b/libtrbnet_python/trbnet/core/__init__.py @@ -0,0 +1,3 @@ +from .highlevel import TrbNet +from .lowlevel import _TrbNet +from .error import TrbException, TrbError diff --git a/libtrbnet_python/trbnet/core/error.py b/libtrbnet_python/trbnet/core/error.py new file mode 100644 index 0000000..923e19e --- /dev/null +++ b/libtrbnet_python/trbnet/core/error.py @@ -0,0 +1,61 @@ +import enum + +class TrbException(Exception): + def __init__(self, msg, errno, errorstr): + # Set some exception infomation + self.msg = msg + try: + self.errno = TrbError(errno) + except ValueError: + self.errno = errno + self.errorstr = errorstr + +@enum.unique +class TrbError(enum.IntEnum): + """ + Copied from trbnettools/libtrbnet/trberror.h + as the names of the enum values are not accessible + from within the library... + """ + TRB_NONE = 0 + TRB_TX_BUSY = 1 + TRB_FIFO_NOT_EMPTY = 2 + TRB_FIFO_TIMEOUT = 3 + TRB_FIFO_HEADERS = 4 + TRB_FIFO_SEQUENZ = 5 + TRB_FIFO_INVALID_MODE = 6 + TRB_FIFO_INCOMPLETE_PACKAGE = 7 + TRB_FIFO_INVALID_HEADER = 8 + TRB_FIFO_MISSING_TERM_HEADER = 9 + TRB_FAILED_WAIT_IS_VALID = 10 + TRB_FAILED_WAIT_IS_NOT_VALID = 11 + TRB_USER_BUFFER_OVF = 12 + TRB_INVALID_CHANNEL = 13 + TRB_INVALID_PKG_NUMBER = 14 + TRB_STATUS_ERROR = 15 + TRB_INVALID_ADDRESS = 16 + TRB_INVALID_LENGTH = 17 + TRB_ENDPOINT_NOT_REACHED = 18 + TRB_DMA_UNAVAILABLE = 19 + TRB_DMA_TIMEOUT = 20 + TRB_READMEM_INVALID_SIZE = 21 + TRB_HDR_DLEN = 22 + TRB_PEXOR_OPEN = 23 + TRB_SEMAPHORE = 24 + TRB_FIFO_SHARED_MEM = 25 + TRB_STATUS_WARNING = 26 + TRB_RPC_ERROR = 27 + TRB_PEXOR_DATA_ERROR = 28 + TRB_PEXOR_DEVICE_ERROR = 29 + TRB_PEXOR_DEVICE_TRB_TIMEOUT = 30 + TRB_PEXOR_DEVICE_POLLING_TIMEOUT = 31 + TRB_PEXOR_DEVICE_DMA_EMPTY = 32 + TRB_PEXOR_DEVICE_INVALID_DMA_SIZE = 33 + TRB_PEXOR_DEVICE_LOST_CREDENTIAL = 34 + TRB_PEXOR_DEVICE_FIFO_TRANSFER = 35 + TRB_TRB3_CMD_NOT_SUPPORTED = 36 + TRB_TRB3_SOCKET_ERROR = 37 + TRB_TRB3_SOCKET_TIMEOUT = 38 + TRB_TRB3_INCOMPLETE_UDP = 39 + TRB_TRB3_DATA_ERROR = 40 + TRB_TRB3_INVALID_UDP_HEADER = 41 diff --git a/libtrbnet_python/trbnet/core/highlevel.py b/libtrbnet_python/trbnet/core/highlevel.py new file mode 100644 index 0000000..2d441b6 --- /dev/null +++ b/libtrbnet_python/trbnet/core/highlevel.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +from .lowlevel import _TrbNet + + +class TrbNet(_TrbNet): + ''' + High level wrapper providing utility functions for the TrbNet class + ''' + + def register_read(self, trb_address, reg_address): + lin_data = super().trb_register_read(trb_address, reg_address) + if (len(lin_data) % 2) != 0: + raise ValueError("len(lin_data) == %d - expected a multiple of %d" % (len(lin_data), 2)) + result = self._get_dynamic_trb_address_dict(lin_data, force_length=1) + return {key: value[0] for key, value in result.items()} + + def register_read_mem(self, trb_address, reg_address, option, size): + lin_data = super().trb_register_read_mem(trb_address, reg_address, option, size) + return self._get_dynamic_trb_address_dict(lin_data) + + def read_uid(self, trb_address): + ''' + Read unique id of TrbNet nodes + + Arguments: + trb_address -- node(s) to be queried + + Returns: + dict -- the keys being (uid, endpoint) and the associated value the currently assigned trb address + ''' + lin_data = super().trb_read_uid(trb_address) + if (len(lin_data) % 4) != 0: + raise ValueError("len(lin_data) == %d - expected a multiple of %d" % (len(lin_data), 4)) + responses = [lin_data[pos:pos+4] for pos in range(0, len(lin_data), 4)] + uid_dict = {((r[0] << 32) + r[1], r[2]): r[3] for r in responses} + return uid_dict + + def _get_dynamic_trb_address_dict(self, lin_data, force_length=0): + """ + A utility function to structure response data from the + trb_register_read() and trb_register_read_mem() functions. + As multiple TrbNet nodes can reply to a single request, it + splits the linar response into the data from each eandpoint. + Returns a dictionary with the trb_address as keys and the + respective data as values. + + Returns: + dict -- key: trb_address, value: list(int) (32-bit words) + """ + trb_address_responses = {} + offset = 0 + while len(lin_data) > offset: + header = lin_data[offset] + offset += 1 + length, trb_address = (header >> 16), (header & 0xffff) + if force_length: length = force_length + trb_address_responses[trb_address] = lin_data[offset:offset+length] + offset += length + return trb_address_responses diff --git a/libtrbnet_python/trbnet/core/libutils.py b/libtrbnet_python/trbnet/core/libutils.py new file mode 100644 index 0000000..589a25a --- /dev/null +++ b/libtrbnet_python/trbnet/core/libutils.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- + +def _find_lib(inp_lib_name): + """ + Find location of a dynamic library + Idea found in + """ + # Test 1: if LIBTRBNET env var is set, use it. + import os + dllpath = os.environ.get('LIBTRBNET', None) + if dllpath is not None: + dllpath = os.path.expanduser(dllpath) + if os.path.exists(dllpath) and os.path.isfile(dllpath): + return dllpath + + # Test 2: look in installed python location for dll (not used right now) + #dllpath = resource_filename('trbnet.clibs', clib_search_path(inp_lib_name)) + #if (os.path.exists(dllpath) and os.path.isfile(dllpath)): + # return dllpath + + # Test 3: look for library in LD_LIBRARY_PATH with ctypes.util + import ctypes.util + dllpath = ctypes.util._findLib_ld(inp_lib_name) if hasattr(ctypes.util, '_findLib_ld') else None + if dllpath is not None: + return dllpath + + raise NameError('cannot find lib'+inp_lib_name) diff --git a/libtrbnet/PyTRB.py b/libtrbnet_python/trbnet/core/lowlevel.py similarity index 60% rename from libtrbnet/PyTRB.py rename to libtrbnet_python/trbnet/core/lowlevel.py index 00a23c2..2f1be02 100644 --- a/libtrbnet/PyTRB.py +++ b/libtrbnet_python/trbnet/core/lowlevel.py @@ -2,60 +2,71 @@ import ctypes import os +from .error import TrbException + # TODO: use warnings to indicate access to wrong register or no data -# dictionary with trb error strings. Copied from trberror.h -# TODO: better way to access this directly in libtrbnet -trb_errno_dict = {0: 'TRB_NONE', 1: 'TRB_TX_BUSY', 2: 'TRB_FIFO_NOT_EMPTY', - 3: 'TRB_FIFO_TIMEOUT', 4: 'TRB_FIFO_HEADERS', 5: 'TRB_FIFO_SEQUENZ', - 6: 'TRB_FIFO_INVALID_MODE', 7: 'TRB_FIFO_INCOMPLETE_PACKAGE', - 8: 'TRB_FIFO_INVALID_HEADER', 9: 'TRB_FIFO_MISSING_TERM_HEADER', - 10: 'TRB_FAILED_WAIT_IS_VALID', 11: 'TRB_FAILED_WAIT_IS_NOT_VALID', - 12: 'TRB_USER_BUFFER_OVF', 13: 'TRB_INVALID_CHANNEL', - 14: 'TRB_INVALID_PKG_NUMBER', 15: 'TRB_STATUS_ERROR', - 16: 'TRB_INVALID_ADDRESS', 17: 'TRB_INVALID_LENGTH', - 18: 'TRB_ENDPOINT_NOT_REACHED', 19: 'TRB_DMA_UNAVAILABLE', - 20: 'TRB_DMA_TIMEOUT', 21: 'TRB_READMEM_INVALID_SIZE', - 22: 'TRB_HDR_DLEN', 23: 'TRB_PEXOR_OPEN', 24: 'TRB_SEMAPHORE', - 25: 'TRB_FIFO_SHARED_MEM', 26: 'TRB_STATUS_WARNING', - 27: 'TRB_RPC_ERROR', 28: 'TRB_PEXOR_DATA_ERROR', - 29: 'TRB_PEXOR_DEVICE_ERROR', 30: 'TRB_PEXOR_DEVICE_TRB_TIMEOUT', - 31: 'TRB_PEXOR_DEVICE_POLLING_TIMEOUT', 32: 'TRB_PEXOR_DEVICE_DMA_EMPTY', - 33: 'TRB_PEXOR_DEVICE_INVALID_DMA_SIZE', 34: 'TRB_PEXOR_DEVICE_LOST_CREDENTIAL', - 35: 'TRB_PEXOR_DEVICE_FIFO_TRANSFER', 36: 'TRB_TRB3_CMD_NOT_SUPPORTED', - 37: 'TRB_TRB3_SOCKET_ERROR', 38: 'TRB_TRB3_SOCKET_TIMEOUT', - 39: 'TRB_TRB3_INCOMPLETE_UDP', 40: 'TRB_TRB3_DATA_ERROR', - 41: 'TRB_TRB3_INVALID_UDP_HEADER'} - - -class PyTRB(object): + +class TrbTerm(ctypes.Structure): + """ + The TRB_TERM C Struct representing the information + carried by network termination packets. + """ + _fields_ = [ ("status_common", ctypes.c_uint16), + ("status_channel", ctypes.c_uint16), + ("sequence", ctypes.c_uint16), + ("channel", ctypes.c_uint8) ] + +class _TrbNet(object): ''' Wrapper class for trbnet access using python ''' - def __init__(self, trb_server, daq_server, path_to_lib='libtrbnet.so'): + def __init__(self, libtrbnet=None, daqopserver=None, trb3_server=None, buffersize=4194304): ''' - Default Constructor. Sets enviromental variable and initialises ports. - The trbnet daemon has to be running. + Constructor for the low level TrbNet class. + Loads the shared library (libtrbnet), sets enviromental variables and initialises ports. + + Depending on the version of libtrbnet.so used, the connection to TrbNet is established + either by directly connecting to a TRB board (trbnettools/libtrbnet/libtrbnet.so) or + by connecting to a trbnet daemon instance (if trbnettools/trbnetd/libtrbnet.so is used). + Selecting the library can happen by: + - Specifying the full path to the library via the libtrbnet keyword argument. + - Specifying the full path to the library via the environment variable LIBTRBNET. + - Adding the directory, libtrbnet.so resides in to the environment variable LD_LIBRARY_PATH. + + To specify the peer to connect to, the environment variables DAQOPSERVER, TRB3_SERVER + are read. For easier scripting, those environment variables can also be set inside this + constructor if the keywords arguments daqopserver or trb3_server are specified. Keyword arguments: - trbserver -- string for TRB3_SERVER enviromental variable - daq_server -- string for DAQOPSERVER enviromental variable - path_to_lib -- full path to libtrbnet.so + libtrbnet -- full path to libtrbnet.so + daqopserver -- optional override of the DAQOPSERVER enviromental variable + trb3_server -- optional override of the TRB3_SERVER enviromental variable + buffersize -- Size of the buffer in 32-bit words when reading back data (default: 16MiB) ''' - self.trblib = ctypes.cdll.LoadLibrary(path_to_lib) + if trb3_server: os.environ['TRB3_SERVER'] = trb3_server + if daqopserver: os.environ['DAQOPSERVER'] = daqopserver + self.buffersize = buffersize + if not libtrbnet: + from .libutils import _find_lib + libtrbnet =_find_lib('trbnet') + self.libtrbnet = libtrbnet + self.trblib = ctypes.cdll.LoadLibrary(libtrbnet) self.declare_types() - os.environ['TRB3_SERVER'] = trb_server - os.environ['DAQOPSERVER'] = daq_server status = self.trblib.init_ports() if status < 0: - raise Exception('Error initialising ports.') + errno = self.trb_errno() + raise TrbException('Error initialising ports.', errno, self.trb_errorstr(errno)) def __del__(self): ''' Destructor. ''' - self.trblib.close_ports() + try: + self.trblib.close_ports() + except AttributeError: + pass def trb_errno(self): ''' @@ -63,6 +74,10 @@ class PyTRB(object): ''' return ctypes.c_int.in_dll(self.trblib, 'trb_errno').value + def trb_term(self): + term = TrbTerm.in_dll(self.trblib, 'trb_term') + return (term.status_common, term.status_channel, term.sequence, term.channel) + def declare_types(self): ''' Declare argument and return types of libtrbnet calls via ctypes @@ -106,34 +121,52 @@ class PyTRB(object): self.trblib.trb_nettrace.argtypes = [ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint] self.trblib.trb_nettrace.restypes = ctypes.c_int + self.trblib.trb_errorstr.argtypes = [ctypes.c_int] + self.trblib.trb_errorstr.restype = ctypes.c_char_p + self.trblib.trb_termstr.argtypes = [TrbTerm] + self.trblib.trb_termstr.restype = ctypes.c_char_p + def trb_errorstr(self, errno): + ''' + Get error string for an integer error number. + + Arguments: + errno -- error number + + Returns: + python str with description of the error + ''' + errno = ctypes.c_int(errno) + _result = self.trblib.trb_errorstr(errno) + return _result.decode('ascii') + def trb_register_read(self, trb_address, reg_address): ''' Read value from trb register. - Keyword arguments: - trb_address -- trb endpoint address + Arguments: + trb_address -- node(s) to read from reg_address -- register address Returns: python list [0] TRB-Address of the sender, [1] register value ''' - data_array = (ctypes.c_uint32 * 2)() + data_array = (ctypes.c_uint32 * self.buffersize)() trb_address = ctypes.c_uint16(trb_address) reg_address = ctypes.c_uint16(reg_address) - status = self.trblib.trb_register_read(trb_address, reg_address, data_array, 2) + status = self.trblib.trb_register_read(trb_address, reg_address, data_array, self.buffersize) if status == -1: - raise Exception('Error while reading trb register, ' + trb_errno_dict[self.trb_errno()]) - print(self.trb_errno()) + errno = self.trb_errno() + raise TrbException('Error while reading trb register.', errno, self.trb_errorstr(errno)) return [data_array[i] for i in range(status)] def trb_register_write(self, trb_address, reg_address, value): ''' Write trb register - Keyword Arguments: - trb_address -- trb endpoint address + Arguments: + trb_address -- node(s) to write to reg_address -- register address value -- value to write to register ''' @@ -142,14 +175,15 @@ class PyTRB(object): value = ctypes.c_uint(value) status = self.trblib.trb_register_write(trb_address, reg_address, value) if status == -1: - raise Exception('Error while writing trb register, ' + trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error while writing trb register.', errno, self.trb_errorstr(errno)) def trb_register_read_mem(self, trb_address, reg_address, option, size): ''' Perform several trb register reads - Keyword Arguments: - trb_address -- trb endpoint address + Arguments: + trb_address -- node(s) to read from reg_address -- register address option -- read option, 0 = read same register several times 1 = read adjacent registers size -- number of reads @@ -157,43 +191,47 @@ class PyTRB(object): Returns: python list [0] TRB-Address of the sender, [1:] register values ''' - data_array = (ctypes.c_uint32 * (size + 2))() + + data_array = (ctypes.c_uint32 * self.buffersize)() trb_address = ctypes.c_uint16(trb_address) reg_address = ctypes.c_uint16(reg_address) option = ctypes.c_uint8(option) - status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2)) + status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(self.buffersize)) if status == -1: - raise Exception('Error while reading trb register memory, ' + - trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error while reading trb register memory.', + errno, self.trb_errorstr(errno)) return [data_array[i] for i in range(status)] def trb_read_uid(self, trb_address): ''' - Read unique id of endpoint + Read unique id(s) of TrbNet node(s) - Keyword Arguments: - trb_address -- address of endpoint + Arguments: + trb_address -- node(s) to be queried Returns: - python list - [0]: UID High-32Bit Word - [1]: UID Low-32Bit Word - [2]: Endpoint Number - [3]: TRB-Address of the sender + python list, length is a multiple of 4 + [i+0]: UID High-32Bit Word + [i+1]: UID Low-32Bit Word + [i+2]: Endpoint Number + [i+3]: TRB-Address of the sender ''' - data_array = (ctypes.c_uint32 * 4)() + data_array = (ctypes.c_uint32 * self.buffersize)() trb_address = ctypes.c_uint16(trb_address) - status = self.trblib.trb_read_uid(trb_address, data_array, 4) + status = self.trblib.trb_read_uid(trb_address, data_array, self.buffersize) if status == -1: - raise Exception('Error reading trb uid, ' + trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error reading trb uid.', + errno, self.trb_errorstr(errno)) return [data_array[i] for i in range(status)] def trb_set_address(self, uid, endpoint, trb_address): ''' Set trb net address - Keyword Arguments: - uid -- the unique ID of the endpoint + Arguments: + uid -- the unique ID of the node endpoint -- number of the trb endpoint trb_address -- new trb address ''' @@ -202,7 +240,9 @@ class PyTRB(object): trb_address = ctypes.c_uint16(trb_address) status = self.trblib.trb_set_address(uid, endpoint, trb_address) if status == -1: - raise Exception('error setting trb address, ' + trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error setting trb address.', + errno, self.trb_errorstr(errno)) # rarely used funtions without documentation in trbnet.h # meaning of arguments and returned data unknown @@ -217,7 +257,7 @@ class PyTRB(object): def trb_fifo_flush(self, channel): '''flush trb fifo - Keyword arguments: + Arguments: channel: trb channel (ipu, slowcontrol etc)''' channel = ctypes.c_uint8(channel) return self.trblib.trb_fifo_flush(channel) @@ -225,7 +265,7 @@ class PyTRB(object): def trb_send_trigger(self, trigtype, info, random, number): '''send trigger to trb - Keyword arguments: + Arguments: trigtype: trigger type (status, calibration) info: trigger information random: random trigger number @@ -260,14 +300,14 @@ class PyTRB(object): bitmask, bitvalue) def trb_registertime_read_mem(self, trb_address, reg_address, option, size): - data_array = (ctypes.c_uint32 * (size + 2))() + data_array = (ctypes.c_uint32 * self.buffersize)() trb_address = ctypes.c_uint16(trb_address) reg_address = ctypes.c_uint16(reg_address) option = ctypes.c_uint8(option) - status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2)) + status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(self.buffersize)) if status == -1: - raise Exception('Error while reading trb register memory, ' + - trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error while reading trb register memory.', errno, self.trb_errorstr(errno)) return [data_array[i] for i in range(status)] def trb_ipu_data_read(self, trg_type, trg_info, trg_random, trg_number, size): @@ -275,18 +315,33 @@ class PyTRB(object): trg_info = ctypes.c_uint8(trg_info) trg_random = ctypes.c_uint8(trg_random) trg_number = ctypes.c_uint16(trg_number) - data_array = (ctypes.c_uint32 * (size + 2))() - status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(size + 2)) + data_array = (ctypes.c_uint32 * self.buffersize)() + status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(self.buffersize)) if status == -1: - raise Exception('Error while reading trb ipu data, ' + - trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error while reading trb ipu data.', errno, self.trb_errorstr(errno)) return [data_array[i] for i in range(status)] def trb_nettrace(self, trb_address, size): trb_address = ctypes.c_uint16(trb_address) - data_array = (ctypes.c_uint32 * (size + 2))() - status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(size + 2)) + data_array = (ctypes.c_uint32 * self.buffersize)() + status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(self.buffersize)) if status == -1: - raise Exception('Error while doing net trace, ' + - trb_errno_dict[self.trb_errno()]) + errno = self.trb_errno() + raise TrbException('Error while doing net trace.', errno, self.trb_errorstr(errno)) return [data_array[i] for i in range(status)] + + def trb_termstr(self, term): + ''' + Get string representation for network termination packet info tuple. + + Arguments: + term -- network termination packet info + (TrbTerm instance or tuple of four integer values) + + Returns: + python str with description of the error + ''' + if isinstance(term, tuple): term = TrbTerm(*term) + _result = self.trblib.trb_termstr(term) + return _result.decode('ascii') diff --git a/libtrbnet_python/trbnet/epics/__init__.py b/libtrbnet_python/trbnet/epics/__init__.py new file mode 100644 index 0000000..2a44ae5 --- /dev/null +++ b/libtrbnet_python/trbnet/epics/__init__.py @@ -0,0 +1 @@ +from .pcaspy_ioc import TrbNetIOC diff --git a/libtrbnet_python/trbnet/epics/helpers.py b/libtrbnet_python/trbnet/epics/helpers.py new file mode 100644 index 0000000..e3163de --- /dev/null +++ b/libtrbnet_python/trbnet/epics/helpers.py @@ -0,0 +1,28 @@ +import logging + +class SeenBeforeFilter(logging.Filter): + """ + A filter for the Python logging module rejecting subsequent + log entries when they have been logged before. + The optional keyword argument restriction_func can restrict + the filtering to a subset of messages. + If present, it will be called with the tuple (module, levelno, msg, args). + If it evaluates to true, the filtering is done, otherwise not. + """ + def __init__(self, restriction_func=None): + self.restriction_func = restriction_func + self.seen_before = [] + super().__init__() + def filter(self, record): + # add other fields if you need more granular comparison, depends on your app + current_log = (record.module, record.levelno, record.msg, record.args) + if self.restriction_func: + if not self.restriction_func(current_log): + # we do not restrict the filtering to this log entry, + # so just log it as usual... + return True + if current_log not in self.seen_before: + self.seen_before.append(current_log) + return True + return False + diff --git a/libtrbnet_python/trbnet/epics/pcaspy_ioc.py b/libtrbnet_python/trbnet/epics/pcaspy_ioc.py new file mode 100755 index 0000000..fe099cb --- /dev/null +++ b/libtrbnet_python/trbnet/epics/pcaspy_ioc.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +import time, threading, logging + +from trbnet.core import TrbNet, TrbException +from trbnet.xmldb import XmlDb +from trbnet.util.trbcmd import _xmlget as xmlget, _xmlentry as xmlentry + +from pcaspy import Driver, SimpleServer, Alarm, Severity +from pcaspy.driver import manager + +from .helpers import SeenBeforeFilter + +t = TrbNet() +db = XmlDb() + +logger = logging.getLogger('trbnet.epics.pcaspy_ioc') +logging.basicConfig( + format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +restr_func = lambda log: log[2].startswith('register missing') +logger.addFilter(SeenBeforeFilter(restriction_func=restr_func)) + + +class TrbNetIOC(object): + + def __init__(self): + self.prefix = '' + self._initialized = False + self._subscriptions = [] + self._pvdb = {} + self._pvdb_manager = None + self._expected_trb_addresses = {} + + def before_initialization(func): + def func_wrapper(self, *args, **kwargs): + if self._initialized: + raise NameError('Cannot use the method .%s() after running .initialize().' % func.__name__) + return func(self, *args, **kwargs) + return func_wrapper + + @before_initialization + def add_subscription(self, trb_address, entity, name): + self._subscriptions.append((trb_address, entity, name)) + + @before_initialization + def add_expected_trb_addresses(self, send_to_trb_address, answer_from_trb_addresses): + self._expected_trb_addresses[send_to_trb_address] = answer_from_trb_addresses + + def initialize(self): + self._pvdb_manager = PvdbManager(self._pvdb, self._expected_trb_addresses) + self._pvdb_manager.initialize(self._subscriptions) + self._initialized = True + + @property + def all_pvs(self): + if not self._initialized: + raise NameError("Please run .initialize() first") + return [self.prefix + pv for pv in self._pvdb.keys()] + + def run(self): + if not self._initialized: + self.initialize() + + server = SimpleServer() + server.createPV(self.prefix, self._pvdb) + driver = TrbNetIocDriver(self._subscriptions, self._pvdb_manager) + + while True: + # process CA transactions + server.process(0.1) + + +class PvdbManager(object): + + def __init__(self, pvdb, expected_trb_addresses): + self._pvdb = pvdb + self._expected_trb_addresses = expected_trb_addresses + + def _add(self, identifier, definition): + self._pvdb[identifier] = { + 'type': TYPE_MAPPING[definition['format']][0], + 'unit': definition['unit'], + } + if definition['format'] == 'enum': + choices = definition['meta']['choices'] + vals = list(choices.keys()) + min_val, max_val = min(vals), max(vals) + enums = [] + for i in range(max_val+1): + enums.append(choices[i] if i in choices else 'n/a') + self._pvdb[identifier]['enums'] = enums + if definition['format'] == 'boolean' and TYPE_MAPPING[definition['format']][0] == 'enum': + self._pvdb[identifier]['enums'] = ['false', 'true'] + + def initialize(self, subscriptions): + for trb_address, entity, name in subscriptions: + if trb_address in self._expected_trb_addresses: + answer_from_trb_addresses = self._expected_trb_addresses[trb_address] + for info in xmlentry(entity, name): + slices = len(info['reg_addresses']) + for slice in range(slices): + slice = slice if slices > 1 else None + for answer_from_trb_address in answer_from_trb_addresses: + identifier = db._get_field_identifier(entity, info['field_name'], answer_from_trb_address, slice=slice) + definition = db._get_field_info(entity, info['field_name']) + self._add(identifier, definition) + else: + for data in xmlget(trb_address, entity, name, logger=logger): + self._add(data['context']['identifier'], data) + +class TrbNetIocDriver(Driver): + + def __init__(self, subscriptions, scan_period=1.0): + Driver.__init__(self) + self.scan_period = 1.0 + self.subscriptions = subscriptions + self.start() + + def start(self): + if self.scan_period > 0: + self.tid = threading.Thread(target=self.scan_all) + self.tid.setDaemon(True) + self.tid.start() + + def scan_all(self): + last_time = time.time() + while True: + for subscription in self.subscriptions: + trb_address, entity, element = subscription + for data in xmlget(trb_address, entity, element, logger=logger): + reason = data['context']['identifier'] + try: + self.pvDB[reason].mask = 0 + self.setParamStatus(reason, Alarm.NO_ALARM, Severity.NO_ALARM) + self.setParam(reason, data['value'][TYPE_MAPPING[data['format']][1]]) + manager.pvs[self.port][reason].updateValue(self.pvDB[reason]) + except Exception as e: + logger.error(str(e)) + + # if the process was suspended, reset last_time: + if time.time() - last_time > self.scan_period: + last_time = time.time() + + time.sleep(max(0.0, self.scan_period - (time.time() - last_time))) + last_time += self.scan_period + +TYPE_MAPPING = { + # pcaspy types: 'enum', 'string', 'char', 'float' or 'int' + 'unsigned': ('int', 'python'), + 'integer': ('int', 'python'), + 'signed': ('int', 'python'), + 'hex': ('int', 'python'), + 'boolean': ('enum', 'raw'), + 'bitmask': ('int', 'raw'), + 'enum': ('enum', 'raw'), + 'float': ('float', 'python'), + #'time': ('char', 'string'), + 'time': ('int', 'raw'), + #'binary': ('char', 'string'), + 'binary': ('int', 'raw'), +} diff --git a/libtrbnet_python/trbnet/util/trbcmd.py b/libtrbnet_python/trbnet/util/trbcmd.py new file mode 100755 index 0000000..8b2fa56 --- /dev/null +++ b/libtrbnet_python/trbnet/util/trbcmd.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python + +import click, time, logging +from trbnet import TrbNet, TrbException, TrbError +from trbnet.xmldb import XmlDb + +t = TrbNet() +logger = logging.getLogger('trbnet.util.trbcmd') + +### Helpers + +def _status_warning(): + if t.trb_errno() == TrbError.TRB_STATUS_WARNING: + return "Status-Bit(s) have been set:\n" + t.trb_termstr(t.trb_term()) + else: + return None + +### Definition of a Python API to the functions later exposed by the CLI + +def _r(trb_address, register): + response = t.register_read(trb_address, register) + for endpoint in response: + str_data = '{:08X}'.format(response[endpoint]) + print("endpoint 0x{:08X} responded with: {}".format(endpoint, str_data)) + +def _rm(trb_address, register, size, mode): + response = t.register_read_mem(trb_address, register, mode, size) + for endpoint in response: + str_data = ' '.join('{:08X}'.format(word) for word in response[endpoint]) + print("endpoint 0x{:08X} responded with: {}".format(endpoint, str_data)) + status_warning = _status_warning() + if status_warning: logger.warning(status_warning) + +def _xmlentry(entity, name): + db = XmlDb() + reg_addresses = db._get_all_element_addresses(entity, name) + for field_name in db._contained_fields(entity, name): + reg_addresses = db._get_all_element_addresses(entity, field_name) + yield {'entity': entity, 'field_name': field_name, 'reg_addresses': reg_addresses} + +def _xmlget(trb_address, entity, name, logger=logger): + db = XmlDb() + register_blocks = db._determine_continuous_register_blocks(entity, name) + all_data = {} # dictionary with {'reg_address': {'trb_address': int, ...}, ...} + for start, size in register_blocks: + if size > 1: + try: + response = t.register_read_mem(trb_address, start, 0, size) + except TrbException as e: + if logger: logger.error("TRB Error happened: %s -- Continuing anyways.", repr(e)) + continue + except Exception as e: + if logger: logger.error("Other error happened: %s -- Continuing anyways.", repr(e)) + continue + for response_trb_address, data in response.items(): + if not data: + continue + for reg_address, word in enumerate(data, start=start): + if reg_address not in all_data: + all_data[reg_address] = {} + all_data[reg_address][response_trb_address] = word + else: + reg_address = start + try: + response = t.register_read(trb_address, reg_address) + except TrbException as e: + if logger: logger.error("TRB Error happened: %s -- Continuing anyways.", repr(e)) + continue + except Exception as e: + if logger: logger.error("Other error happened: %s -- Continuing anyways.", repr(e)) + continue + for response_trb_address, word in response.items(): + if reg_address not in all_data: + all_data[reg_address] = {} + all_data[reg_address][response_trb_address] = word + for field_name in db._contained_fields(entity, name): + reg_addresses = db._get_all_element_addresses(entity, field_name) + slices = len(reg_addresses) + for slice, reg_address in enumerate(reg_addresses): + if reg_address not in all_data: + fmt = "register missing in response: %s (addr 0x%04x)" + if logger: logger.warning(fmt, field_name, reg_address) + continue + for response_trb_address, value in all_data[reg_address].items(): + data = db.convert_field(entity, field_name, value, trb_address=response_trb_address, slice=slice if slices > 1 else None) + yield data + +### Definition of the CLI with the help of the click package: + +class BasedIntParamType(click.ParamType): + name = 'integer' + def convert(self, value, param, ctx): + try: + if value[:2].lower() == '0x': + return int(value[2:], 16) + elif value[:1] == '0': + return int(value, 8) + return int(value, 10) + except ValueError: + self.fail('%s is not a valid integer' % value, param, ctx) + +BASED_INT = BasedIntParamType() + +@click.group() +def cli(): + pass + +@cli.command() +@click.argument('trb_address', type=BASED_INT) +@click.argument('register', type=BASED_INT) +def r(trb_address, register): + click.echo('Reading register') + _r(trb_address, register) + +@cli.command() +@click.argument('trb_address', type=BASED_INT) +@click.argument('register', type=BASED_INT) +@click.argument('size', type=BASED_INT) +@click.argument('mode', type=BASED_INT) +def rm(trb_address, register, size, mode): + click.echo('Reading register memory') + _rm(trb_address, register, size, mode) + +@cli.command() +@click.argument('entity') +@click.argument('name') +def xmlentry(entity, name): + click.echo('Searching xml register entry') + for info in _xmlentry(entity, name): + slices = len(info['reg_addresses']) + info['reg_addresses'] = ', '.join('0x{:04x}'.format(addr) for addr in info['reg_addresses']) + info['slices'] = ' (%d slices)' % slices if slices > 1 else '' + print("ENTITY: {entity:10s} FIELD: {field_name:20s} REGISTER(s): {reg_addresses} {slices}".format(**info)) + +@cli.command() +@click.argument('trb_address', type=BASED_INT) +@click.argument('entity') +@click.argument('name') +def xmlget(trb_address, entity, name): + click.echo('Querying xml register entry from TrbNet') + for data in _xmlget(trb_address, entity, name): + print("{context[identifier]} {value[unicode]} {unit}".format(**data)) + +if __name__ == '__main__': + cli() diff --git a/libtrbnet_python/trbnet/xmldb/__init__.py b/libtrbnet_python/trbnet/xmldb/__init__.py new file mode 100644 index 0000000..bfae94a --- /dev/null +++ b/libtrbnet_python/trbnet/xmldb/__init__.py @@ -0,0 +1 @@ +from .db import XmlDb diff --git a/libtrbnet_python/trbnet/xmldb/db.py b/libtrbnet_python/trbnet/xmldb/db.py new file mode 100644 index 0000000..836de7b --- /dev/null +++ b/libtrbnet_python/trbnet/xmldb/db.py @@ -0,0 +1,312 @@ +import os +import enum +from datetime import datetime as dt +from lxml import etree + +class XmlDb(object): + ''' + XmlDb is an object representing the XML database used to describe + registers of TrbNet systems. In this context, "XML database" means: + a set of XML files in a single folder obeying a specific schema. + + The database folder can be provided by the environment variable + 'XMLDB' or by specifying the folder as keyword argument when + instantiating the class: + + >>> db = XmlDb(folder='./path/to/daqtools/xml-db/database/') + ''' + + TOP_ENTITY = 'TrbNetEntity' + ENTITY_TAGS = ('field', 'register', 'group', 'TrbNetEntity') + + def __init__(self, folder=None): + if folder is None: + folder = os.environ.get('XMLDB', '.') + folder = os.path.expanduser(folder) + self.folder = folder + self._cache_xml_docs = {} + self._cache_elements = {} + self._cache_field_hierarchy = {} + self._cache_field_info = {} + + def _get_xml_doc(self, entity): + # Try to fetch xmldoc from cache and return it: + if entity in self._cache_xml_docs: + return self._cache_xml_docs[entity] + # Otherwise parse the .xml file and add it to the cache: + xml_path = os.path.join(self.folder, entity + '.xml') + xml_doc = etree.parse(xml_path) + ## check schema? + #xmlschema_doc = etree.parse(xsd_path) + #xmlschema = etree.XMLSchema(xmlschema_doc) + #result = xmlschema.validate(xml_doc) + self._cache_xml_docs[entity] = xml_doc + return xml_doc + + def _get_elements_by_name_attr(self, entity, name_attr, tag='*', amount=None): + ''' + Finds and returns elements from the entity XML tree with the attribute + 'name' having the value of this method's argument name_attr. + + Arguments: + tag -- Can be pin the elements to search for to specific tag names. Default: wildcard + amount -- If set to an integer {0, 2, 3, ...}, the returned list will contain this amount of elements. + ''' + # Try to fetch the elements from the cache + key = (entity, name_attr, tag) + if key in self._cache_elements: + results = self._cache_elements[key] + # Otherwise, fetch them from the XML file: + else: + xml_doc = self._get_xml_doc(entity) + results = xml_doc.findall("//"+tag+"[@name='"+name_attr+"']") + self._cache_elements[key] = results + # Check if we found the right amount of elements + if amount is not None and len(results) != amount: + fmt = "Could not find the desired amount of tags with attribute name=%s: found %d instead of %d" + raise ValueError(fmt % (name_attr, len(results), amount)) + # Return + return results + + def _get_unique_element_by_name_attr(self, entity, name_attr, tag='*'): + results = self._get_elements_by_name_attr(entity, name_attr, tag=tag) + # Return the result if we found a single one and raise exceptions otherwise: + if len(results) == 1: + return results[0] + elif len(results) == 0: + raise ValueError("No such element found: tag '%s' with attribute name=%s" % (tag, name_attr)) + else: + fmt = "Non-unique search for tag '%s' with attribute name=%s! XML Database Error!" + raise ValueError(fmt % (tag, name_attr)) + + def _get_single_element_by_name_attr_prefer_field(self, entity, name_attr): + ''' + This function will try to find a unique element: + * First, it tries if a field' element with that name attribute exists. + (Unique only among the 'field' elements) + * If that fails with no result, it retries extending the search to any + unique element (tag) with the given name attribute. + ''' + try: + return self._get_unique_element_by_name_attr(entity, name_attr, tag='field') + except: + pass + return self._get_unique_element_by_name_attr(entity, name_attr, tag='*') + + def find_field(self, entity, field): + return self._get_single_element_by_name_attr_prefer_field(entity, field) + + def _get_element_addressing(self, entity, element): + ''' + Determine the addressing of an element in the database: + + * base_address: The address of the first register + * slices: The number of slices for this (or a parent) element, + set to None if there are no repetitions. + * stepsize: The stepsize to determine the address for any additional slices + * size: The size (number of elements) of the requested element + + Returns: + tuple -- (base_address, slices, stepsize, size) + ''' + if type(element) == str: + element = self._get_single_element_by_name_attr_prefer_field(entity, element) + base_address = 0 + slices = None + size = int(element.get('size', '1')) + stepsize = 0 + node = element + while node.tag in self.ENTITY_TAGS: + base_address += int(node.get('address', '0'), 16) + if node.tag == self.TOP_ENTITY: break + repeat = int(node.get('repeat', 1)) + if repeat != 1: + slices = repeat + stepsize = int(node.get('size', '0'), 10) + node = node.getparent() + return (base_address, slices, stepsize, size) + + def _get_all_element_addresses(self, entity, element): + ''' + Determine all addresses of an element + + Returns: + list -- containing all addresses of an element + ''' + base_address, slices, stepsize, size = self._get_element_addressing(entity, element) + return [base_address + i * stepsize for i in range(slices or 1)] + + def _contained_fields(self, entity, element): + ''' + name specifies the 'name' attribute of a + {,,,} + tag in the .xml file corresponding to entity. + Returns a list of all fields contained in the element. + ''' + if type(element) == str: + element = self._get_single_element_by_name_attr_prefer_field(entity, element) + if element.tag == 'field' and element.get('name'): + return [element.get('name')] + fields = element.findall(".//field[@name]") + return [field.get('name') for field in fields] + + def _determine_continuous_register_blocks(self, entity, element): + register_blocks = [] + if type(element) == str: + element = self._get_single_element_by_name_attr_prefer_field(entity, element) + base_address, slices, stepsize, size = self._get_element_addressing(entity, element) + continuous = element.get('continuous', 'false') == 'true' + #print("el:", element.get('name'), "address:", hex(base_address), "size:", size, "last (tent.):", hex(base_address+size-1) ,"continuous:", continuous, "slices:", slices or 1) + if continuous or element.tag in ('register', 'field'): + register_blocks += [(base_address + i*stepsize, size) for i in range(slices or 1)] + else: + for child in element: + if child.tag not in self.ENTITY_TAGS: + continue + register_blocks += self._determine_continuous_register_blocks(entity, child) + return register_blocks + + def _get_field_identifier(self, entity, field_name, trb_address, slice=None): + identifier = "{}-0x{:04x}-{}".format(entity, trb_address, field_name) + if slice is not None: + identifier += "." + str(slice) + return identifier + + def _get_field_hierarchy(self, entity, field): + # Try to fetch the field hierarchy from the cache and return it: + key = (entity, field) + if key in self._cache_field_hierarchy: + return self._cache_field_hierarchy[key] + # Otherwise, construct the hierarchy by walking up the tree from the + # to the top level XML entity and add it to the cache: + if type(field) == str: + field = self.find_field(entity, field) + stack = [] + node = field + while node.tag in self.ENTITY_TAGS: + stack.append(node.get('name')) + if node.tag == self.TOP_ENTITY: break + node = node.getparent() + hierarchy = list(reversed(stack)) + self._cache_field_hierarchy[key] = hierarchy + return hierarchy + + def _get_field_info(self, entity, field): + # Try to fetch the field info from the cache and return it: + key = (entity, field) + if key in self._cache_field_info: + return self._cache_field_info[key] + # Otherwise, construct the field info by reading in its XML information + if type(field) == str: + field = self.find_field(entity, field) + field_name = field.get('name') + info = { + 'addresses': self._get_all_element_addresses(entity, field), + 'start': int(field.get('start', 0)), + 'bits': int(field.get('bits', 32)), + 'format': field.get('format', 'unsigned'), + 'unit': field.get('unit', ''), + 'scale': float(field.get('scale', 1.0)), + 'scaleoffset': float(field.get('scaleoffset', 0.0)), + 'meta': {} + #'errorFlag': , + #'invertFlag': , + } + if info['format'] == 'enum': + results = field.findall("enumItem") + choices = {} + for result in results: + choices[int(result.get('value'))] = result.text + info['meta']['choices'] = choices + DynamicEnum = enum.Enum(field_name, {v: k for k, v in choices.items()}) + info['meta']['enum'] = DynamicEnum + self._cache_field_info[key] = info + return info + + def convert_field(self, entity, field_name, register_word, trb_address=0xffff, slice=None): + info = self._get_field_info(entity, field_name) + address = info['addresses'][slice if slice is not None else 0] + start = info['start'] + bits = info['bits'] + format = info['format'] + unit = info['unit'] + scale = info['scale'] + scaleoffset = info['scaleoffset'] + meta = info['meta'] + #errorFlag = + #invertFlag = + raw = (register_word >> start) & ((1 << bits) -1) + value = { + 'raw': raw, + 'string': raw, + 'python': None, + 'unicode': None, + } + if format == 'unsigned': + val = round(scale * raw + scaleoffset) + val = val if val >= 0 else 0 + value['python'] = val + value['string'] = str(val) + elif format == 'float': + val = float(raw) + val *= scale + val += scaleoffset + value['python'] = val + value['string'] = '%.2f' % val + elif format == 'time': + val = dt.utcfromtimestamp(raw) + value['python'] = val + value['string'] = val.strftime('%Y-%m-%d %H:%M') + elif format == 'hex': + fmt = '0x{:0%dx}' % ((bits+3)/4) + value['python'] = raw + value['string'] = fmt.format(raw) + elif format in ('integer', 'signed'): + val = round(scale * raw + scaleoffset) + value['python'] = val + value['string'] = str(val) + elif format == 'boolean': + val = bool(raw) + value['python'] = val + value['string'] = str(val).lower() + elif format == 'enum': + DynamicEnum = meta['enum'] + if raw in meta['choices']: + value['python'] = DynamicEnum(raw) + value['string'] = meta['choices'][raw] + else: + value['python'] = raw + value['string'] = str(raw) + elif format == 'bitmask': + fmt = '{:0%db}' % (bits) + val = fmt.format(raw) + value['python'] = raw + value['string'] = val + value['unicode'] = val.replace('0', '□').replace('1', '■') + elif format == 'binary': + fmt = '0b{:0%db}' % (bits) + value['python'] = raw + value['string'] = fmt.format(raw) + else: + raise NotImplementedError('format: ' + format) + if value['unicode'] is None: + value['unicode'] = value['string'] + if unit: + value['string'] += ' ' + unit + value['unicode'] += ' ' + unit + identifier = self._get_field_identifier(entity, field_name, trb_address, slice=slice) + hierarchy = self._get_field_hierarchy(entity, field_name) + context = { + 'address': address, + 'identifier': identifier, + 'hierarchy': hierarchy, + 'trb_address': trb_address, + 'field_name': field_name, + } + return { + 'value': value, + 'unit': unit, + 'meta': meta, + 'format': format, + 'context': context + }