]> jspc29.x-matter.uni-frankfurt.de Git - mvd_epics.git/commitdiff
CaRecPlay: first working version added
authorPhilipp Klaus <philipp.klaus@gmail.com>
Sat, 23 Sep 2017 04:00:44 +0000 (12:00 +0800)
committerPhilipp Klaus <philipp.klaus@gmail.com>
Sat, 23 Sep 2017 04:00:44 +0000 (12:00 +0800)
python_suite/carecplay/README.md [new file with mode: 0644]
python_suite/carecplay/caplay.py [new file with mode: 0755]
python_suite/carecplay/carec.py [new file with mode: 0755]
python_suite/carecplay/requirements.txt [new file with mode: 0644]

diff --git a/python_suite/carecplay/README.md b/python_suite/carecplay/README.md
new file mode 100644 (file)
index 0000000..5c1f390
--- /dev/null
@@ -0,0 +1,31 @@
+## CaRecPlay
+
+### Prerequisits
+
+    pip install --upgrade -r requirements.txt
+
+### Recording
+
+Start recording using:
+
+    ./carec.py your_list_of_pv_names.txt
+
+### Playback
+
+    ./caplay.py --wrap your_recorded_data_file.pickle
+
+### Tips'n'tricks
+
+You can fix errors like
+
+> Traceback (most recent call last):
+>   File "\_ctypes/callbacks.c", line 234, in 'calling callback function'
+>   File "/local/pyvenv/playground-3.6\_local/lib/python3.6/site-packages/epics/ca.py", line 542, in \_onMonitorEvent
+>     kwds[attr] = BYTES2STR(getattr(tmpv, attr, None))
+>   File "/local/pyvenv/playground-3.6\_local/lib/python3.6/site-packages/epics/utils3.py", line 25, in b2s
+>     return str(st1, EPICS\_STR_ENCODING)
+> UnicodeDecodeError: 'ascii' codec can't decode byte 0xb0 in position 0: ordinal not in range(128)
+
+by setting the appropriate encoding:
+
+    export PYTHONIOENCODING=latin1
diff --git a/python_suite/carecplay/caplay.py b/python_suite/carecplay/caplay.py
new file mode 100755 (executable)
index 0000000..bf6fdb3
--- /dev/null
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+
+import os, sys, logging, pickle, time
+from threading import Thread, Event, Lock
+from pcaspy import Driver, SimpleServer
+
+VALUE_UPDATES = None
+CONNECTION_CHANGES = None
+LATEST_VALUE_UPDATE_BY_PV = None
+PV_NAMES = None
+
+class myDriver(Driver):
+
+    def __init__(self, pvdb, **kwargs):
+        self.pvdb = pvdb
+        self.offset = kwargs.get('offset', 0.0)
+        self.wrap = kwargs.get('wrap', False)
+        self.logger = logging.getLogger('PCASpy_Driver')
+        super(myDriver, self).__init__()
+        self.eid = Event()
+        self.tid = Thread(target = self.runIOC) 
+        self.tid.setDaemon(True)
+        self.tid.start()
+        self.updatePVs()
+
+
+    def read(self, reason):
+        self.logger.warning(reason)
+        return super(myDriver, self).read(reason)
+
+    def runIOC(self):
+
+        idx = 0
+        while True:
+
+            vu = VALUE_UPDATES[idx]
+            ts_next_update = vu['ts'] + self.offset
+            ts_now = time.time()
+            if ts_next_update > (ts_now + 0.05):
+                self.logger.debug("Sleeping for {:.3f}".format(ts_next_update - ts_now))
+                self.eid.wait(ts_next_update - ts_now)
+            self.setParam(vu['pvname'], vu['value'])
+            # may want to set setParamInfo(reason, info) here as well (if limits etc changed):
+            # https://pcaspy.readthedocs.io/en/latest/api.html#pcaspy.Driver.setParamInfo
+            self.logger.info("Setting PV {pvname} to {value}".format(**vu))
+            self.updatePVs()
+            idx += 1
+            if idx == len(VALUE_UPDATES):
+                if self.wrap:
+                    idx = 0
+                    self.offset = time.time() - VALUE_UPDATES[0]['ts']
+                else:
+                    return
+
+def main():
+    global VALUE_UPDATES, CONNECTION_CHANGES, LATEST_VALUE_UPDATE_BY_PV, PV_NAMES
+
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--prefix', default='')
+    parser.add_argument('--debug', action='store_true')
+    parser.add_argument('--wrap', action='store_true')
+    parser.add_argument('data_file')
+    args = parser.parse_args()
+
+    level = logging.DEBUG if args.debug else logging.INFO
+    fmt = "%(asctime)s - %(funcName)s - %(levelname)s - %(message)s"
+    logging.basicConfig(level=level, format=fmt)
+
+    with open(args.data_file, 'rb') as f:
+        data = pickle.load(f)
+        VALUE_UPDATES = data['value_updates']
+        CONNECTION_CHANGES = data['connection_changes']
+        LATEST_VALUE_UPDATE_BY_PV = data['latest_value_update_by_pv']
+        PV_NAMES = data['pv_names']
+
+    if not VALUE_UPDATES: parser.error("No value update contained in the recorded data set. Exiting.")
+
+    VALUE_UPDATES.sort(key=lambda x: x['ts'])
+    CONNECTION_CHANGES.sort(key=lambda x: x['ts'])
+
+    offset = time.time() - VALUE_UPDATES[0]['ts']
+
+    pvdb = {}
+
+    for pv_name, vu in LATEST_VALUE_UPDATE_BY_PV.items():
+        type_map = {'ctrl_double': 'float', 'time_string': 'string', 'ctrl_enum': 'enum'}
+        pv_type = type_map[vu['type']]
+        pv_entry = {'type': pv_type, 'value': vu['value'], 'unit': vu['units'], 'prec': vu['precision']}
+        enum_strs = vu['enum_strs']
+        if pv_type == 'enum' and enum_strs is not None:
+            enum_strs = list(enum_strs)
+            for i in range(len(enum_strs)):
+                #enum_strs[i] = enum_strs[i].decode('ascii')
+                try: enum_strs[i] = enum_strs[i].decode('ascii')
+                except: pass
+            pv_entry['enums'] = enum_strs
+        pvdb[pv_name] = pv_entry
+
+    server = SimpleServer()
+    server.createPV(args.prefix, pvdb)
+
+    driver = myDriver(pvdb, offset=offset, wrap=args.wrap)
+
+    # process CA transactions
+    while True:
+        server.process(0.1)
+
+if __name__ == '__main__': main()
diff --git a/python_suite/carecplay/carec.py b/python_suite/carecplay/carec.py
new file mode 100755 (executable)
index 0000000..b7b6791
--- /dev/null
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+
+import epics, logging, time, pickle, copy
+
+logger = logging.getLogger(__name__)
+
+PV_NAMES = []
+PVS = {}
+VALUE_UPDATES = []
+LATEST_VALUE_UPDATE_BY_PV = {}
+CONNECTION_CHANGES = []
+
+def serializable_copy_value_update(pv_dict):
+    pv_dict = copy.copy(pv_dict)
+    del pv_dict['cb_info']
+    return pv_dict
+
+def cb_value_update(**kwargs):
+    global VALUE_UPDATES, LATEST_VALUE_UPDATE_BY_PV
+    kwargs = serializable_copy_value_update(kwargs)
+    kwargs['ts'] = time.time()
+    VALUE_UPDATES.append(kwargs)
+    LATEST_VALUE_UPDATE_BY_PV[kwargs['pvname']] = kwargs
+    logger.info("PV: {pvname:75s} new value: {char_value}".format(**kwargs))
+
+def serializable_copy_connection_change(cc_dict):
+    cc_dict = copy.copy(cc_dict)
+    del cc_dict['pv']
+    return cc_dict
+
+def cb_connection_change(**kwargs):
+    global CONNECTION_CHANGES
+    kwargs = serializable_copy_connection_change(kwargs)
+    kwargs['ts'] = time.time()
+    CONNECTION_CHANGES.append(kwargs)
+    logger.info("PV: {pvname:75s} conn: {conn}".format(**kwargs))
+
+def register(pv_names):
+    global PVS, PV_NAMES
+    for pv_name in pv_names:
+        PVS[pv_name] = epics.PV(pv_name, auto_monitor=True, form='ctrl', callback=cb_value_update, connection_callback=cb_connection_change)
+        PV_NAMES.append(pv_name)
+
+def main():
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('pv_list', type=argparse.FileType('r', encoding='UTF-8'))
+    parser.add_argument('--debug', action='store_true')
+    args = parser.parse_args()
+
+    level = logging.DEBUG if args.debug else logging.WARNING
+    logging.basicConfig(level=level)
+
+    pv_list = args.pv_list.read().split('\n')
+    pv_list = [pv_name.strip() for pv_name in pv_list] # strip off whitespace
+    pv_list = [pv_name for pv_name in pv_list if pv_name] # clean out empty entries
+
+    start = time.time()
+    register(pv_list)
+
+    try:
+        print(f"Started the recording. It can be stopped with Ctrl-C.")
+        while True:
+            time.sleep(1.)
+    except KeyboardInterrupt:
+        print("Ctrl-C pressed. Stopping capture...")
+        print("Proceeding to serialize the data...")
+    finally:
+        for pv_name in pv_list:
+            PVS[pv_name].clear_auto_monitor()
+            #PVS[pv_name].disconnect()
+            del PVS[pv_name]
+        end = time.time()
+        print(f"Stopped capture after {end-start:.2f} s.")
+        print(f"Registered {len(VALUE_UPDATES)} value updates.")
+        print(f"Registered {len(CONNECTION_CHANGES)} connection changes.")
+
+        print(f"Do you want to serialize (store) the registered data to the output file now?")
+        answer = input("Y/n > ")
+        if answer.strip().lower().endswith('n'):
+            logger.warn("Closing without storing the data.")
+
+        with open('data.pickle', 'wb') as f:
+            data_to_serialize = {
+              'value_updates': VALUE_UPDATES,
+              'connection_changes': CONNECTION_CHANGES,
+              'latest_value_update_by_pv': LATEST_VALUE_UPDATE_BY_PV,
+              'pv_names': PV_NAMES,
+            }
+            pickle.dump(data_to_serialize, f, pickle.HIGHEST_PROTOCOL)
+
+if __name__ == "__main__": main()
diff --git a/python_suite/carecplay/requirements.txt b/python_suite/carecplay/requirements.txt
new file mode 100644 (file)
index 0000000..6925277
--- /dev/null
@@ -0,0 +1,4 @@
+# CaRec
+pyepics
+# CaPlay
+pycaspy