
Pub/Sub streamingPull subscriber: large number of duplicate messages, modifyAckDeadline calls observed #2465

Closed
kir-titievsky opened this issue Sep 26, 2017 · 34 comments
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. 🚨 This issue needs some love. status: blocked Resolving the issue is dependent on other work. triaged for GA type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.


@kir-titievsky

A large number of duplicate messages is observed with the Pub/Sub streamingPull client library, using code that pulls from a Pub/Sub subscription and inserts messages into BigQuery with a synchronous blocking operation, with flowControl set to a maximum of 500 outstanding messages. See [1] for the code.

For the same code, we also observe an excessive number of modifyAckDeadline operations (>> streamingPull message operations). Tracing a single message, we see modifyAckDeadlines and acks in alternating order for the same message (modAck, modAck, ack, modAck, ack) [2]. This suggests that the implementation might fail to remove acked messages from the queue of messages to process and keeps re-processing messages already on the client. It also suggests that ack requests may not actually be sent.

[2] https://docs.google.com/spreadsheets/d/1mqtxxm0guZcOcRy8ORG0ri787XLQZNF_FLBujAiayFI/edit?ts=59cac0f0#gid=2139642597

[1]


package kir.pubsub;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.bigquery.*;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.AckReplyConsumer;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Sub {

// Instantiate an asynchronous message receiver

public static void main(String... args) throws Exception {
    final String projectId = args[0];
    final String subscriptionId = args[1];

    final BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
    final String datasetName = "pubsub_debug";
    final String tableName = "gke_subscriber";
    final AtomicInteger messageCount = new AtomicInteger(0);
    final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    MessageReceiver receiver = new MessageReceiver() {
                public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                    // handle incoming message, then ack/nack the received message
                    System.out.printf("%s,\t%s,\t%d\n",message.getData().toStringUtf8()
                            , message.getMessageId()
                            , messageCount.incrementAndGet());
                    Map<String,Object> row = new HashMap<String,Object>();
                    long timestampMs = message.getPublishTime().getSeconds()*1000 + message.getPublishTime().getNanos() / 1000000;
                    Date timestampDate = new Date(timestampMs);

                    row.put("messageId", message.getMessageId());
                    row.put("messageData", message.getData().toStringUtf8());
                    row.put("messagePublishTime", dateFormat.format(timestampDate));
                    // A version of this code without the bq.insert was also run, where consumer.ack()
                    // was called immediately. The results were the same.
                    InsertAllResponse response = bq.insertAll(
                                InsertAllRequest.newBuilder(TableId.of(projectId, datasetName, tableName)).addRow(row).build()
                    );
                    if (response.hasErrors()) {
                        System.err.println("Error inserting into BigQuery " + response.toString() );
                        consumer.nack();
                    } else{
                        consumer.ack();
                    }
                }

            };

    SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
    Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).setFlowControlSettings(
            FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()
    ).build();
    subscriber.startAsync();
    subscriber.awaitRunning();
    System.out.println("Started async subscriber.");
    subscriber.awaitTerminated();
}

}

@garrettjonesgoogle garrettjonesgoogle added api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Sep 26, 2017
@pongad
Contributor

pongad commented Sep 27, 2017

@kir-titievsky Do you have a workload that can reliably reproduce this? If you do, could you try removing the flow control (hopefully the workload isn't so large that it crashes your machine)? If the problem goes away, this is probably a dup of #2452; the symptoms are nearly identical.

If this doesn't fix the problem, could you share the repro workload with me?

EDIT: If the workload is too large to remove flow control, the fix for the linked issue is already in master, so we can test with that version. It's slightly less convenient, as we'll need to compile from source.
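
For reference, a minimal sketch of the suggested no-flow-control run, reusing projectId, subscriptionId, and receiver from the code in [1] (illustrative only, not a fix):

SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
        // Omit the setFlowControlSettings(...) call from [1] for this experiment.
        .build();
subscriber.startAsync().awaitRunning();
subscriber.awaitTerminated();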

@robertsaxby

The behaviour observed with #2452 was seen with the older, non-streaming pull implementation (0.21.1-beta). This issue came about when using the newer client library. It might also be worth noting that with flow control set to 1000 max messages on the older library, duplicates were not seen, just the stuck messages.

@pongad
Contributor

pongad commented Sep 28, 2017

@robertsaxby That makes sense. I can reproduce the messages getting stuck, but not redelivery.

@kir-titievsky I need a little help understanding the spreadsheet. Are all rows for the same message? I assume that stream_id uniquely identifies a StreamingPull stream? I.e., one physical computer can have multiple IDs by opening many streams, but one stream ID is unique to one computer?

FWIW, I have a PR open to address a potential race condition in streaming. It's conceivable that the race condition causes this problem.

If you could set up a reproduction, please let me know.

@kir-titievsky
Author

@pongad You are right on all counts about the spreadsheet. All rows are for the same message, including traffic from several bidi streams that had been opened at different times.

@kir-titievsky
Author

@pongad Did a couple experiments:

  • No flow control, single machine: everything is acked within seconds, with ~10 modifyAckDeadline operations for 10K messages.
  • Added a synchronous 50 ms sleep to every operation (see the sketch below). Got a peak of 7K modifyAckDeadline operations over a minute, about 10K total. modifyAckDeadline ops peaked around the same time as ack operations. Overall, ~2-3 minutes of activity. This is very unexpected, as the subscription has a 10 minute ack deadline (at least that's the setting).
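
A sketch of what the second experiment's receiver might look like (the 50 ms sleep stands in for the synchronous work; illustrative only, the rest of the setup matches [1]):

MessageReceiver slowReceiver = new MessageReceiver() {
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        try {
            Thread.sleep(50); // simulate a synchronous 50 ms operation per message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.ack();
    }
};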

@pongad
Contributor

pongad commented Oct 18, 2017

To summarize: This is an issue on the server side. Currently the client library does not have enough information to properly handle ack deadlines.

In the immediate term, consider using v0.21.1. That version uses a different (slower) Pub/Sub endpoint that isn't affected by this problem.

@pongad
Contributor

pongad commented Nov 1, 2017

The Pub/Sub team is working on a server-side mitigation for this. The client lib will need to be updated to take advantage of it. Fortunately, the new feature "piggybacks" on an existing one, so work on the client lib can progress right away.

I hope to create a PR for this soon.

@pongad
Contributor

pongad commented Nov 10, 2017

Update: the server side release should happen this week. The feature should be enabled next week. The client (in master) has already been modified to take advantage of this new feature.

When the server feature is enabled, we'll test to see how this helps.

@pongad
Contributor

pongad commented Nov 22, 2017

The server-side fix has landed. If you are affected, could you try again and see if you observe fewer duplicates?

While we expect the fix to help reduce duplication on older client libs, I'd encourage moving to the latest release (v0.30.0), since more fixes have landed since then.

@ericmartineau

We are seeing this behavior currently. We are using v0.30.0-beta of the pubsub library, and our subscriptions are all set to a 60s ack deadline. We have a subscription for which the deadline is currently being extended for over 750K unacked messages:

Screenshot of stackdriver below:
https://screencast.com/t/KogMzx0q7f5

Our receiver always performs either ack() or nack(), and the occasional message that sneaks through while symptoms look like this completes within 1s, usually faster.

I deleted the subscription and recreated it, and saw the modack calls drop to zero, only to climb back almost instantly to where they were before.

Is there anything else that will help you/us troubleshoot this issue?

@kir-titievsky
Author

kir-titievsky commented Dec 11, 2017 via email

@ericmartineau

This is the ack rate during the same window:
https://screencast.com/t/FR2utgEau

The publish rate:
https://screencast.com/t/2bT4ZcMkJwG

We have a backlog of 1MM messages. Also, we have three separate identical environments with roughly the same usage, and we're seeing this issue on two of them. A third environment seems unaffected.

@kir-titievsky
Author

kir-titievsky commented Dec 11, 2017 via email

@ericmartineau

ericmartineau commented Dec 11, 2017

@kir-titievsky
https://screencast.com/t/Nrlxws3v (the drop at the end is when I deleted and recreated the topic)

@kir-titievsky
Author

kir-titievsky commented Dec 11, 2017 via email

@ericmartineau

We're either acking or nacking every message. My assumption is that nacking would cause the message to become available for immediate redelivery.

@kir-titievsky
Author

kir-titievsky commented Dec 12, 2017 via email

@ericmartineau

I've been reading more this morning... and I realized that a nack() in our code would show up as a "Modify Ack Deadline Operation" (because it effectively sets the ack deadline to 0). Is that correct?
Is there any possibility for the "Ack Padding" (my understanding is that the pubsub client will request a deadline extension for every message to pad for latency) to conflict with a nack?
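
In protobuf terms, that nack corresponds to a request along these lines (variable names are hypothetical, just to illustrate the zero-second deadline):

ModifyAckDeadlineRequest nackEquivalent = ModifyAckDeadlineRequest.newBuilder()
        .setSubscription(subscriptionPath) // hypothetical: full subscription resource name
        .addAckIds(ackId)                  // hypothetical: ack ID of the message being nacked
        .setAckDeadlineSeconds(0)          // a 0-second deadline means "redeliver now"
        .build();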

FWIW - when we're seeing the hundreds of thousands of modacks (the screenshot I posted above), our code is effectively idle - specifically, our MessageReceiver is not receiving any messages from the dispatcher.

Other than that, is there any way to see the pubsub client's 'deadline extension' calls vs nack()? I've posted the code below as well in case there's something we could do differently on our side.

return (message, acker) -> {
   // isRunning() is an internal check for application shutdown state to not attempt processing 
   // the message due to the likelihood that resources needed to process won't be available
    if (!isRunning()) {
        acker.nack();
        log.info("Skipping message {} because we're shutting down", message.getMessageId());
    }

    try {
        final ImportResult importResult = handleImport(message.getData().toStringUtf8());
        
        //... truncated - log to metrics system ...

        if (!importResult.isRetryable()) {
            acker.ack();
        } else {
            // We do pessimistic resource checks, and flag the message as retryable if it's unable 
            // to obtain a "lock" on a resource in a short period of time 
            acker.nack();
        }
    } catch (Throwable e) {
        // We're trapping all exceptions in the block above - this expands to catch `Throwable`
        acker.ack();
        log.error("sync/subscription/unexpected Unexpected exception processing message", e);
    } 
};

@pongad
Contributor

pongad commented Dec 13, 2017

It's a little annoying, but we log to a logger named "com.google.cloud.pubsub.v1.MessageDispatcher". If you turn logging up to FINER, we log lines like "Sending NNN nacks". We send them very often, though.
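
For example, one way to surface those lines with plain java.util.logging (a sketch; adapt it to whatever logging backend you actually use):

import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Raise both the handler and the dispatcher logger to FINER so the
// "Sending N acks/nacks/receipts" lines reach the console.
ConsoleHandler handler = new ConsoleHandler();
handler.setLevel(Level.FINER);
Logger dispatcher = Logger.getLogger("com.google.cloud.pubsub.v1.MessageDispatcher");
dispatcher.setLevel(Level.FINER);
dispatcher.addHandler(handler);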

If your code is idle, this sounds like a serious problem. If there are messages pending, they should be scheduled immediately. I have a few guesses.

  1. Flow control leak. The client keeps track of messages in progress. If the user code does not call ack or nack, the client will think that these messages are still running and will not execute more messages. From @smartytime's response, this seems unlikely.

  2. Maybe the client is somehow not properly releasing the flow control, or failing to schedule new tasks despite the flow control being free.

  3. Maybe the executor is deadlocked. This could only happen if you provide your own executor for the client lib to use, and some task adds more jobs to the executor and then waits for the new jobs to finish. Eventually all threads are just waiting for tasks to finish, so nothing actually gets to execute the new tasks (see the toy sketch at the end of this comment).

To rule out (3), could you try running jstack on a running JVM? It will contain the stack traces of all threads, so it should help us see whether threads are deadlocking. It might contain confidential info, though.

To help diagnose (2), could you share with us how you're creating the Subscriber? Maybe your settings are tickling a bug we haven't seen before, etc.
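
As a toy illustration of the deadlock pattern in (3), with a hypothetical single-threaded executor (nothing to do with the client lib itself):

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
executor.submit(() -> {
    // The only worker thread submits a new task...
    Future<?> inner = executor.submit(() -> System.out.println("never runs"));
    try {
        // ...and then blocks waiting for it. No free thread remains to run the
        // inner task, so this get() never returns and the pool is wedged.
        inner.get();
    } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
    }
});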

@ericmartineau

pongad - I've been trying to prep more intel, but I'll throw out a few more things that might be useful:

return Subscriber.newBuilder(subscriptionPath, receiver)
        .setParallelPullCount(1)
        .setMaxAckExtensionPeriod(Duration.ZERO)
        .setFlowControlSettings(FlowControlSettings.newBuilder()
                .setMaxOutstandingElementCount(10L)
                .setMaxOutstandingRequestBytes(null)
                .setLimitExceededBehavior(LimitExceededBehavior.Block)
                .build())
        .setCredentialsProvider(fixedCredentialsProvider)
        .setExecutorProvider(executorProvider)
        .build();
  1. we are running this in a cluster against two topics/subscriptions. Each instance has a single Subscriber for each subscription, and each Subscriber uses a dedicated ScheduledThreadPoolExecutor using FixedExecutorProvider.
  2. Local dev (unable to reproduce) runs oracle jdk 1.8.0_144, production runs openjdk 8u111
  3. I can connect development instances to the prod subscription without any issues (as of yet).
  4. We rolled out a change that allowed us to start/stop Subscriber instances, modify the # of available threads, etc. If I shut down all subscribers, the modacks and pulls all drop to 0 (so, no zombie instances). When this odd behavior is occurring, if I start even a single thread on a single instance for a single subscription, the modacks jump back up to where they were before.

For example, there's a single thread for a single subscriber across the entire cluster that's currently "working":

State: waiting
Wait Count: 188852
Block Count: 21
------------------------
	-sun.misc.Unsafe.park
	-java.util.concurrent.locks.LockSupport.park
	-java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await
	-java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take
	-java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take
	-java.util.concurrent.ThreadPoolExecutor.getTask
	-java.util.concurrent.ThreadPoolExecutor.runWorker
	-java.util.concurrent.ThreadPoolExecutor$Worker.run
	-java.lang.Thread.run

Our logs show no task completion (we log whenever we ack/nack), and stackdriver shows ~90 modack op/s, and ~45 pull op/s.

The TRACE logs look like this (I've removed all of the "Sending 0 xxx" lines):

Sending 10 receipts
2017-12-14 16:38:41.974
Sending 10 nacks
2017-12-14 16:38:41.973
Sending 6 nacks
2017-12-14 16:38:41.873
Sending 10 receipts
2017-12-14 16:38:41.773
Sending 4 nacks
2017-12-14 16:38:41.772
Sending 2 receipts
2017-12-14 16:38:41.672
Sending 2 nacks
2017-12-14 16:38:41.672
Sending 10 receipts
2017-12-14 16:38:41.471
Sending 10 nacks
2017-12-14 16:38:41.471
Sending 2 acks
2017-12-14 16:38:41.420
Sending 7 nacks
2017-12-14 16:38:41.371
Sending 2 receipts
2017-12-14 16:38:41.320
Sending 1 receipts
2017-12-14 16:38:41.319
Sending 1 acks
2017-12-14 16:38:41.319
Sending 10 receipts
2017-12-14 16:38:41.270
Sending 3 nacks
2017-12-14 16:38:41.270
Sending 3 receipts
2017-12-14 16:38:41.170
Sending 3 nacks
2017-12-14 16:38:41.169
Sending 10 receipts
2017-12-14 16:38:40.969
Sending 10 nacks

@pongad
Contributor

pongad commented Dec 15, 2017

@smartytime Thank you for the info! From this, I believe the problem is not with the client lib. Reasoning below:

The stack dump of your thread shows that it's waiting for things to do. The executor is definitely not deadlocked.

However, our logs seem to disagree: You said

Our logs show no task completion (we log whenever we ack/nack)

From the MessageDispatcher log, the subscriber is actually quite active. In a one-second window, it received 58 messages (the "receipts" [1]), nacked 55, and acked only 3. Unless the client lib is doing something incredibly wrong, it looks like for most messages you're either calling consumer.nack or throwing an exception from receiveMessage (exceptions count as nacks). I think this explains the message backlog: most messages are getting nacked and redelivered.

[1] The "receipts" feature is added to help with message duplication problem. The client lib sends modacks to let the server know it has received the messages. This might explain why we're seeing a lot of modacks. Unfortunately the receipts are confusing the metric.

Lastly, I think setMaxAckExtensionPeriod(Duration.ZERO) is not the right thing to do. The client lib periodically sends modacks to keep your messages alive; this is potentially important even if you process messages quickly. For example, suppose you take only 1s to process a message and the message deadline is 60s. The server might send us 100 messages in one batch: you'd only be able to process 60 of them before the deadline, and the other 40 would get redelivered. Judging from the dispatcher log, I don't think this is the problem, but I can't completely rule it out.
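
For example (hypothetical cap; tune it to your workload), keeping automatic extension enabled but bounded would look like:

Subscriber subscriber = Subscriber.newBuilder(subscriptionPath, receiver)
        // Keep automatic deadline extension, but cap it instead of disabling
        // it entirely with Duration.ZERO.
        .setMaxAckExtensionPeriod(Duration.ofMinutes(60))
        .build();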

@ericmartineau

@pongad - Much thanks. I've been trying to read through the pubsub code so you don't have to explain it all... feel like I'm slowly catching up.

Knowing about the receipts helps - in an ideal world where messages are processed within their original lease, should we see roughly a 1/1 modack/message ratio? (one receipt modack per message)

What is an appropriate value for maxAckExtensionPeriod? And does that value represent the time beyond the original deadline, or is it inclusive of the original deadline? In your example above, if I wanted the client to release any messages not processed within 5 minutes, would I set the maxAckExtensionPeriod to 5 minutes, or 4 minutes?

There was another thing I was looking into that was confusing me. I see this occasionally in thread dumps for subscriber executor threads:

Thread Name: Pubsub Subscriber:  sync-events-alexandria
State: runnable
Wait Count: 173806
Block Count: 34
------------------------
	-java.math.BigInteger.oddModPow
	-java.math.BigInteger.modPow
	-sun.security.rsa.RSACore.crtCrypt
	-sun.security.rsa.RSACore.rsa
	-sun.security.rsa.RSASignature.engineSign
	-java.security.Signature$Delegate.engineSign
	-java.security.Signature.sign
	-com.google.api.client.util.SecurityUtils.sign
	-com.google.api.client.json.webtoken.JsonWebSignature.signUsingRsaSha256
	-com.google.auth.oauth2.ServiceAccountJwtAccessCredentials.getJwtAccess
	-com.google.auth.oauth2.ServiceAccountJwtAccessCredentials.getRequestMetadata
	-io.grpc.auth.GoogleAuthLibraryCallCredentials$1.run
	-java.util.concurrent.Executors$RunnableAdapter.call
	-java.util.concurrent.FutureTask.run
	-java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201
	-java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run
	-java.util.concurrent.ThreadPoolExecutor.runWorker
	-java.util.concurrent.ThreadPoolExecutor$Worker.run
	-java.lang.Thread.run

Normally, I'd expect something like this:

	-[our message handler here]
	-com.google.cloud.pubsub.v1.MessageDispatcher$4.run
	-java.util.concurrent.Executors$RunnableAdapter.call

@pongad
Contributor

pongad commented Dec 18, 2017

@smartytime I'll answer out of order :)

The stack trace looks like credentials being refreshed. Once in a while (I believe every hour), a gRPC connection needs to refresh its credentials; the credentials are then cached and usable for another hour, etc. By default we open a few connections, so I think it makes sense that you're seeing this once in a while.

Ideally, you'd observe < 2 modacks/msg in the current implementation.
One of them is the receipt.
In the current implementation, deadline extension isn't very smart. When it runs, it extends the deadline for all pending messages. E.g., if we pull a message one second before the extension is scheduled to run, we send a modack for that message anyway, even if we have a while before the message expires. This accounts for some messages getting a second modack even though you ack quickly.
It's definitely possible to make this smarter in the future. It's a little finicky and has caused bugs in the past, though, so we're being a little cautious with it.

That said, we're not being that inefficient. We keep a distribution of how long you take to ack messages and periodically adjust the stream's deadline to the 99.9th percentile, so for the vast majority of messages you should see fewer than 2 modacks.

Right now if you set the max extension to 5 minutes, we might extend it to more than 5. This is a bug: if you can tolerate it, you can set it to 5 now. I'll work on a fix for this. Note that you should generally set the max extension "unreasonably high" since setting it too low will result in duplicate messages. This feature was actually meant as a stopgap to make sure messages aren't stuck forever if we forget to ack them.

@danoscarmike danoscarmike added triaged for GA priority: p2 Moderately-important priority. Fix may not be included in next release. and removed priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Jan 11, 2018
@yihanzhen yihanzhen added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. status: blocked Resolving the issue is dependent on other work. labels Jan 30, 2018
@alex-kisialiou-sciencesoft

alex-kisialiou-sciencesoft commented Mar 19, 2018

I found this:
https://cloud.google.com/pubsub/docs/reference/rest/
and I guess it'll take a day or so to create your own client that covers your needs.

@kir-titievsky
Author

@alex-kisialiou-sciencesoft You might find this useful: https://developers.google.com/api-client-library/java/apis/pubsub/v1, but you could also try the lower-level auto-generated gRPC client (see the synchronous pull example at https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-sync-java).
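
For reference, a rough sketch of synchronous pull with the generated gRPC stub, following the linked sample (batch size and variable names are placeholders):

SubscriberStubSettings settings = SubscriberStubSettings.newBuilder().build();
try (SubscriberStub stub = GrpcSubscriberStub.create(settings)) {
    String subscription = ProjectSubscriptionName.format(projectId, subscriptionId);
    PullRequest pullRequest = PullRequest.newBuilder()
            .setSubscription(subscription)
            .setMaxMessages(10) // placeholder batch size
            .build();
    PullResponse pullResponse = stub.pullCallable().call(pullRequest);
    List<String> ackIds = new ArrayList<>();
    for (ReceivedMessage received : pullResponse.getReceivedMessagesList()) {
        // ... handle received.getMessage() here ...
        ackIds.add(received.getAckId());
    }
    if (!ackIds.isEmpty()) {
        AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
                .setSubscription(subscription)
                .addAllAckIds(ackIds)
                .build();
        stub.acknowledgeCallable().call(ackRequest);
    }
}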

@yonran

yonran commented Jun 12, 2018

We ran into this issue on our GCS Object Change Notification (very small messages) subscribers and reproduced it in a test program. After upgrading google-cloud-java from before 0.21.1-beta to after 0.42.0-beta, the subscriber works fine until it falls behind. Once it falls behind, the Pub/Sub server starts sending duplicates about every hour, so that over half of the messages are duplicates. The Subscriber can easily fall further and further behind. Therefore, the Subscriber either needs to process messages very quickly, or it needs to cache several hours of message IDs so it can ack duplicates quickly. After contacting GCP support, they told me that this is a known bug with StreamingPull of small messages, and that a workaround is to downgrade to google-cloud-pubsub 0.21.1-beta.

While we wait for a fix to the StreamingPull server API, would it be possible for google-cloud-pubsub library to offer an option to use the Pull API instead of StreamingPull?

@kir-titievsky
Author

kir-titievsky commented Jun 13, 2018 via email

@yonran

yonran commented Jun 14, 2018

@kir-titievsky, thank you for the synchronous pull sample using the Pull gRPC API directly. However, we have written a number of MessageReceiver/Subscribers which used to work, and we want to avoid rewriting them.

Might you say why you take more than an hour to acknowledge a message once you get it?

My client-side code acknowledges all messages in a matter of seconds (In my RawPubSub.java version, I request one more message at a time from the server). The client never holds on to a message for more than a few seconds. I believe this is a server-side bug in the StreamingPull API which results in duplicate messages.

@kir-titievsky
Author

That sounds like a bug somewhere. The re-delivery after an hour, in particular, makes me suspicious. If you could, might you file a separate bug with a reproduction? If not,

  1. Might you open a support case with GCP support, if you have a support plan, detailing when you observed this on what project and subscription?
  2. If not, could you send the same to [email protected]? No guarantees, but I might be able to take a look.

An alternative explanation for this behavior is that the acks never succeed, which might make this a client-side bug. But it's hard to tell.

@luann-trend

We have experienced a similar kind of issue here with the Java google-cloud-pubsub lib, GA version 1.31.0. We don't get duplicate messages, but some messages seem stuck in the queue even though we send acks back. After restarting the clients, the stuck messages got cleared up.

@pongad
Contributor

pongad commented Jun 21, 2018

@luann-trend What does "stuck" here mean? Are new messages not being processed?

@luann-trend

@pongad New messages are still being processed; it's just that a couple hundred messages keep getting redelivered and can't be processed for some reason. We have experienced the same issue on 2 different cluster environments, about 4-5 days after starting to use the new Java Google Pub/Sub client GA version.

@pongad
Contributor

pongad commented Jun 22, 2018

@luann-trend This is interesting. Is it possible that these messages are causing your code to throw an exception? We catch exceptions and nack the messages automatically, on the assumption that the user code failed to process them. Do you know how much time passes between redeliveries?

@chingor13
Contributor

Should have been fixed in #3743
