Broken client subscription after random aedes instance restart - behind a proxy [question] #775
Comments
What is the QoS of the # subscription? What's the QoS of the published messages? Given the error, I think it's something related to mqemitter, as it is the one that shares messages between instances |
Both are with qos = 0. |
The subscription and the published messages must both use QoS > 0 if you want to be sure that a client with clean session false receives the messages for its subscription: http://www.steves-internet-guide.com/mqtt-clean-sessions-example/ |
Unfortunately, I can't manage the QoS of the publishers, but I will try to change the QoS of the # subscription. |
Unfortunately, both must be > 0 in order to work as described in the link in my previous answer. |
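The rule discussed above can be sketched as a small model. This is not aedes code; the function names `effectiveQos` and `isQueuedWhileOffline` are made up for illustration, and the sketch assumes the usual MQTT behaviour that a message is delivered at the lower of the publish QoS and the subscription QoS, and that a broker only keeps messages for a disconnected client with a persistent session when that effective QoS is above 0.

```javascript
// Simplified model of MQTT delivery rules; hypothetical helpers, not aedes source.

// A message is delivered at the lower of the publish QoS and the
// QoS granted to the subscription.
function effectiveQos (publishQos, subscriptionQos) {
  return Math.min(publishQos, subscriptionQos)
}

// Assumption: the broker stores a message for a disconnected client only
// when the session is persistent (clean: false) and the effective QoS is > 0.
function isQueuedWhileOffline ({ clean, publishQos, subscriptionQos }) {
  return !clean && effectiveQos(publishQos, subscriptionQos) > 0
}

// Publisher stuck at QoS 0: raising only the subscription QoS does not help.
console.log(isQueuedWhileOffline({ clean: false, publishQos: 0, subscriptionQos: 1 })) // false
// Both sides at QoS 1 with a persistent session: the message is kept.
console.log(isQueuedWhileOffline({ clean: false, publishQos: 1, subscriptionQos: 1 })) // true
// A clean session discards everything on disconnect.
console.log(isQueuedWhileOffline({ clean: true, publishQos: 1, subscriptionQos: 1 })) // false
```

This only covers messages sent while the subscriber is offline; it says nothing about messages lost while the subscriber stays connected, which is the case reported in this issue.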
Thank you for the link, I saw all the relations, but I think there is some issue with the emitter when some nodes are restarted. Even with QoS = 0 on both sides, everything is delivered perfectly if there are no restarts: I have no undelivered messages. |
Could you submit a snippet or else that reproduces the issue? |
I have the aedes script in two separate projects on the same VPS. The only differences are the listening port and the aedes identifier: the first one listens on port 8883 with ID = aedes_i1, and the second one on port 8884 with ID = aedes_i2. Besides PM2, I also tried to dockerize the project by starting two containers with different exposed ports. The issue occurs again.
Instances are randomly selected by the load balancer. Then I restarted Aedes nodes at random. I made a table to show the test: Everything up to column "I" works perfectly: after every restart, Client 1, which is subscribed to #, sees the messages published by all other clients. The issue occurs in column "J": after the last restart, Client 1 doesn't receive the messages published by Client 2. The number of restarts needed is different every time. One more important thing: when I test with the Mongo emitter, the issue disappears by itself after around 10 minutes. It looks like some delay, queue, or something else. |
Ok so we could consider the issue is mostly related to mqemitters. @gnought @mcollina any clue of what could be the problem here? BTW @rosenvivanov could you give a try if you can reproduce this with sub/pub qos 1? |
I tried with Sub/Pub qos 1 (for Client 1 '#'), but it's the same. |
Ok, but you could also emulate them with a simple node script. The best would be to create a server script and a client script that emulate this problem. If you are able to repro this using aedes+mqttjs, I could use that to debug the problem |
Alright, I'll do that and will post the results. |
This could be a starting point for the server: https://github.com/moscajs/aedes/tree/main/examples/clusters |
In my server script I don't use a cluster package:

    var fs = require('graceful-fs');
    var aedes = require('aedes');
    var stats = require('aedes-stats');
    var {createServer} = require('aedes-server-factory')
    var {protocolDecoder} = require('aedes-protocol-decoder')
    const mongodb = require('mqemitter-mongodb')
    const mongoPersistence = require('aedes-persistence-mongodb')

    // Emitter shared by all broker instances
    var mq = mongodb({
      url: 'mongodb://XX.XX.XX.XX', database: 'aedes'
    });

    // Persistence shared by all broker instances
    var persistence = mongoPersistence({
      url: 'mongodb://XX.XX.XX.XX', database: 'aedes', ttl: {
        packets: 300,
        subscriptions: 300
      }
    })

    const aedesInstance = aedes({
      id: 'aedes_i1',
      concurrency: 20000,
      mq: mq,
      persistence: persistence,
      // Copy the real client IP reported by the proxy onto the client
      preConnect: function (client, packet, done) {
        if (client.connDetails && client.connDetails.ipAddress) {
          client.ip = client.connDetails.ipAddress
        }
        return done(null, true)
      }
    })

    const server = createServer(aedesInstance, {
      trustProxy: true, protocolDecoder
    })

    server.listen(8883, function () {
      stats(aedesInstance, {
        interval: 30000
      });
      console.log('>>>> Server started and listening')
    })

The other script listens on 8884 with id = aedes_i2. Is this enough if they share the same mongo emitter? |
Yes but you need a load balancer to use them. If you use cluster package that is done automatically |
I think I found something related to the issue. When I tested with the cluster package, I had no undelivered messages to the # subscriber, so I decided to remove the static clientId of both instances (when using the load balancer) and just use uuidv4(). |
I didn't understand what you mean. You first tried with a fixed aedes id in the options, then you left it blank so it gets a random one (the default), and in both cases this works? Anyway, what I suggest is to use fixed aedes ids. |
The issue occurs with fixed ids. When I use random ids, there is no issue. |
Could be something here: https://github.com/moscajs/aedes/blob/main/lib/handlers/connect.js#L227 In mongo: https://github.com/moscajs/aedes-persistence-mongodb/blob/master/persistence.js#L426 But neither of them uses any brokerid reference |
I've removed persistence at the moment. I'm using only mqemitter-mongodb |
Ok, but when using clusters you must use on-disk persistence |
I got it. When I don't use the cluster package, just different ports, persistence isn't required for messages to be shared properly, right? |
False. When there are multiple aedes instances, they must share the persistence, so you have to use mongo or redis |
Alright, I got it. I'll give more feedback about dynamic and static ids. |
I tested with redis and with mongo. I also increased the number of instances, to make the test more torturous :) I saw you sent me a pointer: Actually, I'm trying to see the data in the "outgoing" collection in mongo, but it always stays empty, and "incoming" too. Where can I debug the broadcasting to all clients? |
That is populated only when QoS is > 0
Mqemitter is responsible for the communication between instances. It follows the same pub/sub logic used in mqtt topics |
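To make the "same pub/sub logic used in mqtt topics" concrete, here is a minimal sketch of MQTT-style topic matching with `+` (one level) and `#` (all remaining levels). It is a plain illustration, not the matcher mqemitter uses internally; `topicMatches` is a hypothetical helper, and it ignores special cases such as topics starting with `$`.

```javascript
// Hypothetical helper: MQTT-style wildcard matching, for illustration only.
function topicMatches (pattern, topic) {
  const p = pattern.split('/')
  const t = topic.split('/')
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '#') return true // multi-level wildcard matches the rest
    if (i >= t.length) return false // pattern is longer than the topic
    if (p[i] !== '+' && p[i] !== t[i]) return false // level must match exactly
  }
  // No '#' seen: both must have the same number of levels
  return p.length === t.length
}

console.log(topicMatches('#', 'topic-from-client-1')) // true
console.log(topicMatches('sensors/+/temp', 'sensors/kitchen/temp')) // true
console.log(topicMatches('sensors/+', 'sensors/kitchen/temp')) // false
```

Under this logic a `#` subscriber on one instance should see every message emitted by any instance that shares the emitter, which is why missing messages point at something other than topic matching.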
I made a repo with several files that reproduce the issue: aedes.1.js, aedes.2.js, aedes.3.js, with different ports and static ids but the same mongo database. Just put in your test mongo url and start. Steps:
I observe the "pubsub" collection in the mongo db while # doesn't receive from some node, and all the messages are there, including those that # doesn't receive. |
Does client 1 actually reconnect successfully after node 1 is restarted? |
Yes, I have a console log in both files. Broker printed: Client Connected: client-1 to broker aedes-i1 Furthermore, if I reconnect the # client, then client 1 appears again :) |
I have changed the scripts a bit:

broker.js:

    const aedes = require('aedes');
    const mongodb = require('mqemitter-mongodb')
    const mongoPersistence = require('aedes-persistence-mongodb')
    const redisPersistence = require('aedes-persistence-redis')
    const redismq = require('mqemitter-redis')
    const { createServer } = require('net')

    // Usage: node broker.js <id> [mongo]
    const id = parseInt(process.argv[2]) || 0;
    const isMongo = process.argv[3] === 'mongo'
    const MONGO_URI = 'mongodb://127.0.0.1';
    const REDIS_PORT = 6380;

    var mq = isMongo ? mongodb({
      url: MONGO_URI, database: 'aedes-test'
    }) : redismq({
      port: REDIS_PORT,
    });

    var persistence = isMongo ? mongoPersistence({
      url: MONGO_URI, database: 'aedes-test'
    }) : redisPersistence({
      port: REDIS_PORT,
    })

    function startInstance(id) {
      // Static broker id and port derived from the instance number
      const BROKER_ID = 'aedes-instance-' + id;
      const PORT = 1883 + id;
      const aedesInstance = aedes({
        id: BROKER_ID,
        concurrency: 20000,
        heartbeatInterval: 600000,
        mq: mq,
        persistence: persistence,
        preConnect: function (client, packet, done) {
          if (client.connDetails && client.connDetails.ipAddress) {
            client.ip = client.connDetails.ipAddress
          }
          return done(null, true)
        }
      })
      const server = createServer(aedesInstance.handle)
      server.listen(PORT, function () {
        console.log('+++++ Server started and listening on port ', PORT)
      })
      aedesInstance.on('client', function (client) {
        console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedesInstance.id)
      })
      aedesInstance.on('publish', async function (packet, client) {
        console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + aedesInstance.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', aedesInstance.id)
      })
    }

    startInstance(id);

clients.js:

    var mqtt = require('mqtt');

    // Each publisher connects to its own broker instance (port 1883 + id)
    function startClient(id) {
      let clientID = 'client-' + id;
      let mqttClient = mqtt.connect('mqtt://127.0.0.1', {
        clientId: clientID,
        rejectUnauthorized: false,
        clean: true,
        port: 1883 + id
      });
      mqttClient.on('connect', function () {
        console.log(`>> Connected ${clientID}`);
      });
      setInterval(() => {
        mqttClient.publish('topic-from-' + clientID + '', 'test-message', {
          qos: 1
        })
      }, 3000)
    }

    for (let i = 0; i < 3; i++) {
      startClient(i);
    }

    // The main client subscribes to # on the first broker instance
    let mainClient = mqtt.connect('mqtt://127.0.0.1', {
      clientId: 'main',
      rejectUnauthorized: false,
      port: 1883,
      clean: false
    });
    mainClient.on('connect', function () {
      console.log('>> # connected');
      mainClient.subscribe('#', { qos: 1 });
    });
    mainClient.on('message', function (topic, message) {
      console.log(new Date().toISOString() + ' - ' + topic.toString() + ' - ' + message.toString());
      console.log('=================================');
    });

I start brokers one by one with:

If I force close and restart one of the instances (not the one the main client is connected to), I notice that it takes a while before messages from that instance are delivered to main. But I can see that the messages are correctly received by aedes. |
Seems that this is happening with both persistences. What I noticed is that anyway after around 20 sec messages start arriving |
I've updated the repo - added your files.
Yes, with the test scripts messages start arriving after some seconds. Unfortunately, in the real case with real client devices, when traffic goes through the load balancer, this can take much longer, sometimes hours. I'm not sure whether it could be related to the proxy server. I will set up the broker.js from above behind a proxy and then post the results. |
I also noticed that the more clients are running, the longer the messages take to arrive, even without a proxy. IMO the problem is in mqemitter, but ATM I don't have the time to investigate. What I could suggest is to try to reproduce this using only mqemitter instances. Start 4 mqemitter instances, make one of them subscribe to # and have the others emit.

emitters.js:

    const mongomq = require('mqemitter-mongodb')
    const redismq = require('mqemitter-redis')

    // Usage: node emitters.js <id> [mongo]
    const id = parseInt(process.argv[2]) || 0;
    const isMongo = process.argv[3] === 'mongo'
    const MONGO_URI = 'mongodb://127.0.0.1';
    const REDIS_PORT = 6380;

    // Use the imported name mongomq (there is no `mongodb` binding in this file)
    var mq = isMongo ? mongomq({
      url: MONGO_URI, database: 'aedes-test'
    }) : redismq({
      port: REDIS_PORT,
    });

    if (id === 0) {
      // Instance 0 listens on '#', which matches every topic
      mq.on('#', function (message, cb) {
        console.log(new Date().toISOString() + ' - ' + message.topic.toString() + ' - ' + message.payload.toString());
        console.log('=================================');
        cb()
      })
    } else {
      // All other instances emit a test message every 3 seconds
      setInterval(() => {
        mq.emit({ topic: 'topic-from-' + id + '', payload: 'test-message' })
      }, 3000)
    }

    console.log('Emitter ' + id + ' started'); |
No, there is no delay. It starts after the defined interval of 3 seconds. |
Ok, so with mqemitter ruled out, you should check what happens in aedes |
I tried restarting exactly one instance many times. The test results: Delays with Redis:
Delays with Mongo:
The delay value increases with each restart. |
Ok found where the bug is: https://github.com/moscajs/aedes/blob/main/lib/client.js#L170 The dedupe function simply rejects the packets as they have a counter that is behind the last received packet |
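The failure mode described above can be sketched with a simplified model. This is not the aedes source: `createClientState`, `createBroker` and the body of `dedupe` are illustrative assumptions, built only on the description that a packet is rejected when its per-broker counter is behind the last one the client has seen. The packet fields are called `brokerId` and `brokerCounter` here.

```javascript
// Simplified, hypothetical model of per-broker duplicate detection.
function createClientState () {
  return { duplicates: {} } // last counter seen, keyed by broker id
}

// Accept a packet only if its counter is ahead of the last one seen
// from the same broker id.
function dedupe (client, packet) {
  const id = packet.brokerId
  if (!id) return true
  const last = client.duplicates[id] || 0
  if (packet.brokerCounter > last) {
    client.duplicates[id] = packet.brokerCounter
    return true
  }
  return false
}

// A broker stamps every packet with its id and an in-memory counter,
// so the counter starts again from 1 after a restart.
function createBroker (id) {
  let counter = 0
  return {
    publish: (topic) => ({ topic, brokerId: id, brokerCounter: ++counter })
  }
}

const subscriber = createClientState() // the '#' client, which stays connected

let broker = createBroker('aedes_i2')
const before = [1, 2, 3, 4, 5].map(() => dedupe(subscriber, broker.publish('t')))

// Restart with the SAME static id: the counter is reset, the subscriber's memory is not.
broker = createBroker('aedes_i2')
const after = [1, 2, 3, 4, 5, 6, 7].map(() => dedupe(subscriber, broker.publish('t')))

// Restart with a fresh id (as with random ids): nothing to compare against.
const fresh = createBroker('hypothetical-random-id')
const freshResult = dedupe(subscriber, fresh.publish('t'))

console.log(before) // [ true, true, true, true, true ]
console.log(after) // [ false, false, false, false, false, true, true ]
console.log(freshResult) // true
```

In this model, messages from the restarted broker are dropped until its counter passes the highest value the subscriber had already seen. That is consistent with the observations in this thread that the problem appears only with static ids, resolves itself after a while, and that the delay grows with each restart.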
Fix available in: #777 |
Perfect, thanks a lot. I will test it out and post the results. |
Let me know. I tested it with your repo and it's working |
Proxy server Information
Brokers server information
**Issue**
I have multiple aedes instances which work on different ports, behind a HAProxy load balancer. They are kept alive with PM2. I use Redis MQEmitter and Redis Persistence. For test purposes, I have 5 clients subscribed to different topics and 1 client subscribed to #. All clients point to the proxy IP address. The proxy provides a TLS connection with a certificate. On every new connection, the proxy redirects the client to a random aedes instance.
Everything works fine, but there is an issue with the client subscribed to #. It connects with a clean session. Sometimes, after one of the brokers restarts, I see all the clients reconnect, but the "#" subscribed client receives messages only from the clients connected to the same aedes node. After 1-2 more restarts everything works correctly again.
The important thing here is that it happens randomly. I restarted manually 5, 10, 15 times and everything worked fine, but after the next restart the issue appeared.
I tried with Mongo MQEmitter and Mongo Persistence. I tried without Persistence. I've also stripped the script down to a "Hello world" version to rule out bugs in my own code. But nothing helps.
Any ideas?