--- /dev/null
+
+from pcaspy import Driver, Alarm, Severity
+from opus20 import Opus20, OPUS20_CHANNEL_SPEC, Opus20ConnectionException
+
+class Opus20Driver(Driver):
+ def __init__(self, hostname, opus20_port=None, opus20_timeout=0.1):
+ self.hostname = hostname
+ self.opus20_port = opus20_port
+ self.opus20_timeout = opus20_timeout
+
+ self.connect_opus20()
+
+ super(Opus20Driver, self).__init__()
+
+ def connect_opus20(self):
+ kwargs = {}
+ if self.opus20_port: kwargs['port'] = self.opus20_port
+ if self.opus20_timeout: kwargs['timeout'] = self.opus20_timeout
+ self.o20 = Opus20(self.hostname, **kwargs)
+
+ def read(self, reason):
+ if reason in ('Temperature', 'RelativeHumidity', 'AbsoluteHumidity', 'Dewpoint', 'BatteryVoltage'):
+ mapping = {
+ 'Temperature': 0x0064,
+ 'RelativeHumidity': 0x00c8,
+ 'AbsoluteHumidity': 0x00cd,
+ 'Dewpoint': 0x006e,
+ 'BatteryVoltage': 0x2724,
+ }
+ try:
+ value = self.o20.channel_value(mapping[reason])
+ self.setParamStatus(reason, Alarm.NO_ALARM, Severity.NO_ALARM)
+ self.setParam(reason, value)
+ except:
+ self.setParamStatus(reason, Alarm.COMM_ALARM, Severity.MINOR_ALARM)
+ value = self.getParam(reason)
+ else:
+ value = self.getParam(reason)
+
+ return value
+
#!/usr/bin/env python
# external dependencies: PCASpy and opus20
-from pcaspy import Driver, SimpleServer, Alarm, Severity
-from opus20 import Opus20, OPUS20_CHANNEL_SPEC, PickleStore, Opus20ConnectionException
+import pcaspy
+import opus20
-# local module containing our PVs:
+# local modules
from lufft_opus20_pvdb import pvdb
+from lufft_opus20_driver import Opus20Driver
prefix = '{sys}:{sub}:ENVIRON:{esys}:'
-class Opus20Driver(Driver):
- def __init__(self, hostname, opus20_port=None, opus20_timeout=0.1):
- self.hostname = hostname
- self.opus20_port = opus20_port
- self.opus20_timeout = opus20_timeout
-
- self.connect_opus20()
-
- super(Opus20Driver, self).__init__()
-
- def connect_opus20(self):
- kwargs = {}
- if self.opus20_port: kwargs['port'] = self.opus20_port
- if self.opus20_timeout: kwargs['timeout'] = self.opus20_timeout
- self.o20 = Opus20(self.hostname, **kwargs)
-
- def read(self, reason):
- if reason in ('Temperature', 'RelativeHumidity', 'AbsoluteHumidity', 'Dewpoint', 'BatteryVoltage'):
- mapping = {
- 'Temperature': 0x0064,
- 'RelativeHumidity': 0x00c8,
- 'AbsoluteHumidity': 0x00cd,
- 'Dewpoint': 0x006e,
- 'BatteryVoltage': 0x2724,
- }
- try:
- value = self.o20.channel_value(mapping[reason])
- self.setParamStatus(reason, Alarm.NO_ALARM, Severity.NO_ALARM)
- self.setParam(reason, value)
- except:
- self.setParamStatus(reason, Alarm.COMM_ALARM, Severity.MINOR_ALARM)
- value = self.getParam(reason)
- else:
- value = self.getParam(reason)
-
- return value
-
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--esys', required=True, help='The environmental sub system')
args = parser.parse_args()
- server = SimpleServer()
+ server = pcaspy.SimpleServer()
prefix = prefix.format(sys=args.sys, sub=args.sub, esys=args.esys)