--- /dev/null
+#!/usr/bin/env python
+
+"""
+A program to archive the data from a MQTT broker.
+"""
+
+import paho.mqtt.client as mqtt
+from sqlalchemy import *
+
+import argparse, time, configparser, random, re, logging
+from queue import Queue
+from datetime import datetime as dt
+
+try:
+ clock = time.perf_counter
+except:
+ clock = time.time
+
+logger = None
+
+CREATE_DB = """
+CREATE TABLE messages (
+ id int NOT NULL CONSTRAINT messages_pk PRIMARY KEY,
+ ts datetime NOT NULL,
+ topic text NOT NULL,
+ payload blob NOT NULL
+);
+"""
+
+messages = Table('messages', MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('ts', DateTime),
+ Column('topic', String(length=150)),
+ Column('payload', LargeBinary),
+)
+
+def main():
+ global logger
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-c', '--config', required=True, help='configuration file')
+ args = parser.parse_args()
+
+ config = configparser.ConfigParser()
+ config.read(args.config)
+
+ # config file: Global section
+ debug = config['Global'].getboolean('Debug', False)
+ system = config['Global'].get('System', '')
+ db_engine = config['Logger'].get('Engine', 'sqlite:///mqtt_log.sqlite')
+ # config file: MQTT section
+ mqtt_section = config['MQTT']
+ host = mqtt_section['Host']
+ port = mqtt_section.getint('Port', 1883)
+ keepalive = mqtt_section.getint('Keepalive', 30)
+
+ logger = logging.getLogger(__name__ + " - " + system)
+ logging.basicConfig()
+ if debug:
+ logger.setLevel(logging.DEBUG)
+
+ client = mqtt.Client()
+
+ msg_queue = Queue()
+
+ def on_connect(client, userdata, flags, rc):
+ # Connected with result code rc
+ if rc != 0:
+ logger.critical("Not connected / result code: %s", rc)
+ else:
+ logger.debug('successfully connected to MQTT server')
+ client.subscribe("#")
+
+ def on_message(client, userdata, message):
+ ts = dt.now()
+ msg_queue.put((ts, message))
+
+ client.on_message = on_message
+ client.on_connect = on_connect
+ client.loop_start()
+
+ path = '/logging/daemon'
+
+ lwt = 'nc'
+ logger.debug('setting last will to "%s" at "%s"', lwt, path)
+ client.will_set(path, payload=lwt, qos=2, retain=False)
+ logger.debug('connecting to "%s" (port %s)', host, port)
+ client.connect(host, port=port, keepalive=keepalive)
+ client.loop_start()
+
+ db = create_engine(db_engine)
+ if debug:
+ db.echo = True
+
+ try:
+ messages.create(db)
+ except:
+ pass
+
+ def log(ts, msg):
+ i = messages.insert()
+ db.execute(i, ts=ts, topic=msg.topic, payload=msg.payload)
+
+ try:
+ while True:
+ item = msg_queue.get()
+ if item is None:
+ continue
+ ts, msg = item
+ log(ts, msg)
+ if debug:
+ logger.debug("%s - %s %s %s'", ts, msg.topic, msg.payload, msg.qos)
+ msg_queue.task_done()
+ except KeyboardInterrupt:
+ pass
+ # if LWT should NOT be activated:
+ #client.disconnect()
+ client.loop_stop()
+
+if __name__ == "__main__":
+ main()