--- /dev/null
+# PyTrbNet
+
+This is a Python package wrapping libtrbnet.so.
+It allows accessing trbnet registers from Python.
+
+The package comes with two additional features:
+
+* XmlDb support. Allows to query registers by their
+ name as specified in the corresponding xml file.
+* PCASpy-based EPICS IOC providing TrbNet register
+ status information to the EPICS detector control
+ system.
+
+### Resources
+
+* [The TRB Website](http://trb.gsi.de)
+* [TrbNet Manual](http://jspc29.x-matter.uni-frankfurt.de/docu/trbnetdocu.pdf)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+try:
+ from setuptools import setup
+except ImportError:
+ from distutils.core import setup
+
+try:
+ import pypandoc
+ LDESC = open('README.md', 'r').read()
+ LDESC = pypandoc.convert_text(LDESC, 'rst', format='md')
+except (ImportError, IOError, RuntimeError) as e:
+ print("Could not create long description:")
+ print(str(e))
+ LDESC = ''
+
+setup(name='trbnet',
+ version = '1.0.2',
+ description = 'Interface to TrbNet (wrapping libtrbnet.so with ctypes)',
+ long_description = LDESC,
+ author = 'Philipp Klaus',
+ author_email = 'klaus@physik.uni-frankfurt.de',
+ url = 'https://github.com/pklaus/pytrbnet',
+ license = 'GPL',
+ packages = [
+ 'trbnet',
+ 'trbnet.core',
+ 'trbnet.xmldb',
+ 'trbnet.util',
+ 'trbnet.epics',
+ ],
+ entry_points = {
+ 'console_scripts': [
+ 'trbcmd.py = trbnet.util.trbcmd:cli',
+ ],
+ },
+ include_package_data = False,
+ zip_safe = True,
+ platforms = 'Linux',
+ install_requires = [
+ "lxml",
+ "click",
+ "enum34", # for support of enum.IntEnum on Python < 3.4
+ ],
+ keywords = 'TrbNet PyTrbNet FPGA Low-latency network wrapper',
+ classifiers = [
+ 'Development Status :: 4 - Beta',
+ 'Operating System :: OS Independent',
+ 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 3',
+ 'Topic :: Scientific/Engineering :: Physics',
+ 'Topic :: Scientific/Engineering :: Interface Engine/Protocol Translator',
+ 'Topic :: System :: Hardware :: Hardware Drivers',
+ ]
+)
--- /dev/null
+from .core.lowlevel import _TrbNet
+from .core.highlevel import TrbNet
+from .core.error import TrbException, TrbError
--- /dev/null
+from .highlevel import TrbNet
+from .lowlevel import _TrbNet
+from .error import TrbException, TrbError
--- /dev/null
+import enum
+
+class TrbException(Exception):
+ def __init__(self, msg, errno, errorstr):
+ # Set some exception infomation
+ self.msg = msg
+ try:
+ self.errno = TrbError(errno)
+ except ValueError:
+ self.errno = errno
+ self.errorstr = errorstr
+
+@enum.unique
+class TrbError(enum.IntEnum):
+ """
+ Copied from trbnettools/libtrbnet/trberror.h
+ as the names of the enum values are not accessible
+ from within the library...
+ """
+ TRB_NONE = 0
+ TRB_TX_BUSY = 1
+ TRB_FIFO_NOT_EMPTY = 2
+ TRB_FIFO_TIMEOUT = 3
+ TRB_FIFO_HEADERS = 4
+ TRB_FIFO_SEQUENZ = 5
+ TRB_FIFO_INVALID_MODE = 6
+ TRB_FIFO_INCOMPLETE_PACKAGE = 7
+ TRB_FIFO_INVALID_HEADER = 8
+ TRB_FIFO_MISSING_TERM_HEADER = 9
+ TRB_FAILED_WAIT_IS_VALID = 10
+ TRB_FAILED_WAIT_IS_NOT_VALID = 11
+ TRB_USER_BUFFER_OVF = 12
+ TRB_INVALID_CHANNEL = 13
+ TRB_INVALID_PKG_NUMBER = 14
+ TRB_STATUS_ERROR = 15
+ TRB_INVALID_ADDRESS = 16
+ TRB_INVALID_LENGTH = 17
+ TRB_ENDPOINT_NOT_REACHED = 18
+ TRB_DMA_UNAVAILABLE = 19
+ TRB_DMA_TIMEOUT = 20
+ TRB_READMEM_INVALID_SIZE = 21
+ TRB_HDR_DLEN = 22
+ TRB_PEXOR_OPEN = 23
+ TRB_SEMAPHORE = 24
+ TRB_FIFO_SHARED_MEM = 25
+ TRB_STATUS_WARNING = 26
+ TRB_RPC_ERROR = 27
+ TRB_PEXOR_DATA_ERROR = 28
+ TRB_PEXOR_DEVICE_ERROR = 29
+ TRB_PEXOR_DEVICE_TRB_TIMEOUT = 30
+ TRB_PEXOR_DEVICE_POLLING_TIMEOUT = 31
+ TRB_PEXOR_DEVICE_DMA_EMPTY = 32
+ TRB_PEXOR_DEVICE_INVALID_DMA_SIZE = 33
+ TRB_PEXOR_DEVICE_LOST_CREDENTIAL = 34
+ TRB_PEXOR_DEVICE_FIFO_TRANSFER = 35
+ TRB_TRB3_CMD_NOT_SUPPORTED = 36
+ TRB_TRB3_SOCKET_ERROR = 37
+ TRB_TRB3_SOCKET_TIMEOUT = 38
+ TRB_TRB3_INCOMPLETE_UDP = 39
+ TRB_TRB3_DATA_ERROR = 40
+ TRB_TRB3_INVALID_UDP_HEADER = 41
--- /dev/null
+# -*- coding: utf-8 -*-
+from .lowlevel import _TrbNet
+
+
+class TrbNet(_TrbNet):
+ '''
+ High level wrapper providing utility functions for the TrbNet class
+ '''
+
+ def register_read(self, trb_address, reg_address):
+ lin_data = super().trb_register_read(trb_address, reg_address)
+ if (len(lin_data) % 2) != 0:
+ raise ValueError("len(lin_data) == %d - expected a multiple of %d" % (len(lin_data), 2))
+ result = self._get_dynamic_trb_address_dict(lin_data, force_length=1)
+ return {key: value[0] for key, value in result.items()}
+
+ def register_read_mem(self, trb_address, reg_address, option, size):
+ lin_data = super().trb_register_read_mem(trb_address, reg_address, option, size)
+ return self._get_dynamic_trb_address_dict(lin_data)
+
+ def read_uid(self, trb_address):
+ '''
+ Read unique id of TrbNet nodes
+
+ Arguments:
+ trb_address -- node(s) to be queried
+
+ Returns:
+ dict -- the keys being (uid, endpoint) and the associated value the currently assigned trb address
+ '''
+ lin_data = super().trb_read_uid(trb_address)
+ if (len(lin_data) % 4) != 0:
+ raise ValueError("len(lin_data) == %d - expected a multiple of %d" % (len(lin_data), 4))
+ responses = [lin_data[pos:pos+4] for pos in range(0, len(lin_data), 4)]
+ uid_dict = {((r[0] << 32) + r[1], r[2]): r[3] for r in responses}
+ return uid_dict
+
+ def _get_dynamic_trb_address_dict(self, lin_data, force_length=0):
+ """
+ A utility function to structure response data from the
+ trb_register_read() and trb_register_read_mem() functions.
+ As multiple TrbNet nodes can reply to a single request, it
+ splits the linar response into the data from each eandpoint.
+ Returns a dictionary with the trb_address as keys and the
+ respective data as values.
+
+ Returns:
+ dict -- key: trb_address, value: list(int) (32-bit words)
+ """
+ trb_address_responses = {}
+ offset = 0
+ while len(lin_data) > offset:
+ header = lin_data[offset]
+ offset += 1
+ length, trb_address = (header >> 16), (header & 0xffff)
+ if force_length: length = force_length
+ trb_address_responses[trb_address] = lin_data[offset:offset+length]
+ offset += length
+ return trb_address_responses
--- /dev/null
+# -*- coding: utf-8 -*-
+
+def _find_lib(inp_lib_name):
+ """
+ Find location of a dynamic library
+ Idea found in <https://github.com/pyepics/pyepics/blob/master/epics/ca.py#L117>
+ """
+ # Test 1: if LIBTRBNET env var is set, use it.
+ import os
+ dllpath = os.environ.get('LIBTRBNET', None)
+ if dllpath is not None:
+ dllpath = os.path.expanduser(dllpath)
+ if os.path.exists(dllpath) and os.path.isfile(dllpath):
+ return dllpath
+
+ # Test 2: look in installed python location for dll (not used right now)
+ #dllpath = resource_filename('trbnet.clibs', clib_search_path(inp_lib_name))
+ #if (os.path.exists(dllpath) and os.path.isfile(dllpath)):
+ # return dllpath
+
+ # Test 3: look for library in LD_LIBRARY_PATH with ctypes.util
+ import ctypes.util
+ dllpath = ctypes.util._findLib_ld(inp_lib_name) if hasattr(ctypes.util, '_findLib_ld') else None
+ if dllpath is not None:
+ return dllpath
+
+ raise NameError('cannot find lib'+inp_lib_name)
import ctypes
import os
+from .error import TrbException
+
# TODO: use warnings to indicate access to wrong register or no data
-# dictionary with trb error strings. Copied from trberror.h
-# TODO: better way to access this directly in libtrbnet
-trb_errno_dict = {0: 'TRB_NONE', 1: 'TRB_TX_BUSY', 2: 'TRB_FIFO_NOT_EMPTY',
- 3: 'TRB_FIFO_TIMEOUT', 4: 'TRB_FIFO_HEADERS', 5: 'TRB_FIFO_SEQUENZ',
- 6: 'TRB_FIFO_INVALID_MODE', 7: 'TRB_FIFO_INCOMPLETE_PACKAGE',
- 8: 'TRB_FIFO_INVALID_HEADER', 9: 'TRB_FIFO_MISSING_TERM_HEADER',
- 10: 'TRB_FAILED_WAIT_IS_VALID', 11: 'TRB_FAILED_WAIT_IS_NOT_VALID',
- 12: 'TRB_USER_BUFFER_OVF', 13: 'TRB_INVALID_CHANNEL',
- 14: 'TRB_INVALID_PKG_NUMBER', 15: 'TRB_STATUS_ERROR',
- 16: 'TRB_INVALID_ADDRESS', 17: 'TRB_INVALID_LENGTH',
- 18: 'TRB_ENDPOINT_NOT_REACHED', 19: 'TRB_DMA_UNAVAILABLE',
- 20: 'TRB_DMA_TIMEOUT', 21: 'TRB_READMEM_INVALID_SIZE',
- 22: 'TRB_HDR_DLEN', 23: 'TRB_PEXOR_OPEN', 24: 'TRB_SEMAPHORE',
- 25: 'TRB_FIFO_SHARED_MEM', 26: 'TRB_STATUS_WARNING',
- 27: 'TRB_RPC_ERROR', 28: 'TRB_PEXOR_DATA_ERROR',
- 29: 'TRB_PEXOR_DEVICE_ERROR', 30: 'TRB_PEXOR_DEVICE_TRB_TIMEOUT',
- 31: 'TRB_PEXOR_DEVICE_POLLING_TIMEOUT', 32: 'TRB_PEXOR_DEVICE_DMA_EMPTY',
- 33: 'TRB_PEXOR_DEVICE_INVALID_DMA_SIZE', 34: 'TRB_PEXOR_DEVICE_LOST_CREDENTIAL',
- 35: 'TRB_PEXOR_DEVICE_FIFO_TRANSFER', 36: 'TRB_TRB3_CMD_NOT_SUPPORTED',
- 37: 'TRB_TRB3_SOCKET_ERROR', 38: 'TRB_TRB3_SOCKET_TIMEOUT',
- 39: 'TRB_TRB3_INCOMPLETE_UDP', 40: 'TRB_TRB3_DATA_ERROR',
- 41: 'TRB_TRB3_INVALID_UDP_HEADER'}
-
-
-class PyTRB(object):
+
+class TrbTerm(ctypes.Structure):
+ """
+ The TRB_TERM C Struct representing the information
+ carried by network termination packets.
+ """
+ _fields_ = [ ("status_common", ctypes.c_uint16),
+ ("status_channel", ctypes.c_uint16),
+ ("sequence", ctypes.c_uint16),
+ ("channel", ctypes.c_uint8) ]
+
+class _TrbNet(object):
'''
Wrapper class for trbnet access using python
'''
- def __init__(self, trb_server, daq_server, path_to_lib='libtrbnet.so'):
+ def __init__(self, libtrbnet=None, daqopserver=None, trb3_server=None, buffersize=4194304):
'''
- Default Constructor. Sets enviromental variable and initialises ports.
- The trbnet daemon has to be running.
+ Constructor for the low level TrbNet class.
+ Loads the shared library (libtrbnet), sets enviromental variables and initialises ports.
+
+ Depending on the version of libtrbnet.so used, the connection to TrbNet is established
+ either by directly connecting to a TRB board (trbnettools/libtrbnet/libtrbnet.so) or
+ by connecting to a trbnet daemon instance (if trbnettools/trbnetd/libtrbnet.so is used).
+ Selecting the library can happen by:
+ - Specifying the full path to the library via the libtrbnet keyword argument.
+ - Specifying the full path to the library via the environment variable LIBTRBNET.
+ - Adding the directory, libtrbnet.so resides in to the environment variable LD_LIBRARY_PATH.
+
+ To specify the peer to connect to, the environment variables DAQOPSERVER, TRB3_SERVER
+ are read. For easier scripting, those environment variables can also be set inside this
+ constructor if the keywords arguments daqopserver or trb3_server are specified.
Keyword arguments:
- trbserver -- string for TRB3_SERVER enviromental variable
- daq_server -- string for DAQOPSERVER enviromental variable
- path_to_lib -- full path to libtrbnet.so
+ libtrbnet -- full path to libtrbnet.so
+ daqopserver -- optional override of the DAQOPSERVER enviromental variable
+ trb3_server -- optional override of the TRB3_SERVER enviromental variable
+ buffersize -- Size of the buffer in 32-bit words when reading back data (default: 16MiB)
'''
- self.trblib = ctypes.cdll.LoadLibrary(path_to_lib)
+ if trb3_server: os.environ['TRB3_SERVER'] = trb3_server
+ if daqopserver: os.environ['DAQOPSERVER'] = daqopserver
+ self.buffersize = buffersize
+ if not libtrbnet:
+ from .libutils import _find_lib
+ libtrbnet =_find_lib('trbnet')
+ self.libtrbnet = libtrbnet
+ self.trblib = ctypes.cdll.LoadLibrary(libtrbnet)
self.declare_types()
- os.environ['TRB3_SERVER'] = trb_server
- os.environ['DAQOPSERVER'] = daq_server
status = self.trblib.init_ports()
if status < 0:
- raise Exception('Error initialising ports.')
+ errno = self.trb_errno()
+ raise TrbException('Error initialising ports.', errno, self.trb_errorstr(errno))
def __del__(self):
'''
Destructor.
'''
- self.trblib.close_ports()
+ try:
+ self.trblib.close_ports()
+ except AttributeError:
+ pass
def trb_errno(self):
'''
'''
return ctypes.c_int.in_dll(self.trblib, 'trb_errno').value
+ def trb_term(self):
+ term = TrbTerm.in_dll(self.trblib, 'trb_term')
+ return (term.status_common, term.status_channel, term.sequence, term.channel)
+
def declare_types(self):
'''
Declare argument and return types of libtrbnet calls via ctypes
self.trblib.trb_nettrace.argtypes = [ctypes.c_uint16, ctypes.POINTER(ctypes.c_uint32),
ctypes.c_uint]
self.trblib.trb_nettrace.restypes = ctypes.c_int
+ self.trblib.trb_errorstr.argtypes = [ctypes.c_int]
+ self.trblib.trb_errorstr.restype = ctypes.c_char_p
+ self.trblib.trb_termstr.argtypes = [TrbTerm]
+ self.trblib.trb_termstr.restype = ctypes.c_char_p
+ def trb_errorstr(self, errno):
+ '''
+ Get error string for an integer error number.
+
+ Arguments:
+ errno -- error number
+
+ Returns:
+ python str with description of the error
+ '''
+ errno = ctypes.c_int(errno)
+ _result = self.trblib.trb_errorstr(errno)
+ return _result.decode('ascii')
+
def trb_register_read(self, trb_address, reg_address):
'''
Read value from trb register.
- Keyword arguments:
- trb_address -- trb endpoint address
+ Arguments:
+ trb_address -- node(s) to read from
reg_address -- register address
Returns:
python list [0] TRB-Address of the sender, [1] register value
'''
- data_array = (ctypes.c_uint32 * 2)()
+ data_array = (ctypes.c_uint32 * self.buffersize)()
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
- status = self.trblib.trb_register_read(trb_address, reg_address, data_array, 2)
+ status = self.trblib.trb_register_read(trb_address, reg_address, data_array, self.buffersize)
if status == -1:
- raise Exception('Error while reading trb register, ' + trb_errno_dict[self.trb_errno()])
- print(self.trb_errno())
+ errno = self.trb_errno()
+ raise TrbException('Error while reading trb register.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
def trb_register_write(self, trb_address, reg_address, value):
'''
Write trb register
- Keyword Arguments:
- trb_address -- trb endpoint address
+ Arguments:
+ trb_address -- node(s) to write to
reg_address -- register address
value -- value to write to register
'''
value = ctypes.c_uint(value)
status = self.trblib.trb_register_write(trb_address, reg_address, value)
if status == -1:
- raise Exception('Error while writing trb register, ' + trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error while writing trb register.', errno, self.trb_errorstr(errno))
def trb_register_read_mem(self, trb_address, reg_address, option, size):
'''
Perform several trb register reads
- Keyword Arguments:
- trb_address -- trb endpoint address
+ Arguments:
+ trb_address -- node(s) to read from
reg_address -- register address
option -- read option, 0 = read same register several times 1 = read adjacent registers
size -- number of reads
Returns:
python list [0] TRB-Address of the sender, [1:] register values
'''
- data_array = (ctypes.c_uint32 * (size + 2))()
+
+ data_array = (ctypes.c_uint32 * self.buffersize)()
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
option = ctypes.c_uint8(option)
- status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2))
+ status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(self.buffersize))
if status == -1:
- raise Exception('Error while reading trb register memory, ' +
- trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error while reading trb register memory.',
+ errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
def trb_read_uid(self, trb_address):
'''
- Read unique id of endpoint
+ Read unique id(s) of TrbNet node(s)
- Keyword Arguments:
- trb_address -- address of endpoint
+ Arguments:
+ trb_address -- node(s) to be queried
Returns:
- python list
- [0]: UID High-32Bit Word
- [1]: UID Low-32Bit Word
- [2]: Endpoint Number
- [3]: TRB-Address of the sender
+ python list, length is a multiple of 4
+ [i+0]: UID High-32Bit Word
+ [i+1]: UID Low-32Bit Word
+ [i+2]: Endpoint Number
+ [i+3]: TRB-Address of the sender
'''
- data_array = (ctypes.c_uint32 * 4)()
+ data_array = (ctypes.c_uint32 * self.buffersize)()
trb_address = ctypes.c_uint16(trb_address)
- status = self.trblib.trb_read_uid(trb_address, data_array, 4)
+ status = self.trblib.trb_read_uid(trb_address, data_array, self.buffersize)
if status == -1:
- raise Exception('Error reading trb uid, ' + trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error reading trb uid.',
+ errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
def trb_set_address(self, uid, endpoint, trb_address):
'''
Set trb net address
- Keyword Arguments:
- uid -- the unique ID of the endpoint
+ Arguments:
+ uid -- the unique ID of the node
endpoint -- number of the trb endpoint
trb_address -- new trb address
'''
trb_address = ctypes.c_uint16(trb_address)
status = self.trblib.trb_set_address(uid, endpoint, trb_address)
if status == -1:
- raise Exception('error setting trb address, ' + trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error setting trb address.',
+ errno, self.trb_errorstr(errno))
# rarely used funtions without documentation in trbnet.h
# meaning of arguments and returned data unknown
def trb_fifo_flush(self, channel):
'''flush trb fifo
- Keyword arguments:
+ Arguments:
channel: trb channel (ipu, slowcontrol etc)'''
channel = ctypes.c_uint8(channel)
return self.trblib.trb_fifo_flush(channel)
def trb_send_trigger(self, trigtype, info, random, number):
'''send trigger to trb
- Keyword arguments:
+ Arguments:
trigtype: trigger type (status, calibration)
info: trigger information
random: random trigger number
bitmask, bitvalue)
def trb_registertime_read_mem(self, trb_address, reg_address, option, size):
- data_array = (ctypes.c_uint32 * (size + 2))()
+ data_array = (ctypes.c_uint32 * self.buffersize)()
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
option = ctypes.c_uint8(option)
- status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(size + 2))
+ status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(self.buffersize))
if status == -1:
- raise Exception('Error while reading trb register memory, ' +
- trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error while reading trb register memory.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
def trb_ipu_data_read(self, trg_type, trg_info, trg_random, trg_number, size):
trg_info = ctypes.c_uint8(trg_info)
trg_random = ctypes.c_uint8(trg_random)
trg_number = ctypes.c_uint16(trg_number)
- data_array = (ctypes.c_uint32 * (size + 2))()
- status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(size + 2))
+ data_array = (ctypes.c_uint32 * self.buffersize)()
+ status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(self.buffersize))
if status == -1:
- raise Exception('Error while reading trb ipu data, ' +
- trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error while reading trb ipu data.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
def trb_nettrace(self, trb_address, size):
trb_address = ctypes.c_uint16(trb_address)
- data_array = (ctypes.c_uint32 * (size + 2))()
- status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(size + 2))
+ data_array = (ctypes.c_uint32 * self.buffersize)()
+ status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(self.buffersize))
if status == -1:
- raise Exception('Error while doing net trace, ' +
- trb_errno_dict[self.trb_errno()])
+ errno = self.trb_errno()
+ raise TrbException('Error while doing net trace.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
+
+ def trb_termstr(self, term):
+ '''
+ Get string representation for network termination packet info tuple.
+
+ Arguments:
+ term -- network termination packet info
+ (TrbTerm instance or tuple of four integer values)
+
+ Returns:
+ python str with description of the error
+ '''
+ if isinstance(term, tuple): term = TrbTerm(*term)
+ _result = self.trblib.trb_termstr(term)
+ return _result.decode('ascii')
--- /dev/null
+from .pcaspy_ioc import TrbNetIOC
--- /dev/null
+import logging
+
+class SeenBeforeFilter(logging.Filter):
+ """
+ A filter for the Python logging module rejecting subsequent
+ log entries when they have been logged before.
+ The optional keyword argument restriction_func can restrict
+ the filtering to a subset of messages.
+ If present, it will be called with the tuple (module, levelno, msg, args).
+ If it evaluates to true, the filtering is done, otherwise not.
+ """
+ def __init__(self, restriction_func=None):
+ self.restriction_func = restriction_func
+ self.seen_before = []
+ super().__init__()
+ def filter(self, record):
+ # add other fields if you need more granular comparison, depends on your app
+ current_log = (record.module, record.levelno, record.msg, record.args)
+ if self.restriction_func:
+ if not self.restriction_func(current_log):
+ # we do not restrict the filtering to this log entry,
+ # so just log it as usual...
+ return True
+ if current_log not in self.seen_before:
+ self.seen_before.append(current_log)
+ return True
+ return False
+
--- /dev/null
+#!/usr/bin/env python
+
+import time, threading, logging
+
+from trbnet.core import TrbNet, TrbException
+from trbnet.xmldb import XmlDb
+from trbnet.util.trbcmd import _xmlget as xmlget, _xmlentry as xmlentry
+
+from pcaspy import Driver, SimpleServer, Alarm, Severity
+from pcaspy.driver import manager
+
+from .helpers import SeenBeforeFilter
+
+t = TrbNet()
+db = XmlDb()
+
+logger = logging.getLogger('trbnet.epics.pcaspy_ioc')
+logging.basicConfig(
+ format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
+restr_func = lambda log: log[2].startswith('register missing')
+logger.addFilter(SeenBeforeFilter(restriction_func=restr_func))
+
+
+class TrbNetIOC(object):
+
+ def __init__(self):
+ self.prefix = ''
+ self._initialized = False
+ self._subscriptions = []
+ self._pvdb = {}
+ self._pvdb_manager = None
+ self._expected_trb_addresses = {}
+
+ def before_initialization(func):
+ def func_wrapper(self, *args, **kwargs):
+ if self._initialized:
+ raise NameError('Cannot use the method .%s() after running .initialize().' % func.__name__)
+ return func(self, *args, **kwargs)
+ return func_wrapper
+
+ @before_initialization
+ def add_subscription(self, trb_address, entity, name):
+ self._subscriptions.append((trb_address, entity, name))
+
+ @before_initialization
+ def add_expected_trb_addresses(self, send_to_trb_address, answer_from_trb_addresses):
+ self._expected_trb_addresses[send_to_trb_address] = answer_from_trb_addresses
+
+ def initialize(self):
+ self._pvdb_manager = PvdbManager(self._pvdb, self._expected_trb_addresses)
+ self._pvdb_manager.initialize(self._subscriptions)
+ self._initialized = True
+
+ @property
+ def all_pvs(self):
+ if not self._initialized:
+ raise NameError("Please run .initialize() first")
+ return [self.prefix + pv for pv in self._pvdb.keys()]
+
+ def run(self):
+ if not self._initialized:
+ self.initialize()
+
+ server = SimpleServer()
+ server.createPV(self.prefix, self._pvdb)
+ driver = TrbNetIocDriver(self._subscriptions, self._pvdb_manager)
+
+ while True:
+ # process CA transactions
+ server.process(0.1)
+
+
+class PvdbManager(object):
+
+ def __init__(self, pvdb, expected_trb_addresses):
+ self._pvdb = pvdb
+ self._expected_trb_addresses = expected_trb_addresses
+
+ def _add(self, identifier, definition):
+ self._pvdb[identifier] = {
+ 'type': TYPE_MAPPING[definition['format']][0],
+ 'unit': definition['unit'],
+ }
+ if definition['format'] == 'enum':
+ choices = definition['meta']['choices']
+ vals = list(choices.keys())
+ min_val, max_val = min(vals), max(vals)
+ enums = []
+ for i in range(max_val+1):
+ enums.append(choices[i] if i in choices else 'n/a')
+ self._pvdb[identifier]['enums'] = enums
+ if definition['format'] == 'boolean' and TYPE_MAPPING[definition['format']][0] == 'enum':
+ self._pvdb[identifier]['enums'] = ['false', 'true']
+
+ def initialize(self, subscriptions):
+ for trb_address, entity, name in subscriptions:
+ if trb_address in self._expected_trb_addresses:
+ answer_from_trb_addresses = self._expected_trb_addresses[trb_address]
+ for info in xmlentry(entity, name):
+ slices = len(info['reg_addresses'])
+ for slice in range(slices):
+ slice = slice if slices > 1 else None
+ for answer_from_trb_address in answer_from_trb_addresses:
+ identifier = db._get_field_identifier(entity, info['field_name'], answer_from_trb_address, slice=slice)
+ definition = db._get_field_info(entity, info['field_name'])
+ self._add(identifier, definition)
+ else:
+ for data in xmlget(trb_address, entity, name, logger=logger):
+ self._add(data['context']['identifier'], data)
+
+class TrbNetIocDriver(Driver):
+
+ def __init__(self, subscriptions, scan_period=1.0):
+ Driver.__init__(self)
+ self.scan_period = 1.0
+ self.subscriptions = subscriptions
+ self.start()
+
+ def start(self):
+ if self.scan_period > 0:
+ self.tid = threading.Thread(target=self.scan_all)
+ self.tid.setDaemon(True)
+ self.tid.start()
+
+ def scan_all(self):
+ last_time = time.time()
+ while True:
+ for subscription in self.subscriptions:
+ trb_address, entity, element = subscription
+ for data in xmlget(trb_address, entity, element, logger=logger):
+ reason = data['context']['identifier']
+ try:
+ self.pvDB[reason].mask = 0
+ self.setParamStatus(reason, Alarm.NO_ALARM, Severity.NO_ALARM)
+ self.setParam(reason, data['value'][TYPE_MAPPING[data['format']][1]])
+ manager.pvs[self.port][reason].updateValue(self.pvDB[reason])
+ except Exception as e:
+ logger.error(str(e))
+
+ # if the process was suspended, reset last_time:
+ if time.time() - last_time > self.scan_period:
+ last_time = time.time()
+
+ time.sleep(max(0.0, self.scan_period - (time.time() - last_time)))
+ last_time += self.scan_period
+
+TYPE_MAPPING = {
+ # pcaspy types: 'enum', 'string', 'char', 'float' or 'int'
+ 'unsigned': ('int', 'python'),
+ 'integer': ('int', 'python'),
+ 'signed': ('int', 'python'),
+ 'hex': ('int', 'python'),
+ 'boolean': ('enum', 'raw'),
+ 'bitmask': ('int', 'raw'),
+ 'enum': ('enum', 'raw'),
+ 'float': ('float', 'python'),
+ #'time': ('char', 'string'),
+ 'time': ('int', 'raw'),
+ #'binary': ('char', 'string'),
+ 'binary': ('int', 'raw'),
+}
--- /dev/null
+#!/usr/bin/env python
+
+import click, time, logging
+from trbnet import TrbNet, TrbException, TrbError
+from trbnet.xmldb import XmlDb
+
+t = TrbNet()
+logger = logging.getLogger('trbnet.util.trbcmd')
+
+### Helpers
+
+def _status_warning():
+ if t.trb_errno() == TrbError.TRB_STATUS_WARNING:
+ return "Status-Bit(s) have been set:\n" + t.trb_termstr(t.trb_term())
+ else:
+ return None
+
+### Definition of a Python API to the functions later exposed by the CLI
+
+def _r(trb_address, register):
+ response = t.register_read(trb_address, register)
+ for endpoint in response:
+ str_data = '{:08X}'.format(response[endpoint])
+ print("endpoint 0x{:08X} responded with: {}".format(endpoint, str_data))
+
+def _rm(trb_address, register, size, mode):
+ response = t.register_read_mem(trb_address, register, mode, size)
+ for endpoint in response:
+ str_data = ' '.join('{:08X}'.format(word) for word in response[endpoint])
+ print("endpoint 0x{:08X} responded with: {}".format(endpoint, str_data))
+ status_warning = _status_warning()
+ if status_warning: logger.warning(status_warning)
+
+def _xmlentry(entity, name):
+ db = XmlDb()
+ reg_addresses = db._get_all_element_addresses(entity, name)
+ for field_name in db._contained_fields(entity, name):
+ reg_addresses = db._get_all_element_addresses(entity, field_name)
+ yield {'entity': entity, 'field_name': field_name, 'reg_addresses': reg_addresses}
+
+def _xmlget(trb_address, entity, name, logger=logger):
+ db = XmlDb()
+ register_blocks = db._determine_continuous_register_blocks(entity, name)
+ all_data = {} # dictionary with {'reg_address': {'trb_address': int, ...}, ...}
+ for start, size in register_blocks:
+ if size > 1:
+ try:
+ response = t.register_read_mem(trb_address, start, 0, size)
+ except TrbException as e:
+ if logger: logger.error("TRB Error happened: %s -- Continuing anyways.", repr(e))
+ continue
+ except Exception as e:
+ if logger: logger.error("Other error happened: %s -- Continuing anyways.", repr(e))
+ continue
+ for response_trb_address, data in response.items():
+ if not data:
+ continue
+ for reg_address, word in enumerate(data, start=start):
+ if reg_address not in all_data:
+ all_data[reg_address] = {}
+ all_data[reg_address][response_trb_address] = word
+ else:
+ reg_address = start
+ try:
+ response = t.register_read(trb_address, reg_address)
+ except TrbException as e:
+ if logger: logger.error("TRB Error happened: %s -- Continuing anyways.", repr(e))
+ continue
+ except Exception as e:
+ if logger: logger.error("Other error happened: %s -- Continuing anyways.", repr(e))
+ continue
+ for response_trb_address, word in response.items():
+ if reg_address not in all_data:
+ all_data[reg_address] = {}
+ all_data[reg_address][response_trb_address] = word
+ for field_name in db._contained_fields(entity, name):
+ reg_addresses = db._get_all_element_addresses(entity, field_name)
+ slices = len(reg_addresses)
+ for slice, reg_address in enumerate(reg_addresses):
+ if reg_address not in all_data:
+ fmt = "register missing in response: %s (addr 0x%04x)"
+ if logger: logger.warning(fmt, field_name, reg_address)
+ continue
+ for response_trb_address, value in all_data[reg_address].items():
+ data = db.convert_field(entity, field_name, value, trb_address=response_trb_address, slice=slice if slices > 1 else None)
+ yield data
+
+### Definition of the CLI with the help of the click package:
+
+class BasedIntParamType(click.ParamType):
+ name = 'integer'
+ def convert(self, value, param, ctx):
+ try:
+ if value[:2].lower() == '0x':
+ return int(value[2:], 16)
+ elif value[:1] == '0':
+ return int(value, 8)
+ return int(value, 10)
+ except ValueError:
+ self.fail('%s is not a valid integer' % value, param, ctx)
+
+BASED_INT = BasedIntParamType()
+
+@click.group()
+def cli():
+ pass
+
+@cli.command()
+@click.argument('trb_address', type=BASED_INT)
+@click.argument('register', type=BASED_INT)
+def r(trb_address, register):
+ click.echo('Reading register')
+ _r(trb_address, register)
+
+@cli.command()
+@click.argument('trb_address', type=BASED_INT)
+@click.argument('register', type=BASED_INT)
+@click.argument('size', type=BASED_INT)
+@click.argument('mode', type=BASED_INT)
+def rm(trb_address, register, size, mode):
+ click.echo('Reading register memory')
+ _rm(trb_address, register, size, mode)
+
+@cli.command()
+@click.argument('entity')
+@click.argument('name')
+def xmlentry(entity, name):
+ click.echo('Searching xml register entry')
+ for info in _xmlentry(entity, name):
+ slices = len(info['reg_addresses'])
+ info['reg_addresses'] = ', '.join('0x{:04x}'.format(addr) for addr in info['reg_addresses'])
+ info['slices'] = ' (%d slices)' % slices if slices > 1 else ''
+ print("ENTITY: {entity:10s} FIELD: {field_name:20s} REGISTER(s): {reg_addresses} {slices}".format(**info))
+
+@cli.command()
+@click.argument('trb_address', type=BASED_INT)
+@click.argument('entity')
+@click.argument('name')
+def xmlget(trb_address, entity, name):
+ click.echo('Querying xml register entry from TrbNet')
+ for data in _xmlget(trb_address, entity, name):
+ print("{context[identifier]} {value[unicode]} {unit}".format(**data))
+
+if __name__ == '__main__':
+ cli()
--- /dev/null
+from .db import XmlDb
--- /dev/null
+import os
+import enum
+from datetime import datetime as dt
+from lxml import etree
+
+class XmlDb(object):
+ '''
+ XmlDb is an object representing the XML database used to describe
+ registers of TrbNet systems. In this context, "XML database" means:
+ a set of XML files in a single folder obeying a specific schema.
+
+ The database folder can be provided by the environment variable
+ 'XMLDB' or by specifying the folder as keyword argument when
+ instantiating the class:
+
+ >>> db = XmlDb(folder='./path/to/daqtools/xml-db/database/')
+ '''
+
+ TOP_ENTITY = 'TrbNetEntity'
+ ENTITY_TAGS = ('field', 'register', 'group', 'TrbNetEntity')
+
+ def __init__(self, folder=None):
+ if folder is None:
+ folder = os.environ.get('XMLDB', '.')
+ folder = os.path.expanduser(folder)
+ self.folder = folder
+ self._cache_xml_docs = {}
+ self._cache_elements = {}
+ self._cache_field_hierarchy = {}
+ self._cache_field_info = {}
+
+ def _get_xml_doc(self, entity):
+ # Try to fetch xmldoc from cache and return it:
+ if entity in self._cache_xml_docs:
+ return self._cache_xml_docs[entity]
+ # Otherwise parse the .xml file and add it to the cache:
+ xml_path = os.path.join(self.folder, entity + '.xml')
+ xml_doc = etree.parse(xml_path)
+ ## check schema?
+ #xmlschema_doc = etree.parse(xsd_path)
+ #xmlschema = etree.XMLSchema(xmlschema_doc)
+ #result = xmlschema.validate(xml_doc)
+ self._cache_xml_docs[entity] = xml_doc
+ return xml_doc
+
+ def _get_elements_by_name_attr(self, entity, name_attr, tag='*', amount=None):
+ '''
+ Finds and returns elements from the entity XML tree with the attribute
+ 'name' having the value of this method's argument name_attr.
+
+ Arguments:
+ tag -- Can be pin the elements to search for to specific tag names. Default: wildcard
+ amount -- If set to an integer {0, 2, 3, ...}, the returned list will contain this amount of elements.
+ '''
+ # Try to fetch the elements from the cache
+ key = (entity, name_attr, tag)
+ if key in self._cache_elements:
+ results = self._cache_elements[key]
+ # Otherwise, fetch them from the XML file:
+ else:
+ xml_doc = self._get_xml_doc(entity)
+ results = xml_doc.findall("//"+tag+"[@name='"+name_attr+"']")
+ self._cache_elements[key] = results
+ # Check if we found the right amount of elements
+ if amount is not None and len(results) != amount:
+ fmt = "Could not find the desired amount of tags with attribute name=%s: found %d instead of %d"
+ raise ValueError(fmt % (name_attr, len(results), amount))
+ # Return
+ return results
+
+ def _get_unique_element_by_name_attr(self, entity, name_attr, tag='*'):
+ results = self._get_elements_by_name_attr(entity, name_attr, tag=tag)
+ # Return the result if we found a single one and raise exceptions otherwise:
+ if len(results) == 1:
+ return results[0]
+ elif len(results) == 0:
+ raise ValueError("No such element found: tag '%s' with attribute name=%s" % (tag, name_attr))
+ else:
+ fmt = "Non-unique search for tag '%s' with attribute name=%s! XML Database Error!"
+ raise ValueError(fmt % (tag, name_attr))
+
+ def _get_single_element_by_name_attr_prefer_field(self, entity, name_attr):
+ '''
+ This function will try to find a unique element:
+ * First, it tries if a field' element with that name attribute exists.
+ (Unique only among the 'field' elements)
+ * If that fails with no result, it retries extending the search to any
+ unique element (tag) with the given name attribute.
+ '''
+ try:
+ return self._get_unique_element_by_name_attr(entity, name_attr, tag='field')
+ except:
+ pass
+ return self._get_unique_element_by_name_attr(entity, name_attr, tag='*')
+
+ def find_field(self, entity, field):
+ return self._get_single_element_by_name_attr_prefer_field(entity, field)
+
+ def _get_element_addressing(self, entity, element):
+ '''
+ Determine the addressing of an element in the database:
+
+ * base_address: The address of the first register
+ * slices: The number of slices for this (or a parent) element,
+ set to None if there are no repetitions.
+ * stepsize: The stepsize to determine the address for any additional slices
+ * size: The size (number of elements) of the requested element
+
+ Returns:
+ tuple -- (base_address, slices, stepsize, size)
+ '''
+ if type(element) == str:
+ element = self._get_single_element_by_name_attr_prefer_field(entity, element)
+ base_address = 0
+ slices = None
+ size = int(element.get('size', '1'))
+ stepsize = 0
+ node = element
+ while node.tag in self.ENTITY_TAGS:
+ base_address += int(node.get('address', '0'), 16)
+ if node.tag == self.TOP_ENTITY: break
+ repeat = int(node.get('repeat', 1))
+ if repeat != 1:
+ slices = repeat
+ stepsize = int(node.get('size', '0'), 10)
+ node = node.getparent()
+ return (base_address, slices, stepsize, size)
+
+ def _get_all_element_addresses(self, entity, element):
+ '''
+ Determine all addresses of an element
+
+ Returns:
+ list -- containing all addresses of an element
+ '''
+ base_address, slices, stepsize, size = self._get_element_addressing(entity, element)
+ return [base_address + i * stepsize for i in range(slices or 1)]
+
+ def _contained_fields(self, entity, element):
+ '''
+ name specifies the 'name' attribute of a
+ {<TrbNetEntity>,<group>,<register>,<field>}
+ tag in the .xml file corresponding to entity.
+ Returns a list of all fields contained in the element.
+ '''
+ if type(element) == str:
+ element = self._get_single_element_by_name_attr_prefer_field(entity, element)
+ if element.tag == 'field' and element.get('name'):
+ return [element.get('name')]
+ fields = element.findall(".//field[@name]")
+ return [field.get('name') for field in fields]
+
+ def _determine_continuous_register_blocks(self, entity, element):
+ register_blocks = []
+ if type(element) == str:
+ element = self._get_single_element_by_name_attr_prefer_field(entity, element)
+ base_address, slices, stepsize, size = self._get_element_addressing(entity, element)
+ continuous = element.get('continuous', 'false') == 'true'
+ #print("el:", element.get('name'), "address:", hex(base_address), "size:", size, "last (tent.):", hex(base_address+size-1) ,"continuous:", continuous, "slices:", slices or 1)
+ if continuous or element.tag in ('register', 'field'):
+ register_blocks += [(base_address + i*stepsize, size) for i in range(slices or 1)]
+ else:
+ for child in element:
+ if child.tag not in self.ENTITY_TAGS:
+ continue
+ register_blocks += self._determine_continuous_register_blocks(entity, child)
+ return register_blocks
+
+ def _get_field_identifier(self, entity, field_name, trb_address, slice=None):
+ identifier = "{}-0x{:04x}-{}".format(entity, trb_address, field_name)
+ if slice is not None:
+ identifier += "." + str(slice)
+ return identifier
+
+ def _get_field_hierarchy(self, entity, field):
+ # Try to fetch the field hierarchy from the cache and return it:
+ key = (entity, field)
+ if key in self._cache_field_hierarchy:
+ return self._cache_field_hierarchy[key]
+ # Otherwise, construct the hierarchy by walking up the tree from the
+ # to the top level XML entity and add it to the cache:
+ if type(field) == str:
+ field = self.find_field(entity, field)
+ stack = []
+ node = field
+ while node.tag in self.ENTITY_TAGS:
+ stack.append(node.get('name'))
+ if node.tag == self.TOP_ENTITY: break
+ node = node.getparent()
+ hierarchy = list(reversed(stack))
+ self._cache_field_hierarchy[key] = hierarchy
+ return hierarchy
+
+ def _get_field_info(self, entity, field):
+ # Try to fetch the field info from the cache and return it:
+ key = (entity, field)
+ if key in self._cache_field_info:
+ return self._cache_field_info[key]
+ # Otherwise, construct the field info by reading in its XML information
+ if type(field) == str:
+ field = self.find_field(entity, field)
+ field_name = field.get('name')
+ info = {
+ 'addresses': self._get_all_element_addresses(entity, field),
+ 'start': int(field.get('start', 0)),
+ 'bits': int(field.get('bits', 32)),
+ 'format': field.get('format', 'unsigned'),
+ 'unit': field.get('unit', ''),
+ 'scale': float(field.get('scale', 1.0)),
+ 'scaleoffset': float(field.get('scaleoffset', 0.0)),
+ 'meta': {}
+ #'errorFlag': ,
+ #'invertFlag': ,
+ }
+ if info['format'] == 'enum':
+ results = field.findall("enumItem")
+ choices = {}
+ for result in results:
+ choices[int(result.get('value'))] = result.text
+ info['meta']['choices'] = choices
+ DynamicEnum = enum.Enum(field_name, {v: k for k, v in choices.items()})
+ info['meta']['enum'] = DynamicEnum
+ self._cache_field_info[key] = info
+ return info
+
+ def convert_field(self, entity, field_name, register_word, trb_address=0xffff, slice=None):
+ info = self._get_field_info(entity, field_name)
+ address = info['addresses'][slice if slice is not None else 0]
+ start = info['start']
+ bits = info['bits']
+ format = info['format']
+ unit = info['unit']
+ scale = info['scale']
+ scaleoffset = info['scaleoffset']
+ meta = info['meta']
+ #errorFlag =
+ #invertFlag =
+ raw = (register_word >> start) & ((1 << bits) -1)
+ value = {
+ 'raw': raw,
+ 'string': raw,
+ 'python': None,
+ 'unicode': None,
+ }
+ if format == 'unsigned':
+ val = round(scale * raw + scaleoffset)
+ val = val if val >= 0 else 0
+ value['python'] = val
+ value['string'] = str(val)
+ elif format == 'float':
+ val = float(raw)
+ val *= scale
+ val += scaleoffset
+ value['python'] = val
+ value['string'] = '%.2f' % val
+ elif format == 'time':
+ val = dt.utcfromtimestamp(raw)
+ value['python'] = val
+ value['string'] = val.strftime('%Y-%m-%d %H:%M')
+ elif format == 'hex':
+ fmt = '0x{:0%dx}' % ((bits+3)/4)
+ value['python'] = raw
+ value['string'] = fmt.format(raw)
+ elif format in ('integer', 'signed'):
+ val = round(scale * raw + scaleoffset)
+ value['python'] = val
+ value['string'] = str(val)
+ elif format == 'boolean':
+ val = bool(raw)
+ value['python'] = val
+ value['string'] = str(val).lower()
+ elif format == 'enum':
+ DynamicEnum = meta['enum']
+ if raw in meta['choices']:
+ value['python'] = DynamicEnum(raw)
+ value['string'] = meta['choices'][raw]
+ else:
+ value['python'] = raw
+ value['string'] = str(raw)
+ elif format == 'bitmask':
+ fmt = '{:0%db}' % (bits)
+ val = fmt.format(raw)
+ value['python'] = raw
+ value['string'] = val
+ value['unicode'] = val.replace('0', '□').replace('1', '■')
+ elif format == 'binary':
+ fmt = '0b{:0%db}' % (bits)
+ value['python'] = raw
+ value['string'] = fmt.format(raw)
+ else:
+ raise NotImplementedError('format: ' + format)
+ if value['unicode'] is None:
+ value['unicode'] = value['string']
+ if unit:
+ value['string'] += ' ' + unit
+ value['unicode'] += ' ' + unit
+ identifier = self._get_field_identifier(entity, field_name, trb_address, slice=slice)
+ hierarchy = self._get_field_hierarchy(entity, field_name)
+ context = {
+ 'address': address,
+ 'identifier': identifier,
+ 'hierarchy': hierarchy,
+ 'trb_address': trb_address,
+ 'field_name': field_name,
+ }
+ return {
+ 'value': value,
+ 'unit': unit,
+ 'meta': meta,
+ 'format': format,
+ 'context': context
+ }