From 0a3aeb6d902d945e43a7c5c380554f986f4362bd Mon Sep 17 00:00:00 2001 From: Philipp Klaus Date: Fri, 2 Jun 2017 12:52:43 +0200 Subject: [PATCH] MQTT Archiver added --- mqtt_tools/.gitignore | 1 + mqtt_tools/archiver.py | 121 ++++++++++++++++++++++++++++++++++ mqtt_tools/example_config.ini | 11 ++++ mqtt_tools/requirements.txt | 2 + 4 files changed, 135 insertions(+) create mode 100644 mqtt_tools/.gitignore create mode 100755 mqtt_tools/archiver.py create mode 100644 mqtt_tools/example_config.ini create mode 100644 mqtt_tools/requirements.txt diff --git a/mqtt_tools/.gitignore b/mqtt_tools/.gitignore new file mode 100644 index 0000000..2fa7ce7 --- /dev/null +++ b/mqtt_tools/.gitignore @@ -0,0 +1 @@ +config.ini diff --git a/mqtt_tools/archiver.py b/mqtt_tools/archiver.py new file mode 100755 index 0000000..bf04aee --- /dev/null +++ b/mqtt_tools/archiver.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +""" +A program to archive the data from a MQTT broker. +""" + +import paho.mqtt.client as mqtt +from sqlalchemy import * + +import argparse, time, configparser, random, re, logging +from queue import Queue +from datetime import datetime as dt + +try: + clock = time.perf_counter +except: + clock = time.time + +logger = None + +CREATE_DB = """ +CREATE TABLE messages ( + id int NOT NULL CONSTRAINT messages_pk PRIMARY KEY, + ts datetime NOT NULL, + topic text NOT NULL, + payload blob NOT NULL +); +""" + +messages = Table('messages', MetaData(), + Column('id', Integer, primary_key=True), + Column('ts', DateTime), + Column('topic', String(length=150)), + Column('payload', LargeBinary), +) + +def main(): + global logger + + parser = argparse.ArgumentParser() + parser.add_argument('-c', '--config', required=True, help='configuration file') + args = parser.parse_args() + + config = configparser.ConfigParser() + config.read(args.config) + + # config file: Global section + debug = config['Global'].getboolean('Debug', False) + system = config['Global'].get('System', '') + db_engine = config['Logger'].get('Engine', 'sqlite:///mqtt_log.sqlite') + # config file: MQTT section + mqtt_section = config['MQTT'] + host = mqtt_section['Host'] + port = mqtt_section.getint('Port', 1883) + keepalive = mqtt_section.getint('Keepalive', 30) + + logger = logging.getLogger(__name__ + " - " + system) + logging.basicConfig() + if debug: + logger.setLevel(logging.DEBUG) + + client = mqtt.Client() + + msg_queue = Queue() + + def on_connect(client, userdata, flags, rc): + # Connected with result code rc + if rc != 0: + logger.critical("Not connected / result code: %s", rc) + else: + logger.debug('successfully connected to MQTT server') + client.subscribe("#") + + def on_message(client, userdata, message): + ts = dt.now() + msg_queue.put((ts, message)) + + client.on_message = on_message + client.on_connect = on_connect + client.loop_start() + + path = '/logging/daemon' + + lwt = 'nc' + logger.debug('setting last will to "%s" at "%s"', lwt, path) + client.will_set(path, payload=lwt, qos=2, retain=False) + logger.debug('connecting to "%s" (port %s)', host, port) + client.connect(host, port=port, keepalive=keepalive) + client.loop_start() + + db = create_engine(db_engine) + if debug: + db.echo = True + + try: + messages.create(db) + except: + pass + + def log(ts, msg): + i = messages.insert() + db.execute(i, ts=ts, topic=msg.topic, payload=msg.payload) + + try: + while True: + item = msg_queue.get() + if item is None: + continue + ts, msg = item + log(ts, msg) + if debug: + logger.debug("%s - %s %s %s'", ts, msg.topic, msg.payload, msg.qos) + msg_queue.task_done() + except KeyboardInterrupt: + pass + # if LWT should NOT be activated: + #client.disconnect() + client.loop_stop() + +if __name__ == "__main__": + main() diff --git a/mqtt_tools/example_config.ini b/mqtt_tools/example_config.ini new file mode 100644 index 0000000..5e5de3d --- /dev/null +++ b/mqtt_tools/example_config.ini @@ -0,0 +1,11 @@ +[Global] +PIDPATH = /tmp +System = Your System Name here +#Debug = 1 + +[Logger] +Engine = sqlite:///mqtt_log.sqlite + +[MQTT] +Host = yourhost.somedomain.com + diff --git a/mqtt_tools/requirements.txt b/mqtt_tools/requirements.txt new file mode 100644 index 0000000..b29873b --- /dev/null +++ b/mqtt_tools/requirements.txt @@ -0,0 +1,2 @@ +paho-mqtt +SQLAlchemy -- 2.43.0