Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

All unacknowledged messages are delivered at once on subscription resumption #1009

Open
martin-spinks opened this issue Jan 14, 2020 · 2 comments

Comments

@martin-spinks
Copy link

Background
We've noticed a behaviour we would like to suggest a change for. We are running a number of subscribers (in both node and python), these are subscribed with a durable name a queue group and manual acks. When running multiple instances of these subscribers, messages are distributed across the instances and subscribers can drop out and come back fine (durable name). Not all messages that are sent will be acknowledged, if a piece of work fails its message is not acknowledged and nats-streaming will retry later.

Request
We have noticed that if all the subscribers close their connections the first one back gets a huge influx of unacknowledged messages (it can be a lot and we suspect it maybe the entire backlog). Can nats-streaming therefore honour the maxInFlight setting for that channel when attempting to re-deliver all the unacknowledged messages on resumption?

We understand this is related to #732 and #187. We don't have any problem with unacknowledged messages being redelivered first on resume, this is actually fine for our use case. But having all messages re-delivered at once is a problem as there could be a LOT of them. This could send the first listening subscription into a sudden heavy load situation.

To replicate
If we have a running nats-streaming-server and we have a subscriber (below in Node) that only acknowledges messages that are strings that equal bob. (subscriber.js)

const stan = require('node-nats-streaming');
const os = require('os');

// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');

// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {
  // Generic function
  const caller = (msg) => {
    console.log('Function', msg.getSequence(), msg.getData());
    if (msg.getData() === 'bob') {
      console.log('Got a bob message');
      msg.ack()
    }
  }

  // Generic function
  const wrapper = (msg) => {
    caller(msg);
  }
  
  const exit = () => {
    transport.close();
  }

  // Set options
  const options = transport.subscriptionOptions();
  options.setAckWait(2000);
  options.setMaxInFlight(2);
  options.setManualAckMode(true);
  options.setDurableName('nlp.a');

  // Subscribe
  const subscription = transport.subscribe('nlp.a', 'q.nlp.a', options);
  subscription.on('message', wrapper);

  process.on('SIGTERM', exit);
  process.on('SIGINT', exit);
  process.on('SIGQUIT', exit);

  // Done
  console.log('Ready');
})

If we run multiple instances of this, messages are shared between the instances (queue name), if one instance is stopped and started it will resume and join the group again. If we send bob messages and non bob messages via: (caller.js)

const stan = require('node-nats-streaming');
const os = require('os');

// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');

// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {

  for (let i = 0; i < 8; i++) {
    transport.publish('nlp.a', JSON.stringify({ a: 'a' }), function (err, uid) {
      console.log('Message recieved ', uid);
    })    
  }

  for (let i = 0; i < 8; i++) {
    transport.publish('nlp.a', 'bob', function (err, uid) {
      console.log('Bob message recieved ', uid);
    })
  }
})

You can see bob messages appearing and being acknowledged and after sometime you can observe all the retries happening. If you then close both instances of the of the subscriber.js and start just one instance of subsriber.js again; it receives more that the maxInFlight messages (in this example 2). This number could get very large very quickly in our situation.

Environment

nats-streaming-server version 0.16.2, nats-server: v2.0.4
@kozlovic
Copy link
Member

Redeliveries do not honor maxInflight for the reason that if the server sends messages up to MaxInflight but the client does not ack them or simply missed them, then the server would not be able to redeliver since the number of outstanding messages would already be at MaxInflight.

But in the case of a subscription restart, we may be able to limit the number of redelivered messages to MaxInflight. That is likely to happen only for queue subs (since this is the only case where more un-ack'ed messages than MaxInflight can be assigned to a subscription), or in cases where a non-queue durable subscription is restarted with a lower MaxInflight I guess...

Will look into that, thanks!

@martin-spinks
Copy link
Author

Hi @kozlovic, thank you so much for your response and for looking into this. You are correct it is the case of the fist subscription back (say after a restart) will potentially get a lot of messages (in our case its a queue + durable subscription).

For us, it would be sensible to be able to limit the number that are redelivered, something like the maxInFlight for the channel, which would mean that first subscription back wasn't swamped with a bucket load of messages.

Is this a feature you see being added in the future?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants