# PyTrbNet
-This is a Python package wrapping libtrbnet.so.
+This is a Python package wrapping libtrbnet.so
+from trbnettools.
It allows accessing trbnet registers from Python.
The package comes with two additional features:
status information to the EPICS detector control
system.
+### Installation
+
+This package can be installed from the Python Package
+Index via:
+
+ pip install trbnet
+
+### Configuration / Setup
+
+As this Python package depends on the shared library
+libtrbnet.so for the communication with TrbNet,
+its location needs to be provided.
+This can be done by setting the environment variable
+`LIBTRBNET`.
+
+libtrbnet.so in turn requires the environment variables
+`DAQOPSERVER` (if talking to TrbNet via a trbnetd daemon)
+or the IP of a TRB Board: `TRB3_SERVER` (if talking to
+TrbNet directly).
+
+A fourth environment variable `XMLDB` comes into play,
+if the xmldb capabilities of this Python package are
+to be used. It should point to the location of the
+xml-db for your system.
+
+Those environment variables can also be set from within
+Python with their lowercase variants
+`libtrbnet`, `daqopserver`, and `trb3_server` upon
+instantiating the TrbNet() class.
+
+Here's an example using environment variables in the shell:
+
+
+```sh
+# Setting the relevant environment variables
+export LIBTRBNET=/local/gitrepos/trbnettools/trbnetd/libtrbnet.so
+export DAQOPSERVER=jspc55.x-matter.uni-frankfurt.de:1
+export XMLDB=/local/gitrepos/daqtools/xml-db/database
+
+# example call to get the value in the compile time
+# register for all reachable TRBs:
+trbcmd.py xmlget 0xffff TrbNet CompileTime
+
+# With the environment variables set, you could also
+# run Python and instantiate TrbNet(). It would
+# pick up the settings from the exported variables.
+```
+
+or by setting the variables from within Python:
+
+```python
+import os
+from trbnet import TrbNet
+
+lib = '/local/gitrepos/trbnettools/trbnetd/libtrbnet.so'
+host = 'trbnetd_host:8888'
+t = TrbNet(libtrbnet=lib, daqopserver=host)
+# 0x40 is the register address of CompileTime
+t.register_read(0xffff, 0x40)
+```
+
+### Usage with Python
+
+To read the content of the register address 0x0 for all
+connected TrbNet boards (broadcast address 0xffff), do:
+
+```python
+import os
+from trbnet import TrbNet
+
+t = TrbNet()
+
+response = t.register_read(0xffff, 0x0)
+for endpoint in response:
+ print("endpoint 0x{:08X} responded with: 0x{:08X}".format(endpoint, response[endpoint]))
+```
+
+The TrbNet() class has the following methods:
+
+* `register_read(trb_address, reg_address)`
+* `register_write(trb_address, reg_address, value)`
+* `register_read_mem(trb_address, reg_address, option, size)`
+* `read_uid(trb_address)`
+* Furthermore, multiple methods starting with `trb_` (e.g. `trb_set_address(uid, endpoint, trb_address)`)
+ can be called as they are inherited from [the parent class `_TrbNet`][trbnet/core/lowlevel.py].
+
+### Usage of the Terminal Utility trbcmd.py
+
+The package comes with a simple command line utility called `trbcmd.py`.
+It is a Python counterpart for the trbcmd utility provided
+by trbnettools.
+
+What it can do:
+
+**read register values**
+
+```
+trbcmd.py r 0xffff 0x0
+```
+
+**read memory (subsequent register addresses)**
+
+Read three registers starting at 0x8005 from all boards:
+```
+trbcmd.py rm 0xffff 0x8005 0x3 0x0
+```
+
+**xml-db queries**
+
+Ask all TrbNet nodes (broadcast 0xffff) for the register value of CompileTime as set in TrbNet.xml:
+
+```
+trbcmd.py xmlget 0xffff TrbNet CompileTime
+```
+
### Resources
* [The TRB Website](http://trb.gsi.de)
* [TrbNet Manual](http://jspc29.x-matter.uni-frankfurt.de/docu/trbnetdocu.pdf)
+
+[trbnet/core/lowlevel.py]: https://github.com/pklaus/pytrbnet/blob/master/trbnet/core/lowlevel.py
import ctypes
import os
+from typing import List, Tuple, Union
+
from .error import TrbException
# TODO: use warnings to indicate access to wrong register or no data
Wrapper class for trbnet access using python
'''
- def __init__(self, libtrbnet=None, daqopserver=None, trb3_server=None, buffersize=4194304):
+ def __init__(self, libtrbnet: str = None, daqopserver: str = None, trb3_server: str = None, buffersize: int = 4194304):
'''
Constructor for the low level TrbNet class.
Loads the shared library (libtrbnet), sets enviromental variables and initialises ports.
trb3_server -- optional override of the TRB3_SERVER enviromental variable
buffersize -- Size of the buffer in 32-bit words when reading back data (default: 16MiB)
'''
- if trb3_server: os.environ['TRB3_SERVER'] = trb3_server
- if daqopserver: os.environ['DAQOPSERVER'] = daqopserver
- self.buffersize = buffersize
if not libtrbnet:
from .libutils import _find_lib
libtrbnet =_find_lib('trbnet')
self.libtrbnet = libtrbnet
+ if daqopserver: os.environ['DAQOPSERVER'] = daqopserver
+ if trb3_server: os.environ['TRB3_SERVER'] = trb3_server
+ self.buffersize = buffersize
self.trblib = ctypes.cdll.LoadLibrary(libtrbnet)
self.declare_types()
status = self.trblib.init_ports()
except AttributeError:
pass
- def trb_errno(self):
+ def trb_errno(self) -> int:
'''
Returns trb_errno flag value
'''
return ctypes.c_int.in_dll(self.trblib, 'trb_errno').value
- def trb_term(self):
+ def trb_term(self) -> Tuple[int, int, int, int]:
+ '''
+ Return the TRB_TERM info as tuple consisting of:
+ (status_common, status_channel, sequence, channel)
+ '''
term = TrbTerm.in_dll(self.trblib, 'trb_term')
return (term.status_common, term.status_channel, term.sequence, term.channel)
self.trblib.trb_register_read_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8, ctypes.c_uint16,
ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint]
self.trblib.trb_register_read_mem.restype = ctypes.c_int
+ self.trblib.trb_register_write_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8,
+ ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint16]
+ self.trblib.trb_register_read_mem.restype = ctypes.c_int
self.trblib.trb_registertime_read_mem.argtypes = [ctypes.c_uint16, ctypes.c_uint16, ctypes.c_uint8, ctypes.c_uint16,
ctypes.POINTER(ctypes.c_uint32), ctypes.c_uint]
self.trblib.trb_registertime_read_mem.restype = ctypes.c_int
self.trblib.trb_termstr.restype = ctypes.c_char_p
- def trb_errorstr(self, errno):
+ def trb_errorstr(self, errno: int) -> str:
'''
Get error string for an integer error number.
_result = self.trblib.trb_errorstr(errno)
return _result.decode('ascii')
- def trb_register_read(self, trb_address, reg_address):
+ def trb_register_read(self, trb_address: int, reg_address: int) -> List[int]:
'''
Read value from trb register.
raise TrbException('Error while reading trb register.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
- def trb_register_write(self, trb_address, reg_address, value):
+ def trb_register_write(self, trb_address: int, reg_address: int, value: int):
'''
Write trb register
errno = self.trb_errno()
raise TrbException('Error while writing trb register.', errno, self.trb_errorstr(errno))
- def trb_register_read_mem(self, trb_address, reg_address, option, size):
+ def trb_register_read_mem(self, trb_address: int, reg_address: int, option: int, size: int) -> List[int]:
'''
Perform several trb register reads
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
option = ctypes.c_uint8(option)
- status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(self.buffersize))
+ size = ctypes.c_uint16(size)
+ dsize = ctypes.c_uint(self.buffersize)
+ status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, size, data_array, dsize)
if status == -1:
errno = self.trb_errno()
raise TrbException('Error while reading trb register memory.',
errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
- def trb_read_uid(self, trb_address):
+ def trb_register_write_mem(self, trb_address: int, reg_address: int, option: int, values: List[int], size: int = None):
+ '''
+ Write several trb registers
+
+ Arguments:
+ trb_address -- node(s) to write to
+ reg_address -- register address
+ option -- write option, 0 = write same register several times 1 = write adjacent registers
+ values -- list of values to write to register(s)
+ '''
+
+ data_array = (ctypes.c_uint32 * len(values))(*values)
+ trb_address = ctypes.c_uint16(trb_address)
+ reg_address = ctypes.c_uint16(reg_address)
+ option = ctypes.c_uint8(option)
+ size = size or ctypes.c_uint16(len(values))
+ status = self.trblib.trb_register_write_mem(trb_address, reg_address, option, data_array, size)
+ if status == -1:
+ errno = self.trb_errno()
+ raise TrbException('Error while writing trb register memory.', errno, self.trb_errorstr(errno))
+
+ def trb_read_uid(self, trb_address: int) -> List[int]:
'''
Read unique id(s) of TrbNet node(s)
errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
- def trb_set_address(self, uid, endpoint, trb_address):
+ def trb_set_address(self, uid: int, endpoint: int, trb_address: int):
'''
Set trb net address
# rarely used funtions without documentation in trbnet.h
# meaning of arguments and returned data unknown
- def network_reset(self):
+ def network_reset(self) -> int:
'''TRB network reset'''
return self.trblib.network_reset()
- def com_reset(self):
+ def com_reset(self) -> int:
'''communication reset'''
return self.trblib.com_reset()
- def trb_fifo_flush(self, channel):
+ def trb_fifo_flush(self, channel: int) -> int:
'''flush trb fifo
Arguments:
channel = ctypes.c_uint8(channel)
return self.trblib.trb_fifo_flush(channel)
- def trb_send_trigger(self, trigtype, info, random, number):
+ def trb_send_trigger(self, trigtype: int, info: int, random: int, number: int) -> int:
'''send trigger to trb
Arguments:
number = ctypes.c_uint16(number)
return self.trblib.trb_send_trigger(trigtype, info, random, number)
- def trb_register_setbit(self, trb_address, reg_address, bitmask):
+ def trb_register_setbit(self, trb_address: int, reg_address: int, bitmask: int) -> int:
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
bitmask = ctypes.c_uint32(bitmask)
return self.trblib.trb_register_setbit(trb_address, reg_address,
bitmask)
- def trb_register_clearbit(self, trb_address, reg_address, bitmask):
+ def trb_register_clearbit(self, trb_address: int, reg_address: int, bitmask: int) -> int:
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
bitmask = ctypes.c_uint32(bitmask)
return self.trblib.trb_register_clearbit(trb_address, reg_address,
bitmask)
- def trb_register_loadbit(self, trb_address, reg_address, bitmask, bitvalue):
+ def trb_register_loadbit(self, trb_address: int, reg_address: int, bitmask: int, bitvalue: int) -> int:
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
bitmask = ctypes.c_uint32(bitmask)
return self.trblib.trb_register_loadbit(trb_address, reg_address,
bitmask, bitvalue)
- def trb_registertime_read_mem(self, trb_address, reg_address, option, size):
- data_array = (ctypes.c_uint32 * self.buffersize)()
+ def trb_registertime_read_mem(self, trb_address: int, reg_address: int, option: int, size: int) -> List[int]:
trb_address = ctypes.c_uint16(trb_address)
reg_address = ctypes.c_uint16(reg_address)
option = ctypes.c_uint8(option)
- status = self.trblib.trb_register_read_mem(trb_address, reg_address, option, ctypes.c_uint16(size), data_array, ctypes.c_uint(self.buffersize))
+ size = ctypes.c_uint16(size)
+ data_array = (ctypes.c_uint32 * self.buffersize)()
+ dsize = ctypes.c_uint(self.buffersize)
+ status = self.trblib.trb_registertime_read_mem(trb_address, reg_address, option, size, data_array, dsize)
if status == -1:
errno = self.trb_errno()
raise TrbException('Error while reading trb register memory.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
- def trb_ipu_data_read(self, trg_type, trg_info, trg_random, trg_number, size):
+ def trb_ipu_data_read(self, trg_type: int, trg_info: int, trg_random: int, trg_number: int, size: int) -> List[int]:
trg_type = ctypes.c_uint8(trg_type)
trg_info = ctypes.c_uint8(trg_info)
trg_random = ctypes.c_uint8(trg_random)
trg_number = ctypes.c_uint16(trg_number)
data_array = (ctypes.c_uint32 * self.buffersize)()
- status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, ctypes.c_uint(self.buffersize))
+ dsize = ctypes.c_uint(self.buffersize)
+ status = self.trblib.trb_ipu_data_read(trg_type, trg_info, trg_random, trg_number, data_array, dsize)
if status == -1:
errno = self.trb_errno()
raise TrbException('Error while reading trb ipu data.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
- def trb_nettrace(self, trb_address, size):
+ def trb_nettrace(self, trb_address: int):
trb_address = ctypes.c_uint16(trb_address)
data_array = (ctypes.c_uint32 * self.buffersize)()
- status = self.trblib.trb_nettrace(trb_address, data_array, ctypes.c_uint(self.buffersize))
+ dsize = ctypes.c_uint(self.buffersize)
+ status = self.trblib.trb_nettrace(trb_address, data_array, dsize)
if status == -1:
errno = self.trb_errno()
raise TrbException('Error while doing net trace.', errno, self.trb_errorstr(errno))
return [data_array[i] for i in range(status)]
- def trb_termstr(self, term):
+ def trb_termstr(self, term: Union[Tuple[int, int, int, int], TrbTerm]) -> str:
'''
Get string representation for network termination packet info tuple.