]> jspc29.x-matter.uni-frankfurt.de Git - mvd_epics.git/commitdiff
MQTT Archiver added
authorPhilipp Klaus <klaus@physik.uni-frankfurt.de>
Fri, 2 Jun 2017 10:52:43 +0000 (12:52 +0200)
committerPhilipp Klaus <klaus@physik.uni-frankfurt.de>
Fri, 2 Jun 2017 10:52:43 +0000 (12:52 +0200)
mqtt_tools/.gitignore [new file with mode: 0644]
mqtt_tools/archiver.py [new file with mode: 0755]
mqtt_tools/example_config.ini [new file with mode: 0644]
mqtt_tools/requirements.txt [new file with mode: 0644]

diff --git a/mqtt_tools/.gitignore b/mqtt_tools/.gitignore
new file mode 100644 (file)
index 0000000..2fa7ce7
--- /dev/null
@@ -0,0 +1 @@
+config.ini
diff --git a/mqtt_tools/archiver.py b/mqtt_tools/archiver.py
new file mode 100755 (executable)
index 0000000..bf04aee
--- /dev/null
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+
+"""
+A program to archive the data from a MQTT broker.
+"""
+
+import paho.mqtt.client as mqtt
+from sqlalchemy import *
+
+import argparse, time, configparser, random, re, logging
+from queue import Queue
+from datetime import datetime as dt
+
+try:
+    clock = time.perf_counter
+except:
+    clock = time.time
+
+logger = None
+
+CREATE_DB = """
+CREATE TABLE messages (
+    id int NOT NULL CONSTRAINT messages_pk PRIMARY KEY,
+    ts datetime NOT NULL,
+    topic text NOT NULL,
+    payload blob NOT NULL
+);
+"""
+
+messages = Table('messages', MetaData(),
+    Column('id', Integer, primary_key=True),
+    Column('ts', DateTime),
+    Column('topic', String(length=150)),
+    Column('payload', LargeBinary),
+)
+
+def main():
+    global logger
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-c', '--config', required=True, help='configuration file')
+    args = parser.parse_args()
+
+    config = configparser.ConfigParser()
+    config.read(args.config)
+
+    # config file: Global section
+    debug = config['Global'].getboolean('Debug', False)
+    system = config['Global'].get('System', '')
+    db_engine = config['Logger'].get('Engine', 'sqlite:///mqtt_log.sqlite')
+    # config file: MQTT section
+    mqtt_section = config['MQTT']
+    host = mqtt_section['Host']
+    port = mqtt_section.getint('Port', 1883)
+    keepalive = mqtt_section.getint('Keepalive', 30)
+
+    logger = logging.getLogger(__name__ + " - " + system)
+    logging.basicConfig()
+    if debug:
+        logger.setLevel(logging.DEBUG)
+
+    client = mqtt.Client()
+
+    msg_queue = Queue()
+
+    def on_connect(client, userdata, flags, rc):
+        # Connected with result code rc
+        if rc != 0:
+            logger.critical("Not connected / result code: %s", rc)
+        else:
+            logger.debug('successfully connected to MQTT server')
+        client.subscribe("#")
+
+    def on_message(client, userdata, message):
+        ts = dt.now()
+        msg_queue.put((ts, message))
+
+    client.on_message = on_message
+    client.on_connect = on_connect
+    client.loop_start()
+
+    path = '/logging/daemon'
+
+    lwt = 'nc'
+    logger.debug('setting last will to "%s" at "%s"', lwt, path)
+    client.will_set(path, payload=lwt, qos=2, retain=False)
+    logger.debug('connecting to "%s" (port %s)', host, port)
+    client.connect(host, port=port, keepalive=keepalive)
+    client.loop_start()
+
+    db = create_engine(db_engine)
+    if debug:
+        db.echo = True
+
+    try:
+        messages.create(db)
+    except:
+        pass
+
+    def log(ts, msg):
+        i = messages.insert()
+        db.execute(i, ts=ts, topic=msg.topic, payload=msg.payload)
+
+    try:
+        while True:
+            item = msg_queue.get()
+            if item is None:
+                continue
+            ts, msg = item
+            log(ts, msg)
+            if debug:
+                logger.debug("%s - %s %s %s'", ts, msg.topic, msg.payload, msg.qos)
+            msg_queue.task_done()
+    except KeyboardInterrupt:
+        pass
+    # if LWT should NOT be activated:
+    #client.disconnect()
+    client.loop_stop()
+
+if __name__ == "__main__":
+    main()
diff --git a/mqtt_tools/example_config.ini b/mqtt_tools/example_config.ini
new file mode 100644 (file)
index 0000000..5e5de3d
--- /dev/null
@@ -0,0 +1,11 @@
+[Global]
+PIDPATH = /tmp
+System = Your System Name here
+#Debug = 1
+
+[Logger]
+Engine = sqlite:///mqtt_log.sqlite
+
+[MQTT]
+Host = yourhost.somedomain.com
+
diff --git a/mqtt_tools/requirements.txt b/mqtt_tools/requirements.txt
new file mode 100644 (file)
index 0000000..b29873b
--- /dev/null
@@ -0,0 +1,2 @@
+paho-mqtt
+SQLAlchemy