+++ /dev/null
-#!/usr/bin/env python
-
-"""
-A program to archive the data from a MQTT broker.
-"""
-
-import paho.mqtt.client as mqtt
-from sqlalchemy import *
-
-import argparse, time, configparser, random, re, logging
-from queue import Queue
-from datetime import datetime as dt
-
-try:
- clock = time.perf_counter
-except:
- clock = time.time
-
-logger = None
-
-CREATE_DB = """
-CREATE TABLE messages (
- id int NOT NULL CONSTRAINT messages_pk PRIMARY KEY,
- ts datetime NOT NULL,
- topic text NOT NULL,
- payload blob NOT NULL
-);
-"""
-
-messages = Table('messages', MetaData(),
- Column('id', Integer, primary_key=True),
- Column('ts', DateTime),
- Column('topic', String(length=150)),
- Column('payload', LargeBinary),
-)
-
-def main():
- global logger
-
- parser = argparse.ArgumentParser()
- parser.add_argument('-c', '--config', required=True, help='configuration file')
- args = parser.parse_args()
-
- config = configparser.ConfigParser()
- config.read(args.config)
-
- # config file: Global section
- debug = config['Global'].getboolean('Debug', False)
- system = config['Global'].get('System', '')
- db_engine = config['Logger'].get('Engine', 'sqlite:///mqtt_log.sqlite')
- # config file: MQTT section
- mqtt_section = config['MQTT']
- host = mqtt_section['Host']
- port = mqtt_section.getint('Port', 1883)
- keepalive = mqtt_section.getint('Keepalive', 30)
-
- logger = logging.getLogger(__name__ + " - " + system)
- logging.basicConfig()
- if debug:
- logger.setLevel(logging.DEBUG)
-
- client = mqtt.Client()
-
- msg_queue = Queue()
-
- def on_connect(client, userdata, flags, rc):
- # Connected with result code rc
- if rc != 0:
- logger.critical("Not connected / result code: %s", rc)
- else:
- logger.debug('successfully connected to MQTT server')
- client.subscribe("#")
-
- def on_message(client, userdata, message):
- ts = dt.now()
- msg_queue.put((ts, message))
-
- client.on_message = on_message
- client.on_connect = on_connect
- client.loop_start()
-
- path = '/logging/daemon'
-
- lwt = 'nc'
- logger.debug('setting last will to "%s" at "%s"', lwt, path)
- client.will_set(path, payload=lwt, qos=2, retain=False)
- logger.debug('connecting to "%s" (port %s)', host, port)
- client.connect(host, port=port, keepalive=keepalive)
- client.loop_start()
-
- db = create_engine(db_engine)
- if debug:
- db.echo = True
-
- try:
- messages.create(db)
- except:
- pass
-
- def log(ts, msg):
- i = messages.insert()
- db.execute(i, ts=ts, topic=msg.topic, payload=msg.payload)
-
- try:
- while True:
- item = msg_queue.get()
- if item is None:
- continue
- ts, msg = item
- log(ts, msg)
- if debug:
- logger.debug("%s - %s %s %s'", ts, msg.topic, msg.payload, msg.qos)
- msg_queue.task_done()
- except KeyboardInterrupt:
- pass
- # if LWT should NOT be activated:
- #client.disconnect()
- client.loop_stop()
-
-if __name__ == "__main__":
- main()