KAFKA-9113, P2: Extract Producer to RecordCollector #2
Conversation
@@ -1149,7 +1149,8 @@ public void flush() {
 * block forever.</strong>
 * <p>
 *
- * @throws InterruptException If the thread is interrupted while blocked
+ * @throws InterruptException If the thread is interrupted while blocked.
+ * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
Thanks for adding this documentation! Unfortunately, this text is a bit confounding.
As I read it, it says that the broker may return some errors, which are not documented as part of the public API. Therefore, the client just throws a generic exception, with no information about what happened, what it means, or what I can do about it. Is that good library design?
If I find myself with such an exception, I would be totally at a loss as to what to do next... I guess go read the broker logs and source code to find out what happened? If that's really the answer, then a generic exception is indeed appropriate. Also, this situation must be exceedingly rare and requiring human intervention on the brokers, right? Something like the broker running out of memory or disk?
Concretely, even if this is the gloomy situation right now, it's better to have it documented than not, but we should include some guidance. Such as saying that these exceptions are fatal and unrecoverable, and that they indicate the broker is unhealthy and the client is in an unknown state. Or (if applicable), that the exception may or may not be fatal, and that the user could optimistically attempt to retry if they want to. Or something. I'm not asking you to exhaustively document the situation, just apply the knowledge you already have to briefly provide a little more information to users.
Future generations can take on the work of actually categorizing and documenting the exceptions that can get thrown.
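For illustration only, the extra guidance could look something like the following; this wording is just a suggestion, not text from the PR:

```java
/**
 * ...
 * @throws InterruptException If the thread is interrupted while blocked.
 * @throws KafkaException If a Kafka related error occurs that does not belong to the public
 *         API exceptions. Such errors are unexpected and generally indicate either a broker-side
 *         problem or that the producer is in an unknown state; callers should treat them as
 *         fatal unless they have application-specific knowledge that a retry is safe.
 */
public void flush() {
    // ...
}
```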
+1, I'm not sure how we could hit this exception in real life, but if we anticipate that it could happen, we need to plan action items to triage it further.
Sounds great, I will try to update it.
Hey @guozhangwang, thanks for rebasing this PR. I left one additional comment. I also responded to a few of the threads in the old PR. Nothing is a big deal to me, so feel free to merge the PR when you're ready.
 */
 @Override
- public void close(final boolean clean,
-                   final boolean isZombie) {
+ public void close(final boolean clean) {
q: About the isZombie removal: it seems like we just removed the logic of not aborting the transaction when the task being closed is treated as a zombie, is that correct? Does that mean the old logic was unnecessary?
Well, it is not unnecessary. When I did the refactoring and moved this to the record collector, I made it so that we only start a txn when there is none in flight, and we only commit / abort a txn when there is one in flight, e.g. as in maybeAbort:
if (eosEnabled && transactionInFlight)
So when we are closing because of a zombie, the txn may or may not be in flight, and we call abortTxn only in the former case; this logic is wrapped inside the close call and is agnostic to the task itself.
Does that make sense?
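For reference, a minimal sketch of the bookkeeping being described; the class, field, and method names below are illustrative only, not necessarily the PR's exact code:

```java
import org.apache.kafka.clients.producer.Producer;

// Illustrative sketch: the collector owns the transaction state, so callers
// (including a zombie task's close path) never need to track it themselves.
class TxnAwareCollector {
    private final Producer<byte[], byte[]> producer;
    private final boolean eosEnabled;
    private boolean transactionInFlight = false;

    TxnAwareCollector(final Producer<byte[], byte[]> producer, final boolean eosEnabled) {
        this.producer = producer;
        this.eosEnabled = eosEnabled;
    }

    // Start a transaction only if none is currently in flight.
    void maybeBeginTxn() {
        if (eosEnabled && !transactionInFlight) {
            producer.beginTransaction();
            transactionInFlight = true;
        }
    }

    // Abort only if a transaction is actually in flight; called from close(clean = false),
    // so a zombie task can simply delegate here without knowing the transaction state.
    void maybeAbortTxn() {
        if (eosEnabled && transactionInFlight) {
            producer.abortTransaction();
            transactionInFlight = false;
        }
    }
}
```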
Sounds good!
 @Override
- public void init(final Producer<byte[], byte[]> producer) {
-     this.producer = producer;
+ public void initialize() {
req: I'm inclined to remove this unnecessary interface unless there is another compelling reason to keep it.
See my TODO above: my intent is that initialize would be called whenever a task transitions state. This is because in the past we had a known issue where we called initTxn first, and then, because restoration took a very long time, the transactional id actually expired even before we called the first beginTxn.
I'm leaving it to another PR (supposedly by @vvcephei :) ) to maintain this fix as we do now, which is that when we have finished restoring and are about to start processing, we initialize the thread's record collector.
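As a rough illustration of that intent (the class and method names here are hypothetical, not the PR's actual API), initTransactions() would only be issued once the task transitions out of restoration:

```java
import org.apache.kafka.clients.producer.Producer;

// Hypothetical sketch: defer initTransactions() until restoration has finished,
// so a long restore cannot expire the transactional id before the first beginTransaction().
class DeferredTxnInit {
    private final Producer<byte[], byte[]> producer;
    private boolean initialized = false;

    DeferredTxnInit(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    // Called by the thread when the task transitions from restoring to running.
    void initialize() {
        if (!initialized) {
            producer.initTransactions();
            initialized = true;
        }
    }
}
```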
Sgtm
    }
}

public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
q: I had the same curiosity before: if the offsets have not moved forward since the last iteration, could we skip the commit as an optimization?
Yes we could -- my vague memory is that we used to do this at the stream-thread level, but it later regressed because the logic was too complicated. I think maybe now is a better time to pick up this optimization again.
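A rough sketch of what that optimization could look like (the class and field names are illustrative, and a real implementation would also need to handle EOS): remember the offsets of the previous commit and skip the call when nothing has advanced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: skip commitSync() when the offsets did not move since the last commit.
class SkippableCommitter {
    private final Consumer<byte[], byte[]> consumer;
    private Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

    SkippableCommitter(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (Objects.equals(offsets, lastCommitted)) {
            return; // nothing advanced since the last commit, so skip it
        }
        consumer.commitSync(offsets);
        lastCommitted = new HashMap<>(offsets);
    }
}
```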
The change LGTM now with just one minor question, thanks so much for the work!
@@ -50,65 +57,135 @@
import java.util.Map;

public class RecordCollectorImpl implements RecordCollector {
I am not convinced atm that the record collector should handle the committing. IMHO, the StreamThread should unify this logic (especially with KIP-447 in mind) -- might be best to discuss in person.
Yeah, we can definitely discuss this in person. When I was working on this I did not yet have the full picture of KIP-447 in mind. After reviewing your PR, I think one way would be to make the RecordCollector a per-thread object rather than per-task; within the class it would maintain one or multiple producers. The benefit of this is that committing would still be maintained within a task --- but again, this is just one way of doing it and I'm open to other options, especially since in KIP-447 we think it is better to always commit all tasks together.
If we want to push the management of the embedded clients to the stream-thread while always committing all tasks when needed, maybe we can consider this:
- Let the task.commit() function handle only the procedure of flushing / checkpointing state stores.
- At the StreamThread, call foreach (task) -> commit and then make another commit call that does similar logic to the RecordCollector's commit logic, branching on EOS-alpha / EOS-beta / non-EOS. Also let the StreamThread keep a reference to all the producer(s) in order to make that call.

By doing this, only the per-task RecordCollector and the StreamThread would have a reference to the producer (and the StreamThread a reference to the consumer); the former only uses it to call produce / flush, while the latter uses it for commit / close (the lifecycle of the producers is also managed by the thread).
Let's chat more in person about this.
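A very rough sketch of the shape being discussed, purely to illustrate "prepareCommit per task, one commit owned by the thread"; the interface, method names, and the simplified EOS branch below are all hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical task-side hook: flush stores / collector, write checkpoints, return offsets.
interface PreparableTask {
    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
}

class ThreadLevelCommit {
    // The thread prepares every task, then issues a single commit, branching on EOS.
    static void commitAll(final Iterable<PreparableTask> tasks,
                          final boolean eosEnabled,
                          final Consumer<byte[], byte[]> consumer,
                          final Producer<byte[], byte[]> producer,
                          final String groupId) {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (final PreparableTask task : tasks) {
            offsets.putAll(task.prepareCommit());
        }
        if (eosEnabled) {
            // with EOS the offsets become part of the producer's transaction
            producer.sendOffsetsToTransaction(offsets, groupId);
            producer.commitTransaction();
        } else {
            consumer.commitSync(offsets);
        }
    }
}
```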
That aligns with my thinking -- in fact, we could rename task.commit() to something like prepareCommit()? A task must for sure flush its state -- I'm not sure it needs to flush the record collector -- flushing the record collector could be done by the stream-thread. This would decouple tasks even more (and IMHO better) from topics.
prepareCommit() sounds better.
Regarding the record collector: we need to flush the collector after flushing the state manager and before writing the checkpoint, due to the following order:
1. Flush the state mgr, which may generate records to send.
2. record-collector.flush -> producer.flush, which would update the offsets.
3. Write the offsets to the checkpoint file.
4. Then commit.

1/2/3 would be in prepareCommit in that order, and then 4) would be in the stream-thread. With EOS enabled, if 4) fails I would wipe out the state dir in closeDirty so that the checkpoint files are wiped as well.
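In code form, the ordering could look roughly like this (the two interfaces are minimal stand-ins, not the real Streams classes):

```java
// Stand-in interfaces for the state manager and record collector used in this sketch.
interface FlushableStateManager { void flush(); void checkpoint(); }
interface FlushableCollector { void flush(); }

class PrepareCommitSketch {
    static void prepareCommit(final FlushableStateManager stateMgr,
                              final FlushableCollector collector) {
        stateMgr.flush();       // 1) flushing the stores may itself generate records to send
        collector.flush();      // 2) producer.flush(), which finalizes the offsets
        stateMgr.checkpoint();  // 3) write the offsets to the checkpoint file
        // 4) the actual commit is issued by the stream thread; with EOS enabled a failure
        //    there triggers closeDirty(), wiping the state dir (and the checkpoint files).
    }
}
```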
This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech-debt cleanup of task management and state management, and is broken down into the sub-tasks listed below:
- Extract the embedded clients (producer and consumer) out of StreamTask into RecordCollector. guozhangwang#2 guozhangwang#5
- Consolidate the standby updating and active restoring logic into ChangelogReader and extract it out of StreamThread. guozhangwang#3 guozhangwang#4
- Introduce the Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state. guozhangwang#6 guozhangwang#7
- Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since they were already moved in steps 2) and 3)). guozhangwang#8 guozhangwang#9
- Also simplify the StreamThread logic a bit, as the embedded clients / changelog restoration logic has been moved out in steps 1) and 2). guozhangwang#10

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Bruno Cadonna <[email protected]>, Boyang Chen <[email protected]>
1.a Producer.send() could throw a KafkaException whose cause is ProducerFenced;
1.b Producer.send() could return other exceptions passed from the callback (ProducerFenced and UnknownProducerId);
1.c Consumer.commit() could throw CommitFailedException;
1.d Producer.[otherAPI] and Consumer.[otherAPI] could throw other KafkaExceptions, including TimeoutException.
1.a/b/c are interpreted as TaskMigratedException and rethrown from RecordCollector; 1.d is interpreted as a general StreamsException and rethrown from RecordCollector.
Task would not handle any such exception; all of them are thrown all the way up to the StreamThread. The thread handles TaskMigratedException by closing the task "uncleanly", and then handles any other StreamsException as fatal and shuts itself down --- this is a major change in exception handling: today, e.g., when calling close(clean) we try to immediately capture the exception within the task and then immediately handle it by retrying close(unclean), which causes a very messy hierarchy. This is not completely done in this PR, but we are moving in that direction in future PRs.
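A condensed sketch of that translation rule (the exception constructors and the exact checks here are assumptions; the real collector code builds the messages and inspects causes in more detail):

```java
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;

// Illustrative mapping: 1.a/b/c become TaskMigratedException, 1.d becomes StreamsException.
class ExceptionTranslation {
    static RuntimeException translate(final RuntimeException error) {
        if (error instanceof ProducerFencedException                      // 1.b from the send callback
                || error instanceof UnknownProducerIdException            // 1.b from the send callback
                || error instanceof CommitFailedException                 // 1.c from Consumer.commit()
                || error.getCause() instanceof ProducerFencedException) { // 1.a wrapped by send()
            return new TaskMigratedException("task migrated to another client", error);
        }
        if (error instanceof KafkaException) {                            // 1.d, e.g. TimeoutException
            return new StreamsException("unexpected producer/consumer error", error);
        }
        return error;
    }
}
```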
ProcessorStateManager and RecordCollector are created before StreamTask / StandbyTask and are passed in as parameters; this also has a nice effect on unit tests, since we can use mocks for them when testing only the task functionality. Because of this I've removed a bunch of unit tests from the task level (so do not be afraid of the LOC size; the non-testing part is actually not huge), and will move them to the state-manager / record-collector tests after we've done the whole cleanup. During this period the test coverage will drop a bit, but we will eventually add the tests back in other classes.
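A toy illustration of why that wiring helps testing (the names are hypothetical and the mocking style is only an example, not the test code in this PR):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.Test;

// With the collector and state manager injected, the task can be tested against mocks
// without wiring up a real producer or state stores.
public class TaskWiringExampleTest {
    interface Collector { void flush(); }
    interface StateMgr  { void flush(); }

    static final class Task {
        private final Collector collector;
        private final StateMgr stateMgr;

        Task(final Collector collector, final StateMgr stateMgr) {
            this.collector = collector;
            this.stateMgr = stateMgr;
        }

        void prepareCommit() {
            stateMgr.flush();
            collector.flush();
        }
    }

    @Test
    public void shouldFlushStateAndCollectorOnPrepareCommit() {
        final Collector collector = mock(Collector.class);
        final StateMgr stateMgr = mock(StateMgr.class);

        new Task(collector, stateMgr).prepareCommit();

        verify(stateMgr).flush();
        verify(collector).flush();
    }
}
```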
The task close / suspend procedures are cleaned up based on the clean flag (the isZombie flag is now consolidated into the former). As a side-effect, we also fixed the issue of double checkpointing during committing / closing.
Comments from apache#7846 are addressed already.