Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Client with shared subscription is blocked #21104

Open
1 of 2 tasks
michalcukierman opened this issue Aug 31, 2023 · 29 comments
Open
1 of 2 tasks

[Bug] Client with shared subscription is blocked #21104

michalcukierman opened this issue Aug 31, 2023 · 29 comments
Assignees
Labels
category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost type/bug The PR fixed a bug or issue reported a bug

Comments

@michalcukierman
Copy link

Search before asking

  • I searched in the issues and found nothing similar.

Version

Client - 3.1.0
Pulsar - 3.1.0 (and later builds)

Also reported on 3.0.1

Minimal reproduce step

My reproducible steps:

  1. Create persistent topic with 3 partitions
  2. Publish 1 mln messages (30KB)
  3. Run the client and consumer:
    PulsarClient client = PulsarClient.builder()
        .serviceUrl(this.pulsarBrokerUrl)
        .build();

    Consumer consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(5, TimeUnit.SECONDS)
        .subscribe();

What did you expect to see?

All messages are received

What did you see instead?

Client stops to receive messages, restart client helps, but it get stuck after some time.

Anything else?

The issue was originally created described here: #21082
@MichalKoziorowski-TomTom also faces the issue.

I've created new issue, because it in #21082 the author says that broker restart helps. In case of this issue, it looks like it's client related and some race condition observed in 3.x.x. after introducing ackTimeout

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@michalcukierman michalcukierman added the type/bug The PR fixed a bug or issue reported a bug label Aug 31, 2023
@codelipenghui codelipenghui added the category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost label Aug 31, 2023
@codelipenghui
Copy link
Contributor

@michalcukierman Could you please share the topic stats and internal-stats when the issue is reproduced?

@michalcukierman
Copy link
Author

michalcukierman commented Sep 1, 2023

Two files attached:
partitioned.stats.internal.txt
partitioned.stats.txt

I see that in stats we have:

        "availablePermits" : 0,
        "unackedMessages" : 25,

        "availablePermits" : 0,
        "unackedMessages" : 32,

, but I do ack all of the messages:

    Consumer consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(ackTimeout, TimeUnit.SECONDS)
        .subscribe();

    Producer<byte[]> producer = client.newProducer(Schema.BYTES)
        .topic(destinationTopic)
        .compressionType(CompressionType.LZ4)
        .maxPendingMessages(8)  // Support 5 MB files
        .blockIfQueueFull(true) // Support 5 MB files
        .batchingMaxBytes(5242880)
        .create();

    Multi<Messagemessages = Multi.createBy().repeating()
        .completionStage(consumer::receiveAsync)
        .until(m -> closed.get());

    messages.subscribe().with(msg -> {
      receivedDistribution.record(getKiloBytesSize(msg.getData()));
      Uni.createFrom().completionStage(producer.newMessage(Schema.BYTES)
              .key(msg.getKey())
              .value(msg.getValue().getContent().getPayload().toByteArray())
              .eventTime(msg.getEventTime()).sendAsync())
          .subscribe().with(msgId -> {
            sentCounter.increment();
            try {
              consumer.acknowledge(msg.getMessageId());
            } catch (PulsarClientException e) {
              throw new RuntimeException("Unable to send or ACK the message", e);
            }
          });
    });
  }

askTimeout is set to 120 seconds.

It can be confirmed in the log files, where sentCounter is just on message behind
Screenshot 2023-09-01 at 13 11 49

@michalcukierman
Copy link
Author

Here are the alternative stats:
#21082 (comment)

@michalcukierman
Copy link
Author

I think it happens:

  • there is ackTimeout set on a consumer
  • high throughput
  • the receviverQueue size is low

There may be a race condition in 3.1.0 client, as the situation was not observed with 2.10.4 (we've downgraded, also @MichalKoziorowski-TomTom reported this as a fix).

@mattisonchao mattisonchao self-assigned this Sep 3, 2023
@MichalKoziorowski-TomTom
Copy link
Contributor

MichalKoziorowski-TomTom commented Sep 4, 2023

We don't have ackTimeout and it still happens. In my case, my topic has only one partition.

@redoc123
Copy link

redoc123 commented Sep 5, 2023

you said 1 million messages right? Could you post your sample payload? I want to reproduce and debug.

@michalcukierman
Copy link
Author

It's 1 mln x 30 kb of sample HTML file. Any text file would be good.
What we are doing:

topic:source ( 1mln x 30k) -> app:router -> topic:destination

source has 3 partitions
destination has 6 partitions
Router has 2 replicas, uses shared subscription to read from source, writing to destination and ack's incoming message once the message is stored and acked on destination

@michalcukierman
Copy link
Author

Here is the exact payload:
30k.html.zip

@lvdelu
Copy link

lvdelu commented Sep 6, 2023

+1,it seems also occurred in other consumer mode(shared,keyshared)

@michalcukierman
Copy link
Author

Happened today again on different module.

topic - not partitioned, without retention
messages - 50k, around 5k each
subscription - shared, with two clients

The solution is to use 2.10.4 client for now.

@mattisonchao
Copy link
Member

I'll try to reproduce it and get back to you. :)

@mattisonchao
Copy link
Member

HI, @michalcukierman

I created a repo to reproduce this issue. But no luck.
Could you please help me to refine this test?

@michalcukierman
Copy link
Author

@mattisonchao
I'll have a time next week to get back to it.

@lvdelu
Copy link

lvdelu commented Sep 26, 2023

how about fix deadline ?

@mattisonchao
Copy link
Member

@lvdelu
Could you help to refine this test to help reproduce it?
repo

@michalcukierman
Copy link
Author

michalcukierman commented Sep 29, 2023

I've tried to create isolated environment to reproduce the issue on local machine using test containers, but I was not able today.
The environment I observe the issue is a GCP cluster with:

  • 5 x Bookkeeper
  • 3 x Broker
  • 3 x Proxy

It also happens on randomly, with 1 million messages It sometimes get stuck around 300k-500k messages, but it is not deterministic. I'll try to reproduce the issue on GCP, but I'll need more time. It may happen that more than one partition on more than one brokers are required. Unfortunately with the test containers I am not able to recreate the same load.

@michalcukierman
Copy link
Author

michalcukierman commented Oct 2, 2023

It looks like I was able to reproduce the issue in the two runs today (failed 2/2).

  • I've created SmallRye project (one producer to produce 1 mln messages, one consumer to read it using shared subscription)
  • The cluster is created on GCP using Pulsar operator from SN (great tool!)

The code is here:
https://github.com/michalcukierman/pulsar-21104

In general it's very much like in the bug description. Produce 1 mln messages of 30kb:

  @Outgoing("requests-out")
  public Multi<String> produce() {
    return Multi.createBy().repeating()
        .uni(() -> Uni
            .createFrom()
            .item(() -> RandomStringUtils.randomAlphabetic(30_000))
            .onItem()
            .invoke(() -> System.out.println("+ Produced: " + outCount.incrementAndGet()))
        )
        .atMost(1_000_000);
  }

Read it using client with shared subscription and write to another topic:

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

The settings of the client are:

pulsar.client.serviceUrl=pulsar://brokers-broker:6650

mp.messaging.incoming.requests-in.subscriptionType=Shared
mp.messaging.incoming.requests-in.numIoThreads=4
mp.messaging.incoming.requests-in.subscriptionName=request-shared-subscription
mp.messaging.incoming.requests-in.ackTimeoutMillis=5000
mp.messaging.incoming.requests-in.subscriptionInitialPosition=Earliest
mp.messaging.incoming.requests-in.receiverQueueSize=8
mp.messaging.incoming.requests-in.topic=persistent://public/default/requests_4
mp.messaging.incoming.requests-in.connector=smallrye-pulsar

mp.messaging.outgoing.dump-out.topic=persistent://public/default/dump
mp.messaging.outgoing.dump-out.connector=smallrye-pulsar
mp.messaging.outgoing.dump-out.blockIfQueueFull=true
mp.messaging.outgoing.dump-out.maxPendingMessages=8
mp.messaging.outgoing.dump-out.maxPendingMessagesAcrossPartitions=12

The retention of the topic requests is set using Pulsar Admin in Java to -1 -1.

During two runs the consumer get stucked:
Screenshot 2023-10-02 at 21 12 42

@poorbarcode
Copy link
Contributor

poorbarcode commented Oct 25, 2023

@michalcukierman

I left a comment here, and you can answer the comment under current Issue, Thanks

@michalcukierman
Copy link
Author

michalcukierman commented Nov 1, 2023

Both issues may not be related, In both cases the subscriptions are blocked, but in this case the restart of the broker didn't help - it looks like a deadlock in the client.

@poorbarcode have you tried the repository I've linked?
https://github.com/michalcukierman/pulsar-21104

It's possible to reproduce it on GCP cluster. Should work on other clusters as well.

@MichalKoziorowski-TomTom
Copy link
Contributor

It might be fixed with #22352. Need to check...

@MichalKoziorowski-TomTom
Copy link
Contributor

@michalcukierman Could you recheck your case? I've checked my case and I can't reproduce with 3.0.4 client while it was easily reproducible with 3.0.1. Test with version that includes #22352 fix.

@michalcukierman
Copy link
Author

michalcukierman commented May 7, 2024

I could not reproduce the issue with Client 3.0.4, I can still reproduce the issue with Client 3.2.2
@codelipenghui the hprof is here: https://github.com/michalcukierman/pulsar-21104/blob/main/heap.hprof
It was taken on the execution of the code that is currently in the main branch (Pulsar Cluster 3.1.0 / Pulsar Client 3.2.2)

@michalcukierman
Copy link
Author

michalcukierman commented May 7, 2024

I've noticed that with Client 3.2.2 the behavior may be a bit different. The consumers get blocked, but occasionally resume. After a couple messages received are stuck again.
Stats are uploaded:
https://github.com/michalcukierman/pulsar-21104/tree/main/stats

I've confirmed once again, the issue does not occur with Pulsar Client 3.0.4 (or at least I was not able to reproduce it after processing 1 mln messages. With Client 3.2.2 usually clients are blocked after 50k messages).

@michalcukierman
Copy link
Author

@poorbarcode have you had chance to see the last comment?

@dao-jun
Copy link
Member

dao-jun commented May 30, 2024

I've noticed that with Client 3.2.2 the behavior may be a bit different. The consumers get blocked, but occasionally resume. After a couple messages received are stuck again. Stats are uploaded: https://github.com/michalcukierman/pulsar-21104/tree/main/stats

I've confirmed once again, the issue does not occur with Pulsar Client 3.0.4 (or at least I was not able to reproduce it after processing 1 mln messages. With Client 3.2.2 usually clients are blocked after 50k messages).

I check the stats and the internal-stats,
As I can see, there is about 30k messages in the topic, and there is a subscription named request-shared-subscription to consume the topic, but it seems none of the messages acknowledged, the markDeletedPosition is 0:-1 and individuallyDeletedMessages is empty:
image

Which means all the messages are unacked, or, in the backlog.
And I checked the stats, for the consumer of request-shared-subscription. I can see msgOutCounter=10002 which means how many messages dispatched to the consumer, unackedMessages=10001 means 10001 messages are unacked.
So, obviously, you only receive messages but didn't ack messages.

I guess the configuration maxUnackedMessagesPerConsumer in your broker.conf is 10000. So, since the unackedMessages=10001 exceeds the maxUnackedMessagesPerConsumer=10000, so the blockedConsumerOnUnackedMsgs field in the stats is true.
image

Which means there are toooo many unacked messages on the consumer so that the consumer stopped dispatch messages to clients.

Please ack message after process successfully.

@michalcukierman
Copy link
Author

@dao-jun,

The whole source code with the instructions on how to run it is available in the linked repository.
Note that the same example works when downgrading the client to 3.0.4 (see the videos).
#21104 (comment)

The message acknowledgment should be done after getting write confirmation from the producer.
#21104 (comment)

This is the original code that was failing. We are no longer using it because we've decommissioned the module.:
#21104 (comment)

#21104 was created as a reproducible example.

@michalcukierman
Copy link
Author

This code creates a pipeline with the source-processor-sink. Sink ACK triggers source ACK.

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

@dao-jun
Copy link
Member

dao-jun commented May 31, 2024

@michalcukierman I'm not familiar with this kind of development framework, #21104 (comment) is based on the stats and internal-stats you provided. The point is: all the messages dispatched to clients are not acked.

You can debug your code to confirm the messages are acked or not

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

No branches or pull requests

8 participants