from .opus20 import CHANNEL_SPEC as OPUS20_CHANNEL_SPEC
from .opus20 import Opus20Exception, Opus20ConnectionException
from .opus20 import Object
+from .opus20 import discover_OPUS20_devices
+from .fakeserver import Opus20FakeServer
from .webapp import PlotWebServer
--- /dev/null
+
+
+from datetime import datetime, timedelta
+import socket
+
+from .opus20 import Frame
+
+class Opus20FakeServer(object):
+ """
+ A TCP server imitating (faking) the
+ behaviour of a Lufft OPUS20 device.
+ """
+ def __init__(self, host='', port=52015):
+ self.host = host
+ self.port = port
+ self.communication_samples = []
+
+ def bind_and_serve(self):
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.bind((self.host, self.port))
+ self.s.listen(1)
+ try:
+ while True:
+ conn, addr = self.s.accept()
+ print('Connected by', addr)
+ while True:
+ data = conn.recv(1024)
+ if not data: break
+ input_frame = Frame(data)
+ output_frame = self.react_to_input_frame(input_frame)
+ conn.sendall(output_frame.data)
+ conn.close()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ self.s.close()
+
+ def react_to_input_frame(self, input_frame):
+ output_frame = None
+ input_frame.validate()
+ in_props = input_frame.props
+ for sample in self.communication_samples:
+ sample_props = sample['in'].props
+ if in_props.cmd != sample_props.cmd:
+ continue
+ if in_props.payload != sample_props.payload:
+ continue
+ output_frame = sample['out']
+ break
+ if output_frame is None:
+ # 0x10 - Unknown CMD
+ output_frame = Frame.from_cmd_and_payload(in_props.cmd, b"\x10")
+ return output_frame
+
+ def feed_with_communication_log(self, l2p_frames_file):
+ """ Feed the fake server with l2p frames stored in a communication log file """
+ num_incoming, num_outgoing = 0, 0
+ num_short, num_long = 0, 0
+
+
+ def gt(dt_str):
+ dt, _, us= dt_str.partition(".")
+ dt= datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S")
+ us= int(us.rstrip("Z"), 10)
+ return dt + timedelta(microseconds=us)
+ #return dt
+
+ with open(l2p_frames_file) as fp:
+
+ timestamp = None
+ incoming = None
+ outgoing = None
+
+ for line in fp:
+
+ line = line.strip()
+ if not line: continue
+
+ kind = None
+ if line.startswith('Timestamp'):
+ kind = 'timestamp'
+ timestamp = gt(line.split(' ')[1].replace(' ', 'T'))
+ elif line.startswith('<- '):
+ kind = 'incoming'
+ num_incoming += 1
+ elif line.startswith('-> '):
+ kind = 'outgoing'
+ num_outgoing += 1
+
+ if kind in ('incoming', 'outgoing'):
+ frame_bytes = bytes(int(byte, 16) for byte in line[3:].split())
+ frm = Frame(frame_bytes)
+ try:
+ frm.validate()
+ except Exception as e:
+ print(e)
+ pdb.set_trace()
+
+ if kind == 'incoming':
+ incoming = frm
+ elif kind == 'outgoing':
+ outgoing = frm
+ self.communication_samples.append({'ts': timestamp, 'in': incoming, 'out': outgoing})
+
import logging
from datetime import datetime, timedelta
import pickle
+import threading
+import ipaddress
+clock = time.perf_counter
logger = logging.getLogger(__name__)
class Opus20(object):
self.device_id = ''.join("{:02X}".format(byte) for byte in answer.props.payload[2:2+6])
logger.info("Connected to device with ID: " + self.device_id)
+ def sync_datetime(self, new_datetime=None, tz_offset=None):
+ if not new_datetime: new_datetime = datetime.now().replace(microsecond=0)
+ if not tz_offset: tz_offset = round((datetime.now() - datetime.utcnow()).total_seconds())
+ offset_sign = '+' if tz_offset >= 0 else '-'
+ offset_hours = abs(tz_offset) // 3600;
+ offset_minutes = (abs(tz_offset) % 3600) // 60;
+ logger.info("Setting date & time on device to {}{:+03}{:02}".format(new_datetime.isoformat(), offset_hours, offset_minutes))
+ new_datetime = int(new_datetime.timestamp())
+ frame = Frame.from_cmd_and_payload(0x27, struct.pack('<ii', new_datetime, tz_offset))
+ #answer = self.query_frame(frame)
+ answer = Frame.from_cmd_and_payload(0x27, b"\x00")
+ answer.validate()
+ assert answer.props.cmd == 0x27
+ answer.assert_status()
+
+ def clear_log(self):
+ frame = Frame.from_cmd_and_payload(0x46, b"")
+ answer = self.query_frame(frame)
+ answer.validate()
+ assert answer.props.cmd == 0x46
+ answer.assert_status()
+
+ def start_logging(self):
+ self.set_logging_state(True)
+
+ def stop_logging(self):
+ self.set_logging_state(False)
+
+ def set_logging_state(self, enable_logging=True):
+ enable_logging = b"\x01" if enable_logging else b"\x00"
+ frame = Frame.from_cmd_and_payload(0x45, b"\x43" + enable_logging)
+ answer = self.query_frame(frame)
+ answer.validate()
+ assert answer.props.cmd == 0x45
+ answer.assert_status()
+
+ def get_logging_state(self):
+ frame = Frame.from_cmd_and_payload(0x44, b"\x43")
+ answer = self.query_frame(frame)
+ answer.validate()
+ props = answer.props
+ assert len(props.payload) == 3
+ sub_cmd, state = struct.unpack('<xB?', props.payload)
+ assert props.cmd == 0x44
+ answer.assert_status()
+ assert sub_cmd == 0x43
+ return state
+
+ def set_channel_logging_state(self, channel, enable_logging=True):
+ payload = b"\x22" + struct.pack('<H?', channel, enable_logging)
+ frame = Frame.from_cmd_and_payload(0x45, payload)
+ answer = self.query_frame(frame)
+ answer.validate()
+ props = answer.props
+ assert props.cmd == 0x45
+ assert len(props.payload) == 6
+ assert props.payload[1] == 0x22
+ answer.assert_status()
+ return struct.unpack('<I', props.payload[2:6])[0]
+
+ def get_channel_logging_state(self, channel):
+ payload = b"\x22" + struct.pack('<H', channel)
+ frame = Frame.from_cmd_and_payload(0x44, payload)
+ answer = self.query_frame(frame)
+ answer.validate()
+ props = answer.props
+ assert len(props.payload) == 5
+ sub_cmd, nch, state = struct.unpack('<xBH?', props.payload)
+ assert props.cmd == 0x44
+ answer.assert_status()
+ assert sub_cmd == 0x22
+ assert nch == channel
+ return state
+
def channel_value(self, channel: int):
query_frame = Frame.from_cmd_and_payload(0x23, struct.pack('<H', channel))
answer_frame = self.query_frame(query_frame)
FRAME_KINDS = [
#
Object(cmd=0x1E, payload_check=[], payload_length= 0, name='network discovery request'),
- Object(cmd=0x1E, payload_check=[], payload_length= 35, name='network discovery answer'),
+ Object(cmd=0x1E, payload_check=[0x00], payload_length= 35, name='network discovery answer', func=self.discovery_result),
#
Object(cmd=0x23, payload_check=[], payload_length= 2, name='online single channel request'),
Object(cmd=0x23, payload_check=[0x00,], payload_length= 8, name='online single channel answer', func=self.online_data_request_single),
if payload_matches: return knd
return None
+ def discovery_result(self):
+ props = self.props
+
+ assert props.cmd == 0x1E
+ assert len(props.payload) == 35
+ answer.assert_status()
+
+ dr = Object()
+ dr.device_id = ''.join("{:02X}".format(byte) for byte in props.payload[1:1+6])
+ dr.ip = ipaddress.IPv4Address(props.payload[9:9+4])
+ dr.gw = ipaddress.IPv4Address(props.payload[13:13+4])
+ dr.mask = ipaddress.IPv4Address(props.payload[17:17+4])
+ dr.net = ipaddress.IPv4Network('{}/{}'.format(dr.ip, dr.mask), strict=False)
+ return dr.to_dict()
+
def available_channels(self):
# cmd="31 10" (which channels are available in device?)
props = self.props
assert props.length >= 3, 'message too short for an answer containing the available channels'
- assert props.cmd == 0x31 and props.verc == 0x10 and props.payload[1] == 0x16
+ assert props.cmd == 0x31 and props.payload[1] == 0x16
self.assert_status()
logger.debug("Channel Query (31 10 16)")
def channel_properties(self):
props = self.props
assert len(props.payload) == 85
- assert props.cmd == 0x31 and props.verc == 0x10
- assert props.payload[0:2] == b"\x00\x30"
+ assert props.cmd == 0x31
+ self.assert_status()
+ assert props.payload[1] == 0x30
channel, group, name, unit, kind, min, max = struct.unpack('<HB40s30sBxff', props.payload[2:2+83])
name = name.decode('ascii').replace('\x00','').strip()
unit = unit.decode('utf-16-le').replace('\x00','').strip()
props = self.props
assert props.length >= 3, 'message too short for an online data request with a single channel'
- assert props.cmd == 0x23 and props.verc == 0x10
+ assert props.cmd == 0x23
self.assert_status()
logger.debug("Online Data Request (single channel) (23 10)")
channel_value = Frame.read_channel_value(props.payload, 1, length=7, status=self.status)
props = self.props
assert props.length >= 3, 'message too short for an online data request with multiple channels'
- assert props.cmd == 0x2F and props.verc == 0x10
+ assert props.cmd == 0x2F
self.assert_status()
logger.debug("Online Data Request (multiple channels) (2F 10)")
props = self.props
assert props.length >= 21, 'message too short for a log data message'
- assert props.cmd == 0x24 and props.verc == 0x10 and props.payload[0:2] == b"\x00\x20"
+ assert props.cmd == 0x24 and props.payload[1] == 0x20
+ self.assert_status()
is_final, begin, end, interval, num_blocks = struct.unpack('<xx?xxxxiiIH', props.payload[0:21])
begin = datetime.fromtimestamp(begin)
return ' '.join('{:02X}'.format(byte) for byte in raw)
+class UdpListenerThread(threading.Thread):
+ DETECTION_TIMEOUT = 0.25
+ def __init__ (self,port,callback):
+ """
+ Listens for packages via UDP. Calls callback for each response.
+ callback([frm, (ip, port), answer_time])
+ """
+ threading.Thread.__init__(self)
+ self.__port = port
+ self.__callback = callback
+ self.__start_time = clock()
+ def run(self):
+ addr = ('', self.__port)
+ UDPinsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ UDPinsock.bind(addr)
+ UDPinsock.settimeout(self.DETECTION_TIMEOUT)
+ while True:
+ try:
+ """ Receive messages """
+ data, addr = UDPinsock.recvfrom(1024)
+ # keep timestamp of arriving package
+ answer_time = clock()
+ except:
+ """ server timeout """
+ break
+ frm = Frame(data)
+ frm.validate()
+ try:
+ frm.validate()
+ except:
+ logger.warning("received a response that didn't validate: " + repr(data))
+ self.__callback((frm, addr, (answer_time - self.__start_time)*1000))
+ UDPinsock.close()
+
+def discover_OPUS20_devices(callback, bind_addr=""):
+ dest = ('<broadcast>',52010)
+ myUDPintsockThread = UdpListenerThread(52005, callback)
+ myUDPintsockThread.start()
+
+ UDPoutsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ # to allow broadcast communication:
+ UDPoutsock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+ UDPoutsock.bind((bind_addr,0))
+ frm = Frame.from_cmd_and_payload(0x1e, b"")
+ UDPoutsock.sendto(frm.data, dest)
+
+ myUDPintsockThread.join()
+
def crc16(data : bytes):
""" Calculates a CRC-16 CCITT checksum. data should be of type bytes() """
# https://en.wikipedia.org/wiki/Cyclic_redundancy_check
# Find out where our resource files are located:
try:
- from pkg_resources import resource_filename, Requirement
+ from pkg_resources import resource_filename, Requirement, require
PATH = resource_filename("opus20", "webapp")
except:
PATH = './'
@view('about.jinja2')
def _about_page(self):
- return self._atg({'active': 'about'})
+ version = require("opus20")[0].version
+ return self._atg({'active': 'about', 'opus20_version': version})
@view('plots.jinja2')
def _plots_page(self):
<p class="lead">
Environmental Monitoring with OPUS20
</p>
+ <p>Version: v{{opus20_version}}</p>
<p style="width: 23em; margin-left: auto; margin-right: auto;">
The Python package opus20 allows for easy environmental monitoring using the OPUS20 logger by Lufft.
It was written by <a href="mailto:klaus@physik.uni-frankfurt.de">Philipp Klaus</a> for use in the clean room laboratory of the <a href="https://www.uni-frankfurt.de/51839189/X-treme-Matter-Group">X-Matter Group</a> at the University of Frankfurt.
var device_id = "{{ device_id }}";
+{% endblock %}
+
+{% block bottom_javascript %}
+
$(function(){
$('#fetchData').on('click', function(){
var $btn = $(this);
from opus20 import Opus20, OPUS20_CHANNEL_SPEC, PickleStore, Opus20ConnectionException
clock = time.perf_counter
-logger = logging.getLogger('opus_cli')
+logger = logging.getLogger('opus20_cli')
def extended_int(string):
if string.startswith('0x'):
def main():
- parser = argparse.ArgumentParser(description="CLI for the Lufft Opus20")
+ parser = argparse.ArgumentParser(description="CLI for the Lufft OPUS20")
parser.add_argument('host', help='hostname of the device')
- parser.add_argument('--port', '-p', type=int, help='port for TCP connections')
- parser.add_argument('--timeout', '-t', type=float, help='timeout for the TCP connection')
- parser.add_argument('--loglevel', '-l', choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], help='log level')
- subparsers = parser.add_subparsers(help='cmd help', dest='cmd')
- parser_a = subparsers.add_parser('list', help='list all possible channels')
- parser_b = subparsers.add_parser('get', help='get the value(s) of specific channel(s)')
- parser_b.add_argument('channel', type=extended_int, nargs='+', help='The selected channel(s)')
- parser_c = subparsers.add_parser('download', help='download the logs')
- parser_c.add_argument('persistance_file', help='file to store the logs in')
+ parser.add_argument('--port', '-p', type=int, help='TCP port of the OPUS20')
+ parser.add_argument('--timeout', '-t', type=float, help='Timeout of the TCP connection in seconds')
+ parser.add_argument('--loglevel', choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], help='Sets the verbosity of this script')
+ subparsers = parser.add_subparsers(title='commands', help='', dest='cmd')
+ parser_list = subparsers.add_parser('list', help='list all possible measurement channels')
+ parser_get = subparsers.add_parser('get', help='get the value(s) of specific channel(s)')
+ parser_get.add_argument('channel', type=extended_int, nargs='+', help='The selected channel(s)')
+ parser_download = subparsers.add_parser('download', help='download the logs and store them locally')
+ parser_download.add_argument('persistance_file', help='file to store the logs in')
+ parser_logging = subparsers.add_parser('logging', help='change or query global logging settings')
+ subsubparsers = parser_logging.add_subparsers(help='Action to perform w/ respect to logging', dest='action')
+ parser_logging_action_status = subsubparsers.add_parser('status', help='Query the current logging status of the device')
+ parser_logging_action_start = subsubparsers.add_parser('start', help='Start logging altogether on the device')
+ parser_logging_action_stop = subsubparsers.add_parser('stop', help='Stop logging altogether on the device')
+ parser_logging_action_clear = subsubparsers.add_parser('clear', help='Clear the log history on the device')
+ parser_enable = subparsers.add_parser('enable', help='enable logging for a specific channel')
+ parser_enable.add_argument('channel', type=extended_int, nargs='+', help='The selected channel(s)')
+ parser_disable = subparsers.add_parser('disable', help='disable logging for a specific channel')
+ parser_disable.add_argument('channel', type=extended_int, nargs='+', help='The selected channel(s)')
args = parser.parse_args()
if not args.cmd: parser.error('please select a command')
+ if args.cmd == 'logging' and not args.action: parser.error('please select a logging action')
if args.loglevel:
logging.basicConfig(level=getattr(logging, args.loglevel.upper()))
log_data = o20.download_logs(start_datetime=max_ts)
ps.add_data(o20.device_id, log_data)
ps.persist()
+ if args.cmd == 'logging':
+ def logging_in_words(): return 'enabled' if o20.get_logging_state() else 'disabled'
+ if args.action == 'status':
+ print("Logging is currently " + logging_in_words() + ".")
+ elif args.action in ('start', 'stop'):
+ o20.set_logging_state(args.action == 'start')
+ logger.info("Logging is now " + logging_in_words() + ".")
+ elif args.action == 'clear':
+ o20.clear_log()
+ print('Clearing the log now. This will take a couple of minutes.')
+ print('You cannot make requests to the device during that time.')
+ o20.disconnect()
+ if args.cmd in ('enable', 'disable'):
+ enable = args.cmd == 'enable'
+ for channel in args.channel:
+ o20.set_channel_logging_state(enable)
except Opus20ConnectionException as e:
parser.error(str(e))
--- /dev/null
+#!/usr/bin/env python
+
+import sys
+import time
+import argparse
+import logging
+import functools
+
+from opus20 import Opus20, OPUS20_CHANNEL_SPEC, PickleStore, Opus20ConnectionException, discover_OPUS20_devices
+
+clock = time.perf_counter
+logger = logging.getLogger('opus20_discovery')
+
+def main():
+
+ parser = argparse.ArgumentParser(description="Discovery of Lufft OPUS20 devices on the local network")
+ parser.add_argument('bind_address', default="", nargs="?", help='The IP to bind to')
+ parser.add_argument('--loglevel', '-l', default="WARNING", choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], help='log level')
+ args = parser.parse_args()
+
+ if args.loglevel:
+ logging.basicConfig(level=getattr(logging, args.loglevel.upper()))
+
+ found_devices = []
+ def full_callback(found_devices, answer):
+ frm, host, answer_time = answer
+ dev_props = frm.kind.func()
+ dev_props['answer_time'] = answer_time
+ found_devices.append(dev_props)
+ callback = functools.partial(full_callback, found_devices)
+
+ start = clock()
+
+ print("\nTrying to find devices on interface {}...\n".format(args.bind_address))
+ discover_OPUS20_devices(callback, bind_addr=args.bind_address)
+ for device in found_devices:
+ print("[{answer_time:.2f} ms] Device ID: {device_id}, IP: {ip}, Gateway: {gw}, Network: {net}".format(**device))
+ print("\nFound a total number of {} devices.\n".format(len(found_devices)))
+
+ end = clock()
+ logger.info("script running time (net): {:.6f} seconds.".format(end-start))
+
+ sys.exit(0 if found_devices else 1)
+
+if __name__ == "__main__": main()
--- /dev/null
+#!/usr/bin/env python
+
+import argparse
+import logging
+import pdb
+
+from opus20 import Opus20FakeServer
+
+logger = logging.getLogger('opus20_fakeserver')
+
+def main():
+
+ parser = argparse.ArgumentParser(description="Discovery of Lufft OPUS20 devices on the local network")
+ parser.add_argument('bind_address', default="", nargs="?", help='The IP to bind to')
+ parser.add_argument('--feed_logfile', help='A log file to feed the fake server with l2p frames')
+ parser.add_argument('--loglevel', '-l', default="INFO", choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], help='log level')
+ args = parser.parse_args()
+
+ if args.loglevel:
+ logging.basicConfig(level=getattr(logging, args.loglevel.upper()))
+
+ fs = Opus20FakeServer(args.bind_address)
+ if args.feed_logfile:
+ logger.info("Feeding server with l2p example communication...".format(args.bind_address))
+ fs.feed_with_communication_log(args.feed_logfile)
+ logger.info("Starting fake OPUS20 server on interface {}...".format(args.bind_address))
+ fs.bind_and_serve()
+ logger.info("Fake OPUS20 server closed.")
+
+if __name__ == "__main__": main()
import argparse
import logging
-logger = logging.getLogger('opus_web')
+logger = logging.getLogger('opus20_web')
def main():
url = '',
license = 'GPL',
packages = ['opus20', 'opus20.webapp'],
- scripts = ['scripts/opus20_cli', 'scripts/opus20_web'],
+ scripts = [
+ 'scripts/opus20_cli',
+ 'scripts/opus20_web',
+ 'scripts/opus20_discovery',
+ 'scripts/opus20_fakeserver',
+ ],
include_package_data = True,
zip_safe = True,
platforms = 'any',