
unacked messages being redelivered #2

Closed
callmehiphop opened this issue Nov 27, 2017 · 21 comments

@callmehiphop
Contributor

From @rhodgkins on November 23, 2017 15:19

In previous versions of pubsub (I think v0.12 and lower), messages were auto-acked upon receipt (or, if that was turned off, had to be manually acked). As I understood it, this had to happen within the ackDeadlineSeconds time, which could be modified with subscription.modifyAckDeadline().

Now, from looking at the code, the newer version (v0.15) uses timers to "lease" out the message (using the ackDeadlineSeconds time as the initial lease interval), automatically extending the ack deadline until either message.ack() or message.nack() is called?
What happens if you don't call either for a long period of time? Does the lease timer keep extending the ack deadline indefinitely?
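For reference, my rough mental model of that leasing behaviour is the sketch below (a simplification on my part - the function and parameter names here are mine, not the library's internals):

// Illustrative only: a simplified lease-extension loop, not the actual client code.
// `modifyAckDeadline` stands in for whatever call the client uses to push the deadline out.
function leaseMessage(modifyAckDeadline, ackId, ackDeadlineSeconds) {
	let timer
	const renew = () => {
		modifyAckDeadline(ackId, ackDeadlineSeconds)         // extend the deadline again
		timer = setTimeout(renew, ackDeadlineSeconds * 900)  // re-run slightly before it lapses
	}
	timer = setTimeout(renew, ackDeadlineSeconds * 900)
	return () => clearTimeout(timer)                         // ack()/nack() would stop the loop
}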

The reason I'm asking for clarification is that I've seen unacked messages (that is, messages on which neither .ack() nor .nack() has been called) being delivered again after a period of time.

So the following would happen:

  1. Message A published
  2. Message A received and not acked or nacked
  3. Period of time passes - say 3 mins
  4. Message A is received again

I've also had it where I've nacked a message and the current message I'm processing is delivered again.

  1. Message A published
  2. Message A received and not acked or nacked
  3. Message B published and nacked
  4. Message A is received again

I'll try and post some replication steps for the latter issue (it's not consistent when I've seen it), but if anyone can answer my questions above that would be great! Cheers!

Copied from original issue: googleapis/google-cloud-node#2756

@callmehiphop
Contributor Author

From @rhodgkins on November 23, 2017 15:44

I've managed to reproduce the duplicated delivery - it happens probably 2 out of 5 times:

const PubSub = require('@google-cloud/pubsub')

const pubsub = new PubSub(/*{
       projectId: '',
       keyFilename: ''
} */)

function log(...args) {
	console.log(`[${new Date().toISOString()}]`, ...args)
}
function errLog(...args) {
	console.error(`[${new Date().toISOString()}]`, ...args)
}

const name = 'com.test.domain-redelivery-sub'
	
log(`Creating pubsub for ${name}`)

const subOptions = {
	flowControl: {
		maxMessages: 3
	},
	// Max deadline: https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/create
	ackDeadlineSeconds: 30
}


function handleMessage(m) {
	log(`Received message: ${m.id} / ${m.publishTime}: `, m.data.toString())
	
	const shouldAck = true
	const waiting = 600 * 1000
	
	log(`Waiting ${waiting / 1000}s to reply with ${shouldAck ? 'ACK' : 'NACK'}`)
	setTimeout(() => {
		log(`Reporting message: ${m.id} - ${shouldAck ? 'ACK' : 'NACK'}`)
		
		if (shouldAck) {
			m.ack()
		} else {
			m.nack()
		}
	}, waiting)
}
function handleError(err) {
	errLog(`Error with subscription ${name}:`, err)
	throw err
}

pubsub.topic(name).get({ autoCreate: true }, (topicErr, topic) => {
	if (topicErr) {
		errLog(`Failed to create topic - ${name}:`, topicErr)
		throw topicErr
	}
	topic.createSubscription(`${name}-subscription`, subOptions, (subErr, sub) => {
		if (subErr) return handleError(subErr)
				
		sub.on('message', handleMessage)
		sub.on('error', handleError)
		
		log(`Subscribed to ${name} with subscription ID: ${sub.name}`)
		
		const sendMessage = () => {
			
			const m = `Message @ ${new Date().toString()}`
			log('Sending message:', m)
			topic.publisher().publish(Buffer.from(m), mErr => {
				if (mErr) return errLog('Error publishing:', mErr)
			})
		}
		
		setTimeout(sendMessage, 4 * 1000)
	})
})

Console:

[2017-11-23T15:33:25.453Z] Creating pubsub for com.test.domain-redelivery-sub
[2017-11-23T15:33:30.980Z] Subscribed to projects/bookcreator-dev/topics/com.test.domain-redelivery-sub with subscription ID: projects/bookcreator-dev/subscriptions/com.test.domain-redelivery-sub-subscription
[2017-11-23T15:33:34.982Z] Sending message: Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:33:36.529Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:33:36.529Z] Waiting 600s to reply with ACK
[2017-11-23T15:35:09.606Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:35:09.606Z] Waiting 600s to reply with ACK
[2017-11-23T15:35:21.893Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:35:21.893Z] Waiting 600s to reply with ACK
[2017-11-23T15:35:33.567Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:35:33.568Z] Waiting 600s to reply with ACK
[2017-11-23T15:37:07.460Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:37:07.460Z] Waiting 600s to reply with ACK
[2017-11-23T15:39:08.366Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:39:08.366Z] Waiting 600s to reply with ACK
[2017-11-23T15:40:47.350Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:40:47.350Z] Waiting 600s to reply with ACK
[2017-11-23T15:42:07.097Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:42:07.097Z] Waiting 600s to reply with ACK
[2017-11-23T15:43:11.674Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:43:11.675Z] Waiting 600s to reply with ACK
[2017-11-23T15:43:36.511Z] Reporting message: 174068690240637 - ACK
[2017-11-23T15:44:42.234Z] Received message: 174068690240637 / Thu Nov 23 2017 15:33:36 GMT+0000 (GMT):  Message @ Thu Nov 23 2017 15:33:34 GMT+0000 (GMT)
[2017-11-23T15:44:42.234Z] Waiting 600s to reply with ACK
[2017-11-23T15:45:09.589Z] Reporting message: 174068690240637 - ACK
[2017-11-23T15:45:21.877Z] Reporting message: 174068690240637 - ACK
[2017-11-23T15:45:33.548Z] Reporting message: 174068690240637 - ACK

Pubsub version: 0.14.8 (I shall try 0.15 but I'm not sure it'll make a difference as it was only GAX that was updated?)

Also - I let the code run for a while after the initial message was ACKed - and as you can see, it was still being redelivered after that.

@callmehiphop
Contributor Author

@rhodgkins thanks for opening this, I need to dig into this a little more, but I think there could be a latency issue where we try to extend the ack deadline but it expires before the request is sent - hence the redelivery.

@stephenplusplus stephenplusplus changed the title pubsub: unacked messages being redelivered unacked messages being redelivered Dec 13, 2017
@stephenplusplus stephenplusplus added priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Dec 13, 2017
@ehacke

ehacke commented Dec 15, 2017

👍 I think I'm seeing this issue as well.

@ehacke

ehacke commented Dec 17, 2017

Could this be related to this issue: googleapis/google-cloud-go#778 ?

Seems to be similar.

Right now I'm seeing a large number of 499, 503 and 504 errors in the console for all of my node pubsub clients. Throughput is also quite low; it seems like the client is reading from pubsub at a much lower rate than it should, which could be explained by a large percentage of the StreamingPull requests failing with 499 or 503 errors.

@callmehiphop
Contributor Author

@ehacke I can't say for certain, but since moving to the StreamingPull rpc, I haven't seen a single stream close gracefully. For some reason the upstream API almost always closes with some kind of error. I would only be concerned if those errors were being generated from ack or modifyAckDeadline requests.

@ehacke

ehacke commented Dec 18, 2017

@callmehiphop thanks for the info. Yeah, I'm sort of at a loss for what is going on in my system right now, so I'm looking everywhere. The other suspicious thing is that the 98th percentile latency for the PubSub API, as seen in the Cloud Console, is regularly spiking over 500,000 ms. Which seems bad.

@kir-titievsky

This is working as intended: Pub/Sub will continue trying to re-deliver a message until a nack or ack. Streams are really not meant to close gracefully since they are terminated by the server every so many minutes, by design. The client rebuilds them very quickly so these errors are unimportant.

@rhodgkins
Contributor

rhodgkins commented Dec 22, 2017

@kir-titievsky is this a change from how pub-sub used to work? Previously, when running on GAE, each instance (say of 2) would be subscribed to a topic, and only 1 instance would handle the message at a time until the message's ack deadline expired (i.e., the message was acked or nacked). From what you've said above, either of those instances could handle the message at the same time?

@kir-titievsky

@rhodgkins No. Sorry. When I said "will continue trying to re-deliver a message", I meant that Pub/Sub will keep trying to deliver the message, but only once it has been nacked or has been out longer than the ackDeadline. Pub/Sub still does its best to have only one copy of any given message out. The basic guarantees of the service have not changed: we do our best to send each message once to a single subscriber client instance and wait for an ack, nack, modifyAckDeadline, or an expiration before redelivering it. Streams are just a mechanism for delivering these messages efficiently. Since we are an "at-least-once" service, occasionally the same message will be sent multiple times, which might mean it is processed by different instances of a subscriber client.

Does this make sense?

@rhodgkins
Contributor

@kir-titievsky yeh it does thanks!

So the only issue here now is the redelivery of acked messages?

@kir-titievsky

@rhodgkins To be more precise, we see a rate of re-delivery higher than we want in cases where messages are not acknowledged within seconds by applications using streamingPull (and client libraries that use it). Re-delivery of acked messages should be expected since the service is at-least-once. That said, we are working to reduce the rate of re-delivery to a minimum and eventually put some specific SLOs around that rate.
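In practical terms, "at-least-once" means subscriber code should be written to tolerate duplicates, typically by making message handling idempotent. A minimal sketch of that pattern (the in-memory set and doWork below are illustrative placeholders, not part of the client library):

// Illustrative sketch: idempotent handling keyed on the Pub/Sub message ID.
// A real application would usually persist this state rather than keep it in memory.
const processed = new Set()

function handleMessage(m) {
	if (processed.has(m.id)) {
		m.ack()        // duplicate of work already finished: just ack it again
		return
	}
	doWork(m.data)     // placeholder for the application's own processing, assumed to return a promise
		.then(() => {
			processed.add(m.id)
			m.ack()
		})
		.catch(() => m.nack())
}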

@rossj

rossj commented Jan 9, 2018

I've been experiencing some of these same re-delivery issues. Some of my tasks take up to an hour, so ideally the message would not be re-delivered as long as my client is actively leasing & updating the ack deadline of the message.

As it is, my Node subscription will often receive duplicate messages of still-leased and un-acked messages at somewhat random intervals. Most of the time the duplicate messages have the same ack ID, causing the subscription's internal inventory_.lease array to fill with duplicate ack IDs.

I was able to greatly reduce the frequency of duplicates by setting maxConnections: 1 in the client's subscription options. With this setting, I can consistently maintain a lease on a message for precisely 15 minutes before the subscription client receives a duplicate.

I've also tried setting flowControl: { maxMessages: 1 } in the subscription settings, in order to just ignore any subsequent messages before the 1st is done, but this doesn't work either. What happens is that the connection receives a duplicate after 15 minutes, notices that the current lease array is full, and proceeds to nack the duplicate message. However, because the ack ID of this duplicate is the same as the original's, the effect is the same as nacking the original message: it is removed from the lease array and its ack deadline is set to 0. This then immediately triggers a re-delivery of the message for a 3rd time to the client, which now has an empty lease array and re-delivers the message to my subscription callback.
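One application-level mitigation is to track which message IDs are currently being processed and simply skip duplicate deliveries without nacking them, so the shared ack ID (and therefore the original lease) isn't disturbed. A rough sketch, with placeholder names rather than library API:

// Rough sketch: ignore duplicate deliveries of messages that are still in flight,
// rather than nacking them (nacking resets the original's deadline, since the ackId is shared).
const inFlight = new Set()

function onMessage(m) {
	if (inFlight.has(m.id)) {
		return                        // already working on it - leave the existing lease alone
	}
	inFlight.add(m.id)
	processMessage(m)                 // placeholder for the real long-running work (returns a promise)
		.then(() => m.ack(), () => m.nack())
		.then(() => inFlight.delete(m.id))
}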

@kir-titievsky

kir-titievsky commented Jan 12, 2018 via email

@rhodgkins
Contributor

@kir-titievsky no problem - this is what betas are for I guess 👍 !
But thanks for the update, and could you let us know here once the fix is released?

@kir-titievsky

kir-titievsky commented Jan 23, 2018 via email

@rhodgkins
Contributor

@kir-titievsky yeh it looks much better just from a brief test! I'll keep an eye on the test env we're using it in and report back. Thanks again for your help 🙂

@stephenplusplus
Contributor

I’m going to close for now, but we will reopen if you experience more trouble.

@shwei

shwei commented Feb 8, 2018

I am very lost when it comes to setting the ackDeadline in Node; I'm in desperate need of understanding pubsub a little better.

  1. When I set {ackDeadlineSeconds: 120} in subscription creation, the returned Subscription will have ackDeadlineSeconds as 120 but ackDeadline as 10000. Here is the snippet of the object:
  "pubsub": {
    "options": {
      "grpc.keepalive_time_ms": 300000,
      "grpc.max_receive_message_length": 20000001,
      "libName": "gccl",
      "libVersion": "0.16.2",
      "scopes": [
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/pubsub"
      ]
    },
... 
...
...
  "connectionPool": null,
  "ackDeadline": 10000,
  "maxConnections": 5,
  "metadata": {
    "labels": {},
    "name": "projects/temp-project-name/subscriptions/test-123",
    "topic": "projects/temp-project-name/topics/results",
    "pushConfig": {
      "attributes": {},
      "pushEndpoint": ""
    },
    "ackDeadlineSeconds": 120,
    "retainAckedMessages": false,
    "messageRetentionDuration": {
      "seconds": "604800",
      "nanos": 0
    }
  }
}

How is ackDeadline different from ackDeadlineSeconds? And how do I make them consistent?

  2. I called the subscription.modifyAckDeadline_ function to set the ack deadline inside the on 'message' callback:
    let topic = pubsub.topic(topicName);
    const results = await topic.get({autoCreate: true});
    topic = results[0];
...
...
    const subscription = topic.subscription(subscriptionName, {ackDeadlineSeconds: 120});
    await subscription.modifyAckDeadline_(ackId, ackDeadlineSeconds);
    subscription.setLeaseTimeout_(); // is setting lease timeout required?

Is this the correct way of modifying the ack deadline? How do I verify it was modified successfully?
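For what it's worth, getMetadata can at least be used to read back the server-side value (a sketch only, assuming the subscription object from the snippet above and that I've understood the API right; it shows the subscription resource's ackDeadlineSeconds, while the client-side ackDeadline appears to be a separate value the library manages itself):

// Sketch: read back the server-side subscription resource to see its ackDeadlineSeconds.
subscription.getMetadata((err, metadata) => {
	if (err) throw err
	console.log(metadata.ackDeadlineSeconds)  // e.g. 120 if the creation option took effect
})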

@kir-titievsky

kir-titievsky commented Feb 8, 2018 via email

@shwei

shwei commented Feb 8, 2018

Hi Kir, thanks for replying. I have multiple listeners on the same subscription. It takes about 50 secs for a subscriber to process one message and ack it. I notice there are multiple subscribers getting the same message about 50 secs apart, and they all ack the same message successfully. That's why I've tried to change the ack deadline via 2 different approaches -- via subscription creation and via the subscription.modifyAckDeadline_ function -- but I haven't had any success so far.

@kir-titievsky

kir-titievsky commented Feb 8, 2018 via email
