-
Notifications
You must be signed in to change notification settings - Fork 13.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36379] Improve (Global)Committer with UC disabled #25456
[FLINK-36379] Improve (Global)Committer with UC disabled #25456
Conversation
103eb53
to
b23171d
Compare
b23171d
to
1d91ea0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I understand the idea of the change, I wonder if it the right time or way.
We just cut 2.0 and this is potentially a breaking change since we do not control where users append the global committer in their sink topologies. It's very theoretical and the globalcommitter was marked @Experimental
but we should at least discuss it.
I am also a bit torn of the change of operation in the GlobalCommitter
and the usage of the underlying infra structure i.e. CheckpointCommittableManager
. I probably need another pass to fully understand how well the manager plays with the new semantics of the GlobalCommitter
.
* error. The state only includes incomplete checkpoints coming from upstream committers not | ||
* receiving {@link #notifyCheckpointComplete(long)}. All committables received are successful. | ||
* | ||
* <p>In rare cases, the GlobalCommitterOperator may be connected to a writer directly. In this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, the global committer can be at any point of the custom topology hooks behind some custom operator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right. I'll clarify the comment - the code already acknowledges that.
committableCollector.getCheckpointCommittablesUpTo(checkpointId)) { | ||
checkpoint.commit(committer); | ||
if (!commitOnInput) { | ||
lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Let's move the calculation of lastCompletedCheckpointId
into the commit()
method and add a parameter to the commit()
method for the current checkpoint.
// this is true for the last commit and we need to make sure that all committables are | ||
// indeed committed as this function will never be invoked again | ||
boolean waitForAllCommitted = | ||
lastCompletedCheckpointId == EOI |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the following scenario happen?
On EOI
and commitOnInput=True
, we start running an infinite loop because we have not received all committables for EOI from the upstream task (committer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that is a likely scenario. Hence the second part of this statement that will only be true iff there is an EOI committable and it has received all messages.
Then, the only way that could turn into infinite loop is when committables appear out of order (in respect to the checkpointId) which is possible with async retries. However, this applies also to the old version that loops on endInput
(remember that we need to emit committables post EOI for the final checkpoint)
It will be solved by the next PR that removes async retries.
* Looks for a committer in the pipeline and aborts on writer. The GlobalCommitter behaves | ||
* differently if there is a committer after the writer. | ||
*/ | ||
private static boolean hasUpstreamCommitter(DataStream<?> ds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How safe is this really? Users could have used the global committer with different custom operators in front of that follow either writer or committer semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. What happens is: if it finds a committer, the new optimized behavior applies. If it doesn't the old unoptimized behavior is used.
So the question is what happens if you have a committer and do stuff afterwards in such a way that the committer suddenly is not doing 2PC anymore until the data reaches the global committer. I haven't been able to find such a scenario. You could have a writer->committer->union->global committer where the union is also directly fed by the writer->union->global committer but even then the output of the committer participated in 2PC.
In short, I'm convinced it's safe that any upstream committer does 2PC already and the global committer can simply piggyback.
Remember that parts of the change is that the 3PC commit protocol that we currently have is is not working properly with final checkpoints. With this fix, we get a working 2PC and "only" continue to violate endInput.
.flatMap(SubtaskCommittableManager::getPendingRequests) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
/** | ||
* Sinks don't use unaligned checkpoints, so we receive all committables of a given upstream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This javadoc is now a bit confusing since global committer and committer follow different committing and receival strategies.
As discussed offline. The cut was just for 2.0-preview and there is going to be another cut for the actual 2.0.
It's much easier to reason about it if we disregard async retries (next PR fixes it): In the optimized way, the The state naturally is also important when the global committer also performs 2PC because there is no upstream committer. Then, the behavior is virtually the same as now. |
42b5082
to
e71c4a4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing the comments % please double check that org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testSinkDisorderChangeLogWithRank
doesn't fail because of this change 👍
I double-checked: the test uses the |
@flinkbot run azure |
Use more of the assertj native patterns to compare results.
Global committers used to trail a full checkpoint behind committer. That means that data appeared only after >2*checkpoint interval in the sinks that use it (e.g. delta). However, committables in the global committers are already part of the first checkpoint and are idempotent: On recovery, they are resend from the committer to the global committer. Thus, the global committer can actually be seen as stateless and doesn't need to conduct its own 2PC protocol. This commit lets the global committer collect all committables on input (as before) but immediately tries to commit when it has received all (deducible from CommitterSummary - which was always the original intent of that message). Thus, in most cases, GlobalCommitter ignores notifyCheckpointCompleted now as the state of the checkpoint can be inferred by received committables from upstream. There are special cases where a global committer is directly chained to a writer. In this case, the global committer does need to conduct a 2PC protocol in place of the committer. To differentiate these cases, the global committer now has its own transformation.
Without UCs, a committer doesn't need to do anything on #processInput except collecting. It emits only on notifyCheckpointCompleted (or endInput for batch). We can also harden some contracts: * NotifyCheckpointCompleted can assert that all committables are received. * Emit committables downstream only if all committables are finished.
e71c4a4
to
833fc93
Compare
@flinkbot run azure |
Failure is a known issue FLINK-36591. |
What is the purpose of the change
FLINK-36287 disabled UC for all inter-sink connections to adhere to the contract of notifyCheckpointCompleted. This allows us to remove some special casing and improve global committer.
Brief change log
See commit messages for more details.
Verifying this change
Covered by tests in
I modified / extended the former two suites.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation