Skip to content

Commit

Permalink
Merge pull request #1 from bigclownlabs/base
Browse files Browse the repository at this point in the history
Base
  • Loading branch information
blavka authored Jan 21, 2018
2 parents f58043d + 98561bf commit c580a15
Show file tree
Hide file tree
Showing 15 changed files with 517 additions and 2 deletions.
7 changes: 5 additions & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true

[Makefile]
indent_style = tab
[*.md]
indent_size = 2

[*.yml]
indent_size = 2
7 changes: 7 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/sh

export VIRTUAL_ENV="$(pwd)/.venv"
export PATH="$VIRTUAL_ENV/bin:$PATH"
unset PYTHONHOME

hash -r 2>&1 >/dev/null
14 changes: 14 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
sudo: false
language: python
python: 3.4
script:
- "./script/test"
- "./script/build"
deploy:
provider: pypi
skip_cleanup: true
user: bigclownlabs
password:
secure: fnGbFMzx757fpJ0jo6ZQ7Szj8OdG7MOEnfWrSkvj9cG9UZjvExog/1v31uuTGUwXfMwtQkDI4gdWXTfq89KxM6KcLyAXTDQj4uRXvxXSLzYN+iUE0F+q2uGSmHsJAXRrfdE5b1GXK3UBKjScJLGgopBr9ReufeMVjPJgCy+qWSs9+Ebl+9lzXVlcldNnt76LSJB6XdmkkNjQR0yzqnuDE8DcGsLw0zU322ZlEb6GZevSagPUb9kPnVNDWzKbdzVsMm35dcfc0C3lBk4dPJSMn/EFO0v7z84D0PB5b25Vk0Ozi4k5q1pYene3GKMKYcoKhe+PdMHMsW/IxcLtzBxGqTmFZnXAVDkGU6R1cFLyEycVzyi8uulUVMHG9FVPNW5/AbHp+OSDHOJFP4RdOCl8+6IjXX0nWasGXZakF3+rNlaXDQXg8K08aeJ8qPpYZDWWCUr1K5JaBsj1T+5nOB+e2znaeo9AmsQ7ePolkSr5VZsvI/oVDHHJ0yTUdNghvMfeWiNaGXFW79HMdUwzVTtLeTpRqotu+E8CMTZxPd8NuiSz0m6x8FeL3bmG36SWTsF5UNRrTADvd2wtrqUQ1KSFYqfc9b6GWbnKrJAW4QQ0O1Pwvx7GqJ/ZM9QvRYDrS4AEE2atMl+K19AyTdAz4qVerQkIy/FwWR9UdSCLPzvJQjs=
on:
tags: true
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<a href="https://www.bigclown.com/"><img src="https://bigclown.sirv.com/logo.png" width="200" height="59" alt="BigClown Logo" align="right"></a>

# MQTT to InfluxDB

[![Travis](https://img.shields.io/travis/bigclownlabs/bch-mqtt2influxdb/master.svg)](https://travis-ci.org/bigclownlabs/bch-mqtt2influxdb)
[![Release](https://img.shields.io/github/release/bigclownlabs/bch-mqtt2influxdb.svg)](https://github.com/bigclownlabs/bch-mqtt2influxdb/releases)
[![License](https://img.shields.io/github/license/bigclownlabs/bch-mqtt2influxdb.svg)](https://github.com/bigclownlabs/bch-mqtt2influxdb/blob/master/LICENSE)
[![Twitter](https://img.shields.io/twitter/follow/BigClownLabs.svg?style=social&label=Follow)](https://twitter.com/BigClownLabs)


## Example

```
mqtt2influxdb -c /etc/bigclown/mqtt2influxdb.yaml --debug
```



## License

This project is licensed under the [MIT License](https://opensource.org/licenses/MIT/) - see the [LICENSE](LICENSE) file for details.

---

Made with &#x2764;&nbsp; by [**HARDWARIO s.r.o.**](https://www.hardwario.com/) in the heart of Europe.
1 change: 1 addition & 0 deletions mqtt2influxdb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/usr/bin/env python3
43 changes: 43 additions & 0 deletions mqtt2influxdb/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python3

import os
import sys
import argparse
import logging
import yaml
from .config import load_config
from .mqtt2influxdb import Mqtt2InfluxDB

__version__ = '@@VERSION@@'
LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s'


def main():
argp = argparse.ArgumentParser(description='MQTT to InfluxDB')
argp.add_argument('-c', '--config', help='path to configuration file (YAML format)', required=True)
argp.add_argument('-D', '--debug', help='print debug messages', action='store_true')
argp.add_argument('-t', '--test', help='test parse config', action='store_true')
argp.add_argument('-v', '--version', action='version', version='%(prog)s ' + __version__)
args = argp.parse_args()

logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO, format=LOG_FORMAT)

try:
config = load_config(args.config)

if args.test:
return

Mqtt2InfluxDB(config)

except KeyboardInterrupt as e:
return
except Exception as e:
logging.error(e)
if os.getenv('DEBUG', False):
raise e
sys.exit(1)


if __name__ == '__main__':
main()
65 changes: 65 additions & 0 deletions mqtt2influxdb/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python3

import os
import sys
import logging
import yaml
from schema import Schema, And, Or, Use, Optional, SchemaError
import jsonpath_ng


def json_path(txt):
try:
return jsonpath_ng.parse(txt)
except Exception as e:
raise SchemaError('Bad JsonPath format: %s' % txt)


def str_or_jsonPath(txt):
if "$." in txt:
return json_path(txt)
return txt


def port_range(port):
return 0 <= port <= 65535


schema = Schema({
'mqtt': {
'host': And(str, len),
'port': And(int, port_range),
Optional('username'): And(str, len),
Optional('password'): And(str, len),
Optional('cafile'): os.path.exists,
Optional('certfile'): os.path.exists,
Optional('keyfile'): os.path.exists,
},
'influxdb': {
'host': And(str, len),
'port': And(int, port_range),
Optional('username'): And(str, len),
Optional('password'): And(str, len),
'database': And(str, len),
Optional('ssl'): bool
},
'points': [{
'measurement': And(str, len, Use(str_or_jsonPath)),
'topic': And(str, len),
Optional('fields'): {str: And(str, len, Use(str_or_jsonPath))},
Optional('tags'): {str: And(str, len, Use(str_or_jsonPath))},
Optional('database'): And(str, len)
}]
})


def load_config(config_filename):
with open(config_filename, 'r') as f:
config = yaml.load(f)
try:
return schema.validate(config)
except SchemaError as e:
# Better error format
error = str(e).splitlines()
del error[1]
raise Exception(' '.join(error))
145 changes: 145 additions & 0 deletions mqtt2influxdb/mqtt2influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3

import os
import sys
import logging
import json
from datetime import datetime
import paho.mqtt.client
from paho.mqtt.client import topic_matches_sub
import influxdb
import jsonpath_ng


class Mqtt2InfluxDB:

def __init__(self, config):

self._points = config['points']

self._influxdb = influxdb.InfluxDBClient(config['influxdb']['host'],
config['influxdb']['port'],
config['influxdb'].get('username', 'root'),
config['influxdb'].get('password', 'root'),
ssl=config['influxdb'].get('ssl', False))

self._influxdb.create_database(config['influxdb']['database'])
self._influxdb.switch_database(config['influxdb']['database'])

for point in self._points:
if 'database' in point:
self._influxdb.create_database(point['database'])

self._mqtt = paho.mqtt.client.Client()

if config['mqtt'].get('username', None):
self._mqtt.username_pw_set(config['mqtt']['username'],
config['mqtt'].get('password', None))

if config['mqtt'].get('cafile', None):
self._mqtt.tls_set(config['mqtt']['cafile'],
config['mqtt'].get('certfile', None),
config['mqtt'].get('keyfile', None))

self._mqtt.on_connect = self._on_mqtt_connect
self._mqtt.on_disconnect = self._on_mqtt_disconnect
self._mqtt.on_message = self._on_mqtt_message

logging.info('MQTT broker host: %s, port: %d, use tls: %s',
config['mqtt']['host'],
config['mqtt']['port'],
bool(config['mqtt'].get('cafile', None)))

self._mqtt.connect_async(config['mqtt']['host'], config['mqtt']['port'], keepalive=10)
self._mqtt.loop_forever()

def _on_mqtt_connect(self, client, userdata, flags, rc):
logging.info('Connected to MQTT broker with code %s', rc)

lut = {paho.mqtt.client.CONNACK_REFUSED_PROTOCOL_VERSION: 'incorrect protocol version',
paho.mqtt.client.CONNACK_REFUSED_IDENTIFIER_REJECTED: 'invalid client identifier',
paho.mqtt.client.CONNACK_REFUSED_SERVER_UNAVAILABLE: 'server unavailable',
paho.mqtt.client.CONNACK_REFUSED_BAD_USERNAME_PASSWORD: 'bad username or password',
paho.mqtt.client.CONNACK_REFUSED_NOT_AUTHORIZED: 'not authorised'}

if rc != paho.mqtt.client.CONNACK_ACCEPTED:
logging.error('Connection refused from reason: %s', lut.get(rc, 'unknown code'))

if rc == paho.mqtt.client.CONNACK_ACCEPTED:
for point in self._points:
logging.info('subscribe %s', point['topic'])
client.subscribe(point['topic'])

def _on_mqtt_disconnect(self, client, userdata, rc):
logging.info('Disconnect from MQTT broker with code %s', rc)

def _on_mqtt_message(self, client, userdata, message):
logging.debug('mqtt_on_message %s %s', message.topic, message.payload)

msg = None

for point in self._points:
if topic_matches_sub(point['topic'], message.topic):
if not msg:
payload = message.payload.decode('utf-8')

if payload == '':
payload = 'null'
try:
payload = json.loads(payload)
except Exception as e:
logging.error('parse json: %s topic: %s payload: %s', e, message.topic, message.payload)
return
msg = {
"topic": message.topic.split('/'),
"payload": payload,
"timestamp": message.timestamp,
"qos": message.qos
}

measurement = self._get_value_from_str_or_JSONPath(point['measurement'], msg)
if measurement is None:
logging.warning('unknown measurement')
return

record = {'measurement': measurement,
'time': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
'tags': {},
'fields': {}}

if 'fields' in point:
for key in point['fields']:
val = self._get_value_from_str_or_JSONPath(point['fields'][key], msg)
if val is None:
continue
record['fields'][key] = val

if not record['fields']:
logging.warning('empty fields')
return

if len(record['fields']) != len(point['fields']):
logging.warning('different number of fields')

if 'tags' in point:
for key in point['tags']:
val = self._get_value_from_str_or_JSONPath(point['tags'][key], msg)
if val is None:
continue
record['tags'][key] = val

if len(record['tags']) != len(point['tags']):
logging.warning('different number of tags')

logging.debug('influxdb write %s', record)

self._influxdb.write_points([record], database=point.get('database', None))

def _get_value_from_str_or_JSONPath(self, param, msg):
if isinstance(param, str):
return param

elif isinstance(param, jsonpath_ng.JSONPath):
tmp = param.find(msg)
if tmp:
return tmp[0].value
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-r requirements.txt
pycodestyle
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
PyYAML>=3.11
paho-mqtt>=1.0
influxdb
schema>=0.6.7
jsonpath-ng>=1.4.3
56 changes: 56 additions & 0 deletions script/bootstrap
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/sh
# vim: set ts=4:
#
# Ensures that Python 3.4+ and nodejs is available, installs modules specified
# in requirements.txt and package.json.
#
# Environment variables:
# PYTHON : Python executable to use (default is python3 or python on PATH).
set -eu

cd "$(dirname "$0")/.."
. script/utils.sh

VENV_DIR="$(pwd)/.venv"


if [ "$(id -u)" -eq 0 ] && [ "$ALLOW_ROOT" != 'yes' ]; then
die 'Do not run this script as root!'
fi

if [ ! -f "$VENV_DIR/bin/python3" ]; then
einfo 'Initializing Python virtual environment...'

# Find Python executable.
for pybin in "${PYTHON:-}" python3 python NOT_FOUND; do
has "$pybin" && break
done
if [ "$pybin" = 'NOT_FOUND' ]; then
die 'Could not find python executable! Please install Python 3.4+.'
fi

if ! "$pybin" -c 'import sys; exit(not sys.version_info >= (3, 4, 0))'; then
die "Python 3.4+ is required, but you have $("$pybin" -V 2>&1)!"
fi

# This can happen probably only on Debian-based distros.
if ! "$pybin" -c 'import venv' 2>/dev/null; then
die 'Python module venv is not installed!',
'TIP: If you are using Debian-based distro, run "apt-get install python3-venv".'
fi

# This can happen probably only on Debian-based distros.
if ! "$pybin" -c 'import pip' 2>/dev/null; then
die 'Python module pip is not installed!',
'TIP: If you are using Debian-based distro, run "apt-get install python3-pip".'
fi

"$pybin" -m venv "$VENV_DIR"
fi

. ./.envrc

einfo 'Installing Python modules...'
python3 -m pip install -r requirements-dev.txt 2>&1 \
| sed -e '/^Requirement already satisfied/d' \
-e '/don.t match your environment$/d'
Loading

0 comments on commit c580a15

Please sign in to comment.