Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent produce() sequence number fix #1050

Merged
merged 11 commits into from
Feb 9, 2022

Conversation

t-d-d
Copy link
Contributor

@t-d-d t-d-d commented Mar 21, 2021

Read and update sequence numbers immediately before the produce request.
Fixes #1005 Maybe #598

Putting this up for review and input but I'm still unsure as to the failure modes. I'm wondering whether we should decrement the sequence numbers if the broker.produce() call fails.

t-d-d added 2 commits March 21, 2021 17:08
Read/update sequence numbers immediately before produce request
@t-d-d
Copy link
Contributor Author

t-d-d commented Mar 22, 2021

More info on dealing with failures with inflight request > 1. Although I don't think we need this full implementation, as I don't think we need to guarantee ordering for concurrent requests (or we at least state that we don't.)

I think this PR as-is is an improvement to the current behaviour. But I think there are questions around concurrent invocations of produce() and inflight requests in the idempotent mode, especially dealing with errors.

@Nevon
Copy link
Collaborator

Nevon commented May 11, 2021

Although I don't think we need this full implementation, as I don't think we need to guarantee ordering for concurrent requests (or we at least state that we don't.)

This is correct. We state that you need to set maxInflightSomethingSomething to 1 in order to use the idempotent producer.

The KIP you linked to has some ideas about improving this by reassigning sequence numbers if there's an error - but I'd say that's a future improvement.

The case we're trying to solve here is where people are essentially doing:

await Promise.all([
  producer.send(),
  producer.send(),
  producer.send()
])

While the order between these three is of course arbitrary, assuming that there are no errors, all three requests should succeed. Currently, this is very likely to fail as the sequence number for all three requests are likely to be the same. This PR will change that so that the sequence numbers will always be incremented as soon as they are assigned. In fact, a future refactoring could change the interface to the EoS manager so that getting a sequence number automatically increments it, instead of relying on updating it separately.

What this PR doesn't solve are the failure cases - but as it also doesn't make them worse, I'm inclined to accept it anyway. Some notes about the failure cases below:


These notes are not really related to this PR, but I'm putting them here to increase our shared understanding.

From KIP-98:

For a given PID, sequence numbers will start from zero and be monotonically increasing, with one sequence number per topic partition produced to. The sequence number will be incremented by the producer on every message sent to the broker.

There was a comment in #598 that indicated that the sequence number should increase by one per request. This statement from KIP-98 can be read that way, but that's not what they mean. What they mean is just that there's a sequence number per topic partition, not that it's incremented by one per produce request. Our implementation is correct in that sense, that it increases with the number of records produced. Nothing to do here, just clearing something up.

It goes on:

The broker will reject a produce request if its sequence number is not exactly one greater than the last committed message from that PID/TopicPartition pair. Messages with a lower sequence number result in a duplicate error, which can be ignored by the producer. Messages with a higher number result in an out-of-sequence error, which indicates that some messages have been lost, and is fatal.
...
The Producer will raise an OutOfOrderSequenceException if the broker detects data loss. In other words, if it receives a sequence number which is greater than the sequence it expected. This exception will be returned in the Future and passed to the Callback, if any. This is a fatal exception, and future invocations of Producer methods like send, beginTransaction, commitTransaction, etc. will raise an IlegalStateException.

This makes me think that the current design of the transactional producer is a bit wonky. What I guess should happen is that if we receive and OUT_OF_ORDER_SEQUENCE_NUMBER, we should transition to a new state in the EoS manager that rejects immediately on any attempt to send any more. This is currently not handled anywhere that I can see. While we could reject any pending requests as well, those should receive an OUT_OF_ORDER_SEQUENCE_NUMBER anyway from the broker, so it's maybe not worth the hassle (getting them out of the queue and rejecting them can be a bit tricky).

There is also the gnarlier situation where we don't get an OUT_OF_ORDER_SEQUENCE_NUMBER, but the request fails anyway, for example due to a connection error where we run out of retries. In such a case, we can't know what the next sequence number is. In this case, it seems the only safe thing to do is to handle it in the same way as if we got an OUT_OF_ORDER_SEQUENCE_NUMBER error and reject any further use of the transactional producer, and require a clean slate with a new producer id etc.

@t-d-d
Copy link
Contributor Author

t-d-d commented Jul 25, 2021

@Nevon I have added code that reverts the sequence numbers in case of a failure. Now at least it will recover in the normal case of sequential produce() calls (for a topic-partition.)

For errors when there are concurrent produce() calls ongoing (for a topic-partition) the idempotent producer will die with a UNKNOWN_PRODUCER_ID or OUT_OF_ORDER_SEQUENCE_NUMBER error on any error. This could be made to work if the sequence number stuff was moved inside the mutex lock that limits maxInFlightRequests. It would be pretty simple to add another lock (per broker, or maybe even per partition?) in sendMessages.js

Test cases added that capture the current behaviour.

I just noticed

await Promise.all(requests)
should probably be an allSettled(). Would cause problems if not all the promises are settled before the retry.

@t-d-d t-d-d requested a review from Nevon July 25, 2021 09:56
Copy link
Collaborator

@Nevon Nevon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one minor note about whether or not we can remove the sleeps from the tests, but overall it looks good to me. Thanks for your patience!

)
})

it('concurrent produce() calls > all messages are written to the partition once', async () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't worry about changing this, but just for future reference, you can just wrap these test cases in a describe block to group the tests together, instead of repeating the prefix in each test case:

describe('concurrent produce() calls', () => {
  test('all messages are written to the partition at once')
})

src/producer/__tests__/idempotentProduceMessages.spec.js Outdated Show resolved Hide resolved
* @template T
*/

function allSettled(promises) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. We should actually bump the requirement on Node to 12 at this point, but that's outside the scope of this PR.

aikoven referenced this pull request in aikoven/kafkajs Jan 16, 2022
@Nevon Nevon merged commit 7cc43da into tulios:master Feb 9, 2022
@pi2sqr
Copy link

pi2sqr commented Dec 12, 2022

Hi,

I have a question.
I'm trying to do performance test for default topic: -topic kafka-test --replication-factor 2 --partitions 1.
No other settings different from default.

I've Producer and Consumer written in Java.
Producer tries to do async send 10 messages in loop one by one, each with 10kB volume record:

for (int i=0; i<numberOfRecords; i++) {
start_time = System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaProp.getProperty("kafka.topic"), "key " + i, start_time + message);
Future sendFuture = producer.send(record);
if (sendFuture.isDone()) {
try {
sendFuture.get();
} catch (InterruptedException var7) {
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted", var7);
} catch (ExecutionException var8) {
throw new KafkaException("Send failed", var8.getCause());
}
}
}
producer.flush();
producer.close();

But even for one Producer I'm getting:
"...org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. for topic-partition kafka-test-0 with producerId 24053, epoch 851, and sequence number 1
13:19:36:569 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 24053 with epoch 852
"

What is strange for me, that the first 2050 messages are sent in less than a second.
Then it take some time to send next messages, 20ms up, but still 2050 x 10270B is not a size of topic's buffer.
The more Producers I have for one partition the more errors I'm getting.
From Consumer site I'm counting how many messages I'm not receiving and it's from 200 over 2000 with 1 Producer. For and more Producers its over a 60% of messages that are not received (maybe send correctly).

Can you advise?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

The idempotent producer doesn't work properly when sending R request, R > # of partitions
3 participants