from pcaspy import Driver, Alarm, Severity
from pcaspy.driver import manager
-from pcaspy import PVInfo
-from pcaspy import cas
-import pcaspy
from opus20 import Opus20, OPUS20_CHANNEL_SPEC, Opus20ConnectionException
import time, threading
-class ExtendedPVInfo(PVInfo):
- """
- Derived class from PVInfo
- Aim: monkey patching PVInfo to support MDEL/ADEL
- """
- def __init__(self, info):
- # initialize from info dict with defaults
- self.mdel = info.get('mdel', None)
- self.adel = info.get('adel', None)
- PVInfo.__init__(self, info)
-
-# apply PVInfo monkey patch:
-pcaspy.PVInfo = ExtendedPVInfo
-pcaspy.driver.PVInfo = ExtendedPVInfo
-
class Opus20Driver(Driver):
def __init__(self, hostname, opus20_port=None, opus20_timeout=0.1, scan_period=5.0):
self.hostname = hostname
super(Opus20Driver, self).__init__()
- def setParamValue(self, reason, value):
- """
- overriding Driver.setParamValue()
- <https://github.com/paulscherrerinstitute/pcaspy/blob/master/pcaspy/driver.py#L155>
- Made to support the ADEL/MDEL fields
- """
- # check whether update is needed
- same = self.pvDB[reason].value == value
- if (type(same) == bool and not same) or (hasattr(same, 'all') and not same.all()):
- # make a copy of mutable objects, list, numpy.ndarray
- if isinstance(value, list):
- value = value[:]
- elif 'numpy.ndarray' in str(type(value)):
- value = value.copy()
- mask = 0
- pv = manager.pvs[self.port][reason]
- abs_delta = abs(value-self.pvDB[reason].value)
- if not pv.info.mdel or (abs_delta > pv.info.mdel):
- mask |= cas.DBE_VALUE
- if not pv.info.adel or (abs_delta > pv.info.adel):
- mask |= cas.DBE_LOG
- self.pvDB[reason].value = value
- self.pvDB[reason].flag = True
- self.pvDB[reason].mask = mask
- self.pvDB[reason].time = cas.epicsTimeStamp()
-
def connect_opus20(self):
kwargs = {}
if self.opus20_port: kwargs['port'] = self.opus20_port