
When the producer or client is normally closed, data will be lost #12195

Closed

Conversation

lordcheng10
Contributor

@lordcheng10 lordcheng10 commented Sep 26, 2021

Motivation

In the following example, the data will be lost (it can never be consumed):

```java
try {
    PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();

    Producer<byte[]> producer = client.newProducer().topic("test1").create();

    producer.sendAsync("hello".getBytes());
    client.close();
} catch (PulsarClientException e) {
    e.printStackTrace();
}

Thread.sleep(1000);
```

The reason is that the producer does not send the data remaining in the client buffer when it is closed.

The correct approach is to flush the buffered data first when closing.

@lordcheng10 lordcheng10 changed the title from "When calling producer.close or client.close to close the client, the data in the client buffer is not flushed, and data may be lost" to "When the producer or client is normally closed, data will be lost" on Sep 26, 2021
@lordcheng10
Contributor Author

Can you review it? @eolivelli

@hezhangjian
Member

This method is closeAsync, so I would think calling flushAsync would be better.
You could also add a test; the example is a little flaky, and a batch producer with a batch time (like 5s) would be better.

@Anonymitaet
Member

@lordcheng10 Thanks for your contribution. For this PR, do we need to update docs?

(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@lordcheng10
Contributor Author

lordcheng10 commented Sep 26, 2021

flushAsync

Thank you for your attention! The cleanup logic can only be done after the flush is completed. If the flushAsync method is used instead of the flush method, there is no guarantee that all the data has been sent to the server before the cleanup logic completes.

@lordcheng10
Contributor Author

@lordcheng10 Thanks for your contribution. For this PR, do we need to update docs?

(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

Okay, when this discussion is completed, I will update the docs.

@hezhangjian
Member

I mean that when the flushAsync future completes, then do the close logic. That way the closeAsync method will not block.
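A minimal sketch of this chaining, using only the JDK's CompletableFuture. The class and the method names `flushAsync` and `doCloseInternal` are hypothetical stand-ins for the real producer methods, with the async work simulated:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: chain the close logic onto the flush future so the
// caller of closeAsync() never blocks. flushAsync() and doCloseInternal()
// stand in for the real producer internals.
class NonBlockingClose {
    CompletableFuture<Void> flushAsync() {
        // pretend the buffered messages are drained asynchronously
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> doCloseInternal() {
        // pretend the CloseProducer command is sent and resources released
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> closeAsync() {
        // run the close logic only after the flush future completes
        return flushAsync().thenCompose(ignored -> doCloseInternal());
    }
}
```

The caller still gets a single future to wait on, but no thread blocks inside closeAsync itself.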

@lordcheng10
Contributor Author

lordcheng10 commented Sep 26, 2021

I mean that when the flushAsync future completes, then do the close logic. That way the closeAsync method will not block.

You are right, I will try to modify it

@lordcheng10
Contributor Author

I mean that when the flushAsync future completes, then do the close logic. That way the closeAsync method will not block.

@Shoothzj As we just discussed, I replaced flush with flushAsync and resubmitted the code.

@hezhangjian
Member

LGTM @eolivelli, @codelipenghui, @BewareMyPower, @sijie, @hangc0276, @merlimat - PTAL, thanks.

@BewareMyPower
Contributor

IMO it's the right behavior. close or closeAsync should not be responsible for flushing the internal messages.

@merlimat
Contributor

I think it's correct to flush everything on a (graceful) close, though I would not characterize it as "data loss" since the send futures will not complete successfully.

We should add testing for the behavior.

@eolivelli
Contributor

I understand that the current behaviour may be surprising for users.
It may be good to add a 'flush and close' method, but this can be easily implemented by the application.

We must ensure that the javadocs are clear about the behaviour.

I suggest you start a discussion on dev@pulsar.apache.org in order to reach a bigger audience.
In any case, this kind of change will need a PIP because it would be an important behaviour change.

@BewareMyPower
Contributor

I agree with @eolivelli. IMO it should be implemented as a new API like flushAndClose, or as some extra config for the client or producer.

@lordcheng10
Contributor Author

lordcheng10 commented Sep 26, 2021

Thank you for your attention!

For similar producer close logic, we can refer to Kafka's handling:
[screenshot of Kafka's producer close implementation]

It may be more reasonable for the producer to flush during close. When closing, sending the unsent data to the server is a common user demand; few users would prefer to discard the data in the buffer.

Of course, we can also provide a close method with a timeout to satisfy some users who do not want the close method to take too much time.

@BewareMyPower @eolivelli @merlimat @sijie @hangc0276

@merlimat
Contributor

The flushing on close was already the pre-existing behavior, though that got lost at some point.

@BewareMyPower
Contributor

BewareMyPower commented Sep 27, 2021

The flushing on close was already the pre-existing behavior

@merlimat I'm afraid not; in the current ProducerImpl#closeAsync, before sending CommandClose, the producer only cancels all timers, then calls clearPendingMessagesWhenClose to complete all pending messages' callbacks with an AlreadyClosedException.

Of course, we can also provide a close method with a timeout to satisfy some users who do not want the close method to take too much time.

It's not as easy as you might think. We need to handle more corner cases if we add the flush semantics to close.

First, we cannot assume pending messages are sent quickly. If your buffer memory is large enough, it might take a long time to close. Assume you have 100,000 pending messages and within the timeout, only 20,000 messages are persisted. What do you do now?

  1. Continue sending the remaining 80,000 messages. This means your application continues after close, but the background thread is still running. To ensure messages are not lost, you need to keep the connection open. Then you have the odd situation of a client that is closed while its resources are still active, including the connection, pending messages, etc.
  2. Discard the remaining 80,000 messages. Then data is lost. In addition, unlike Kafka, the Pulsar producer sends a CommandCloseProducer to notify the broker to close the producer. What's more, this breaks the guarantee that messages are not lost after a synchronous close returns.

In any case, when you choose sendAsync, you should always make use of the returned future to confirm the result of each message. In Kafka, that's the send callback.

Assume now that close waits until all messages are persisted. Then you might write the following pseudo code:

```java
for (int i = 0; i < N; i++) {
    producer.sendAsync("message-" + i);
}
producer.close();
```

What if close() failed? How would you check which messages failed to send? You still need to do something with the futures that sendAsync returns. Kafka's close only looks good for those who don't care about errors and assume all API calls always succeed.
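A sketch of the pattern described above, using only the JDK. The `sendAsync` here is a hypothetical stand-in that simulates the Pulsar producer method; the point is keeping every returned future so failures can be pinpointed after close:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: retain each future that sendAsync returns, then
// inspect them after close() to see exactly which messages failed, instead
// of trusting close() alone to report errors.
class TrackedSends {
    static CompletableFuture<Long> sendAsync(String msg) {
        // simulate a broker ack that yields a message id
        return CompletableFuture.completedFuture((long) msg.hashCode());
    }

    static List<String> failedMessages(int n) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(sendAsync("message-" + i));
        }
        // after close(), check each future individually
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            if (futures.get(i).isCompletedExceptionally()) {
                failed.add("message-" + i);
            }
        }
        return failed;
    }
}
```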

@michaeljmarshall
Member

```java
/**
 * Close the producer and releases resources allocated.
 *
 * <p>No more writes will be accepted from this producer. Waits until all pending write request are persisted.
 * In case of errors, pending writes will not be retried.
 *
 * @return a future that can used to track when the producer has been closed
 */
CompletableFuture<Void> closeAsync();
```

Given the Javadoc for closeAsync, I would expect the implementation to include a flush of all pending messages. Perhaps there is ambiguity about what constitutes a "pending write request"? I think a "pending write request" is any write request the producer has already accepted. This also aligns with the fact that the producer will not accept any more write requests.

First, we cannot assume pending messages are sent quickly. If your buffer memory is large enough, it might take long time to close. Assuming you have 100000 pending messages and in the timeout, only 20000 messages are persisted. What will you do now?

If we add a timeout to the closeAsync logic, I think it is reasonable to fail all pending work that is not completed before the deadline and clean up all resources. It is up to the implementation to decide how long to wait for messages to deliver, and we already let users configure this via the sendTimeoutMs in the producer's config. This config is already used to fail messages that haven't been delivered after some period of time.
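A minimal sketch of such a deadline-bounded close, using only the JDK's `orTimeout` (Java 9+). The class name and the simulated flush future are hypothetical; the real implementation would tie the deadline to sendTimeoutMs:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: bound the flush with a deadline so closeAsync always
// terminates. Pending work that misses the deadline is failed and resources
// are cleaned up regardless.
class BoundedClose {
    static CompletableFuture<Void> closeWithin(CompletableFuture<Void> flushFuture, long timeoutMs) {
        return flushFuture
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .exceptionally(e -> {
                    // deadline hit: the flush did not finish, but close still
                    // completes so resources can be released
                    return null;
                });
    }
}
```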

@lordcheng10
Contributor Author

lordcheng10 commented Sep 28, 2021

I understand that the current behaviour may be surprising for users.
It may be good to add a 'flush and close' method, but this can be easily implemented by the application.

We must ensure that the javadocs are clear about the behaviour.

I suggest you start a discussion on dev@pulsar.apache.org in order to reach a bigger audience.
In any case, this kind of change will need a PIP because it would be an important behaviour change.

As you described, I created a PIP: #12216 @eolivelli

@BewareMyPower
Contributor

BewareMyPower commented Sep 28, 2021

Perhaps there is ambiguity for what constitutes a "pending write request"?

I also noticed the problem. I think it's an inconsistency between the JavaDocs and the actual implementation. But the definition of "pending write request" is ambiguous. Should it be the inflight CommandSend network requests, or all the pending messages in the producer's buffer?

The description of flushAsync is

Flush all the messages buffered in the client

The description of closeAsync is

Waits until all pending write request are persisted.

@BewareMyPower
Contributor

BewareMyPower commented Sep 28, 2021

@lordcheng10 If you're going to submit a PIP, please follow the PIP template.

But before submitting a PIP, it's better to send an email to dev@pulsar.apache.org to start a discussion so that more people know the context.

@BewareMyPower
Contributor

I've started a discussion, see https://lists.apache.org/thread.html/r8bfcb7ab28612d94d441ff5eadd996413346f0780b6f7b3484aaf7dc%40%3Cdev.pulsar.apache.org%3E

@liangyuanpeng
Contributor

liangyuanpeng commented Sep 28, 2021

I believe that this will resolve #11780.

@lordcheng10
Contributor Author

PIP template.

OK, I have revised it again according to the PIP template.

@lordcheng10
Contributor Author

/pulsarbot run-failure-checks

@lordcheng10
Contributor Author

/pulsarbot run-failure-checks

Member

@michaeljmarshall michaeljmarshall left a comment


I know we're still discussing whether close implies flush on the mailing list, but I wanted to take a closer look at the actual implementation in this PR. If we do end up adding the flush, we'll need to update the Javadoc for close and closeAsync on the Producer interface.

Comment on lines 847 to 853
```java
flushAsync().thenRun(() -> {
    final State currentState = getAndUpdateState(state -> {
        if (state == State.Closed) {
            return state;
        }
        return State.Closing;
    });
```
Member


I think we should set the state of the producer to Closing before triggering the flush. Otherwise, a message could be added to batchMessageContainer after the flush and before the state is set to Closing.
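A minimal sketch of the ordering being suggested, using only the JDK. The class, the `trySend` check, and the simulated flush are hypothetical stand-ins for the producer internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: flip the state to Closing BEFORE triggering the flush,
// so no new message can be accepted in the window between the flush and the
// state change.
class OrderedClose {
    enum State { Ready, Closing, Closed }

    final AtomicReference<State> state = new AtomicReference<>(State.Ready);

    boolean trySend() {
        // sends are only accepted while the producer is Ready
        return state.get() == State.Ready;
    }

    CompletableFuture<Void> closeAsync() {
        // 1) flip the state first so trySend() starts rejecting immediately
        state.getAndUpdate(s -> s == State.Closed ? s : State.Closing);
        // 2) only then flush what is already buffered (simulated here)
        return CompletableFuture.completedFuture(null);
    }
}
```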

Contributor Author


You are right!

@lordcheng10
Contributor Author

/pulsarbot run-failure-checks

@Anonymitaet
Member

@lordcheng10 feel free to ping me if you need a doc review.

@github-actions

github-actions bot commented Mar 1, 2022

The pr had no activity for 30 days, mark with Stale label.

@lordcheng10 lordcheng10 closed this Mar 1, 2022
@Anonymitaet Anonymitaet removed the doc-required Your PR changes impact docs and you will update later. label Mar 2, 2022
8 participants