Skip to content

Commit

Permalink
feat: Added Mqtt socket
Browse files Browse the repository at this point in the history
You can now listen for parsed messages on a single socket.
  • Loading branch information
svrooij committed Aug 30, 2021
1 parent 7c2d6a2 commit 3fa7cb2
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 41 deletions.
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export interface MqttConfig {
distinctFields: Array<string>;
prefix: string;
url: string;
last_reset?: string;
last_reset_solar?: string;
}

const defaultMqttConfig: Partial<MqttConfig> = {
Expand All @@ -43,6 +45,7 @@ export interface OutputConfig {
influx?: InfluxOutputOptions;
jsonSocket?: number;
mqtt?: MqttConfig;
mqttSocket?: number;
post?: HttpPostConfig;
rawSocket?: number;
webserver?: number;
Expand Down Expand Up @@ -121,11 +124,16 @@ export class ConfigLoader {
.describe('mqtt-discovery', 'Emit auto-discovery message')
.boolean('mqtt-discovery')
.describe('mqtt-discovery-prefix', 'Autodiscovery prefix')
.describe('mqtt-last-reset', 'If set, this value is added to mqtt as \'last_reset\'')
.string('mqtt-last-reset')
.describe('mqtt-last-reset-solar', 'If set, this value is added to mqtt as \'last_reset\'')
.string('mqtt-last-reset-solar')
.describe('influx-url', 'Influxdb server url')
.describe('influx-token', 'Influxdb server token')
.describe('influx-bucket', 'Influx bucket')
.describe('influx-org', 'Influx organization')
.describe('tcp-server', 'Expose JSON TCP socket on this port')
.describe('tcp-server-mqtt', 'Expose JSON TCP socket on this port')
.describe('raw-tcp-server', 'Expose RAW TCP socket on this port')
.conflicts('port', 'socket')
.describe('debug', 'Enable debug output')
Expand All @@ -137,6 +145,7 @@ export class ConfigLoader {
.number('sunspec-modbus-interval')
.number('web-server')
.number('tcp-server')
.number('tcp-server-mqtt')
.number('raw-tcp-server')
.number('post-interval')
.describe('enc-aad', 'Additional authentication data, if your meter encrypts data (eg. Luxemburg)')
Expand Down Expand Up @@ -170,6 +179,10 @@ export class ConfigLoader {
config.outputs.jsonSocket = args['tcp-server'];
}

if (args['tcp-server-mqtt']) {
config.outputs.mqttSocket = args['tcp-server-mqtt'];
}

if (typeof args['mqtt-url'] === 'string') {
config.outputs.mqtt = {
discovery: args['mqtt-discovery'] === true,
Expand All @@ -178,6 +191,8 @@ export class ConfigLoader {
distinctFields: args['mqtt-distinct-fields']?.split(',') ?? defaultMqttConfig.distinctFields ?? [],
prefix: args['mqtt-topic'] ?? 'smartmeter',
url: args['mqtt-url'],
last_reset: args['mqtt-last-reset'],
last_reset_solar: args['mqtt-last-reset-solar'],
};
}

Expand Down
5 changes: 2 additions & 3 deletions src/output/http-output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import IntervalOutput from './interval-output';
import P1Reader from '../p1-reader';
import DsmrMessage from '../dsmr-message';
import GasValue from '../gas-value';
import BaseSolarReader from '../solar/base-solar-input';

export default class HttpOutput extends IntervalOutput {
constructor(private config: HttpPostConfig) {
Expand All @@ -25,9 +24,9 @@ export default class HttpOutput extends IntervalOutput {
}));
}

addSolar(solarReader: BaseSolarReader): void {
// addSolar(solarReader: BaseSolarReader): void {

}
// }

private sendEvent(data: DsmrMessage): Promise<Response> {
const flatData = this.config.fields
Expand Down
6 changes: 5 additions & 1 deletion src/output/interval-output.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EventEmitter } from 'events';
import TypedEmitter from 'typed-emitter';
import { SunspecResult } from '@svrooij/sunspec/lib/sunspec-result';
import P1Reader, { Usage } from '../p1-reader';
import { Output } from './output';
import DsmrMessage from '../dsmr-message';
Expand All @@ -10,6 +11,7 @@ interface IntervalOutputEvents {
dsmr: (result: DsmrMessage) => void;
gasUsage: (usage: Usage) => void;
usage: (usage: Usage) => void;
solar: (solar: Partial<SunspecResult>) => void;
}

export default abstract class IntervalOutput extends (EventEmitter as new () => TypedEmitter<IntervalOutputEvents>) implements Output {
Expand Down Expand Up @@ -47,7 +49,9 @@ export default abstract class IntervalOutput extends (EventEmitter as new () =>
}, (this.interval ?? 60) * 1000);
}

addSolar(solarReader: BaseSolarReader): void {}
addSolar(solarReader: BaseSolarReader): void {
solarReader.on('solar', (data) => { this.emit('solar', data); });
}

close(): Promise<void> {
if (this.timer) {
Expand Down
52 changes: 36 additions & 16 deletions src/output/mqtt-output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface MqttDiscoveryMessage {
device_class?: 'power' | 'current' | 'energy' | 'voltage';
state_class?: 'measurement';
json_attributes_topic: string;
last_reset_value_template?: string;
state_topic: string;
name: string;
icon?: string;
Expand Down Expand Up @@ -118,12 +119,22 @@ export default class MqttOutput implements Output {
}
});
} else {
this.sendToMqtt('energy', data);
const withLifetime = {
...data,
last_reset: this.config.last_reset,
};
this.sendToMqtt('energy', withLifetime);
}
}

private publishSolar(data: Partial<SunspecResult>): void {
this.sendToMqtt('solar', data);
const kwData = {
...data,
lifetimeProductionKwh: ((data.lifetimeProduction || 0) / 1000).toFixed(2),
acPowerKwh: ((data.acPower || 0) / 1000).toFixed(3),
last_reset: this.config.last_reset_solar,
};
this.sendToMqtt('solar', kwData);
}

private sendToMqtt(topicSuffix: string, data: any): void {
Expand Down Expand Up @@ -182,6 +193,7 @@ export default class MqttOutput implements Output {
description.value_template = '{{ value_json.totalT1Use }}';
description.name = 'Total power used T1';
description.device_class = 'energy';
if (this.config.last_reset) description.last_reset_value_template = '{{ value_json.last_reset }}';
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/t1-used/config`, description);
}

Expand All @@ -192,6 +204,7 @@ export default class MqttOutput implements Output {
description.value_template = '{{ value_json.totalT2Use }}';
description.name = 'Total power used T2';
description.device_class = 'energy';
if (this.config.last_reset) description.last_reset_value_template = '{{ value_json.last_reset }}';
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/t2-used/config`, description);
}

Expand All @@ -202,6 +215,7 @@ export default class MqttOutput implements Output {
description.value_template = '{{ value_json.totalT1Delivered }}';
description.name = 'Total power delivered T1';
description.device_class = 'energy';
if (this.config.last_reset) description.last_reset_value_template = '{{ value_json.last_reset }}';
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/t1-delivered/config`, description);
}

Expand All @@ -212,6 +226,7 @@ export default class MqttOutput implements Output {
description.value_template = '{{ value_json.totalT2Delivered }}';
description.name = 'Total power delivered T2';
description.device_class = 'energy';
if (this.config.last_reset) description.last_reset_value_template = '{{ value_json.last_reset }}';
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/t2-delivered/config`, description);
}

Expand All @@ -221,19 +236,20 @@ export default class MqttOutput implements Output {
description.value_template = '{{ value_json.currentTarrif }}';
description.name = 'Current tarrif';
delete description.device_class;
delete description.last_reset_value_template;
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/current-tarrif/config`, description);
}

if (data.houseUsage) {
description.json_attributes_topic = `${this.config.prefix}/status/usage`;
description.state_topic = `${this.config.prefix}/status/usage`;
description.unique_id = `smartmeter_${data.powerSn}_house-usage`;
description.unit_of_measurement = 'Watt';
description.value_template = '{{ value_json.val }}';
description.name = 'Current house usage';
description.device_class = 'power';
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/house-usage/config`, description);
}
// if (data.houseUsage) {
// description.json_attributes_topic = `${this.config.prefix}/status/usage`;
// description.state_topic = `${this.config.prefix}/status/usage`;
// description.unique_id = `smartmeter_${data.powerSn}_house-usage`;
// description.unit_of_measurement = 'Watt';
// description.value_template = '{{ value_json.val }}';
// description.name = 'Current house usage';
// description.device_class = 'power';
// this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/house-usage/config`, description);
// }

// Total Gas used
if (data.gasSn) {
Expand All @@ -248,6 +264,7 @@ export default class MqttOutput implements Output {
description.value_template = '{{ value_json.gas.totalUse }}';
description.name = 'Total gas usage';
description.icon = 'mdi:gas-cylinder';
if (this.config.last_reset) description.last_reset_value_template = '{{ value_json.last_reset }}';
delete description.device_class;
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/gas/config`, description);
}
Expand All @@ -266,22 +283,25 @@ export default class MqttOutput implements Output {
sw_version: `${this.pkg.name} (${this.pkg.version})`,
},
unique_id: `solar-invertor_${solar.serial}_total`,
unit_of_measurement: 'Wh',
unit_of_measurement: 'kWh',
json_attributes_topic: `${this.config.prefix}/status/solar`,
state_topic: `${this.config.prefix}/status/solar`,
name: 'Lifetime solar production',
icon: 'mdi:solar-power',
device_class: 'energy',
state_class: 'measurement',
value_template: '{{ value_json.lifetimeProduction }}',
value_template: '{{ value_json.lifetimeProductionKwh }}',
};
if (this.config.last_reset_solar) description.last_reset_value_template = '{{ value_json.last_reset }}';

this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/solar-total/config`, description);

description.name = 'Current solar production';
description.unique_id = `solar-invertor_${solar.serial}_current`;
description.unit_of_measurement = 'Watt';
description.value_template = '{{ value_json.acPower }}';
description.unit_of_measurement = 'kW';
description.value_template = '{{ value_json.acPowerKwh }}';
description.device_class = 'power';
delete description.last_reset_value_template;
this.publishDiscoveryMessage(`${this.config.discoveryPrefix}/sensor/${this.config.prefix}/solar-current/config`, description);
}

Expand Down
15 changes: 11 additions & 4 deletions src/output/tcp-output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import BaseSolarReader from '../solar/base-solar-input';
export default class TcpOutput implements Output {
private server?: TcpServer;

constructor(private port: number, private raw = false, private startServer = true) {
constructor(private port: number, private socketType: 'tcp' | 'mqtt' | 'raw', private startServer = true) {
}

start(p1Reader: P1Reader): void {
Expand All @@ -15,13 +15,14 @@ export default class TcpOutput implements Output {
}

this.server = new TcpServer({ port: this.port, host: '0.0.0.0', maxConnections: 3 });
if (this.raw) {
if (this.socketType === 'raw') {
p1Reader.on('line', (line) => {
this.server?.publish(`${line}\r\n`);
});
} else {
p1Reader.on('dsmr', (data) => {
this.server?.publishAsJson(data, '\r\n');
const niceData = this.socketType === 'mqtt' ? { topic: 'power', payload: data } : data;
this.server?.publishAsJson(niceData, '\r\n');
});
}

Expand All @@ -36,7 +37,13 @@ export default class TcpOutput implements Output {
}
}

addSolar(solarReader: BaseSolarReader): void { }
addSolar(solarReader: BaseSolarReader): void {
if (this.socketType === 'mqtt') {
solarReader.on('solar', (data) => {
this.server?.publishAsJson({ topic: 'solar', payload: data }, '\r\n');
});
}
}

close(): Promise<void> {
this.server?.stop();
Expand Down
4 changes: 2 additions & 2 deletions src/output/web-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export default class WebServer implements Output {
// ws.isAlive = true
// })
if (this.lastReading) {
ws.send(JSON.stringify(this.lastReading));
ws.send(JSON.stringify({ topic: 'power', payload: this.lastReading }));
} else {
ws.send('{"err":"No reading just yet"}');
}
Expand Down Expand Up @@ -119,7 +119,7 @@ export default class WebServer implements Output {
if (this.wsServer) {
const msg = {
topic,
data,
payload: data,
};
const readingString = JSON.stringify(msg);
this.wsServer.clients.forEach((client) => {
Expand Down
16 changes: 8 additions & 8 deletions src/output/wwwroot/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ function createSocket() {
const wsUrl = `${protocol}://${window.location.hostname}:${window.location.port}/`;
socket = new WebSocket(wsUrl);
socket.onmessage = (msg) => {
const data = JSON.parse(msg.data);
const parsedMsg = JSON.parse(msg.data);
// console.log('Got data from server %s', JSON.stringify(data, null, 2))
if (data.err) {
if (parsedMsg.err) {
console.error(err);
return;
}
if (data.topic === 'dsmr') updateData(data.data);
else if (data.topic === 'solar') updateSolar(data.data);
if (parsedMsg.topic === 'dsmr') updateData(parsedMsg.payload);
else if (parsedMsg.topic === 'solar') updateSolar(parsedMsg.payload);
else {
console.log(data);
console.log(parsedMsg);
}
};
socket.onopen = (ev) => {
Expand Down Expand Up @@ -125,14 +125,14 @@ function updateSolar(data) {
$('.sunProduction').text(Math.round(data.acPower || 0));
$('.sunTotal')
.text(Math.round((data.lifetimeProduction || 0) / 10) * 10 / 1000)
.attr('title', (data.lifetimeProduction / 1000));
updateHouseUsage();
.attr('title', (data.lifetimeProduction / 1000).toFixed(2));
//updateHouseUsage();
}

function updateHouseUsage() {
if (solarData && powerData) {
$('.solar').removeClass('hide');
let houseUsage = solarData.acPower + (powerData.currentUsage * 1000) - (powerData.currentDelivery * 1000);
let houseUsage = (solarData.acPower + (powerData.currentUsage * 1000) - (powerData.currentDelivery * 1000)).toFixed(0);
$('.houseUsage').text(houseUsage);
}
}
9 changes: 7 additions & 2 deletions src/smartmeter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ export default class Smartmeter {

if (this.config.outputs.jsonSocket) {
console.log(`- Output: JSON TCP socket on port ${this.config.outputs.jsonSocket}`);
this.outputs.push(new TcpOutput(this.config.outputs.jsonSocket, false, true));
this.outputs.push(new TcpOutput(this.config.outputs.jsonSocket, 'tcp', true));
}

if (this.config.outputs.mqttSocket) {
console.log(`- Output: MQTT TCP socket on port ${this.config.outputs.mqttSocket}`);
this.outputs.push(new TcpOutput(this.config.outputs.mqttSocket, 'mqtt', true));
}

if (this.config.outputs.mqtt) {
Expand All @@ -95,7 +100,7 @@ export default class Smartmeter {

if (this.config.outputs.rawSocket) {
console.log(`- Output: Raw TCP socket on port ${this.config.outputs.rawSocket}`);
this.outputs.push(new TcpOutput(this.config.outputs.rawSocket, true, true));
this.outputs.push(new TcpOutput(this.config.outputs.rawSocket, 'raw', true));
}

if (this.config.outputs.webserver) {
Expand Down
4 changes: 2 additions & 2 deletions tests/output-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ describe('DebugOutput', () => {
describe('TcpOutput', () => {
it('Should subscribe to Parsed result event', () => {
const fakeReader = new P1Reader()
const tcpServer = new TcpOutput(3000, false, false)
const tcpServer = new TcpOutput(3000, 'tcp', false)
tcpServer.start(fakeReader)
expect(fakeReader.listenerCount('dsmr')).to.be.eq(1, 'Json TCP output not subscribed to ParsedResult event')
tcpServer.close()
})

it('Should subscribe to Line event for rawSocket: true', () => {
const fakeReader = new P1Reader()
const tcpServer = new TcpOutput(3000, true, false)
const tcpServer = new TcpOutput(3000, 'raw', false)
tcpServer.start(fakeReader)
expect(fakeReader.listenerCount('line')).to.be.eq(1, 'Raw TCP output not subscribed to Line event')
tcpServer.close()
Expand Down

0 comments on commit 3fa7cb2

Please sign in to comment.