From cfcf7a9b96c4d3d221a1ec1f05d898f8763642f4 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Fri, 4 Oct 2024 11:15:24 +0200 Subject: [PATCH] [FLINK-36379] Optimize committers with UC disabled Without UCs, a committer doesn't need to do anything on #processInput except collecting. It emits only on notifyCheckpointCompleted (or endInput for batch). We can also harden some contracts: * NotifyCheckpointCompleted can assert that all committables are received. * Emit committables downstream only if all committables are finished. (cherry picked from commit 7f40ab97258bb323d11ef3c7efcf8761fd34bce0) --- .../operators/sink/CommitterOperator.java | 9 +----- .../CheckpointCommittableManagerImpl.java | 29 +++++++++++++++++-- .../sink/CommitterOperatorTestBase.java | 3 +- .../CheckpointCommittableManagerImplTest.java | 13 +++++---- 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 02db28fa8a338..d155f5dd509f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -181,7 +181,7 @@ private void commitAndEmitCheckpoints() throws IOException, InterruptedException private void commitAndEmit(CheckpointCommittableManager committableManager) throws IOException, InterruptedException { Collection> committed = committableManager.commit(committer); - if (emitDownstream && !committed.isEmpty()) { + if (emitDownstream && committableManager.isFinished()) { int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); output.collect( @@ -201,13 +201,6 @@ private void retryWithDelay() { @Override public void processElement(StreamRecord> element) throws Exception { committableCollector.addMessage(element.getValue()); - - // in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the - // committables - long checkpointId = element.getValue().getCheckpointIdOrEOI(); - if (checkpointId <= lastCompletedCheckpointId) { - commitAndEmitCheckpoints(); - } } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 99d321d4cd197..bb6cceead47a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,13 +169,37 @@ public Collection> commit(Committer committ return committed; } - Collection> getPendingRequests(boolean onlyIfFullyReceived) { + Collection> getPendingRequests(boolean assertFull) { return subtasksCommittableManagers.values().stream() - .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll()) + .peek(subtask -> assertReceivedAll(assertFull, subtask)) .flatMap(SubtaskCommittableManager::getPendingRequests) .collect(Collectors.toList()); } + /** + * For committers: Sinks don't use unaligned checkpoints, so we receive all committables of a + * given upstream task before the respective barrier. Thus, when the barrier reaches the + * committer, all committables of a specific checkpoint must have been received. Committing + * happens even later on notifyCheckpointComplete. + * + *

Global committers need to ensure that all committables of all subtasks have been received + * with {@link #hasGloballyReceivedAll()} before trying to commit. Naturally, this method then + * becomes a no-op. + * + *

Note that by transitivity, the assertion also holds for committables of subsumed + * checkpoints. + * + *

This assertion will fail in case of bugs in the writer or in the pre-commit topology if + * present. + */ + private void assertReceivedAll(boolean assertFull, SubtaskCommittableManager subtask) { + Preconditions.checkArgument( + !assertFull || subtask.hasReceivedAll(), + "Trying to commit incomplete batch of committables subtask=%s, manager=%s", + subtask.getSubtaskId(), + this); + } + Collection> drainFinished() { return subtasksCommittableManagers.values().stream() .flatMap(subtask -> subtask.drainCommitted().stream()) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index c24721884f680..e63ec66c41bf5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -101,7 +101,8 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { final CommittableWithLineage first = new CommittableWithLineage<>("1", 1L, 1); testHarness.processElement(new StreamRecord<>(first)); - testHarness.notifyOfCompletedCheckpoint(1); + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); assertThat(testHarness.getOutput()).isEmpty(); assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index b9c65c7c1674f..fcc2f559043b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -34,6 +34,7 @@ import java.util.HashMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; class CheckpointCommittableManagerImplTest { @@ -73,20 +74,20 @@ void testCommit() throws IOException, InterruptedException { final Committer committer = new NoOpCommitter(); // Only commit fully received committables - assertThat(checkpointCommittables.commit(committer)) - .hasSize(1) - .satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(3)); + assertThatCode(() -> checkpointCommittables.commit(committer)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); // Even on retry - assertThat(checkpointCommittables.commit(committer)).isEmpty(); + assertThatCode(() -> checkpointCommittables.commit(committer)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); // Add missing committable checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 1L, 2)); // Commit all committables assertThat(checkpointCommittables.commit(committer)) - .hasSize(2) + .hasSize(3) .extracting(CommittableWithLineage::getCommittable) - .containsExactlyInAnyOrder(4, 5); + .containsExactlyInAnyOrder(3, 4, 5); } @Test