diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 02db28fa8a338..d155f5dd509f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -181,7 +181,7 @@ private void commitAndEmitCheckpoints() throws IOException, InterruptedException private void commitAndEmit(CheckpointCommittableManager committableManager) throws IOException, InterruptedException { Collection> committed = committableManager.commit(committer); - if (emitDownstream && !committed.isEmpty()) { + if (emitDownstream && committableManager.isFinished()) { int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); output.collect( @@ -201,13 +201,6 @@ private void retryWithDelay() { @Override public void processElement(StreamRecord> element) throws Exception { committableCollector.addMessage(element.getValue()); - - // in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the - // committables - long checkpointId = element.getValue().getCheckpointIdOrEOI(); - if (checkpointId <= lastCompletedCheckpointId) { - commitAndEmitCheckpoints(); - } } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 99d321d4cd197..bb6cceead47a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,13 +169,37 @@ public Collection> commit(Committer committ return committed; } - Collection> getPendingRequests(boolean onlyIfFullyReceived) { + Collection> getPendingRequests(boolean assertFull) { return subtasksCommittableManagers.values().stream() - .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll()) + .peek(subtask -> assertReceivedAll(assertFull, subtask)) .flatMap(SubtaskCommittableManager::getPendingRequests) .collect(Collectors.toList()); } + /** + * For committers: Sinks don't use unaligned checkpoints, so we receive all committables of a + * given upstream task before the respective barrier. Thus, when the barrier reaches the + * committer, all committables of a specific checkpoint must have been received. Committing + * happens even later on notifyCheckpointComplete. + * + *

Global committers need to ensure that all committables of all subtasks have been received + * with {@link #hasGloballyReceivedAll()} before trying to commit. Naturally, this method then + * becomes a no-op. + * + *

Note that by transitivity, the assertion also holds for committables of subsumed + * checkpoints. + * + *

This assertion will fail in case of bugs in the writer or in the pre-commit topology if + * present. + */ + private void assertReceivedAll(boolean assertFull, SubtaskCommittableManager subtask) { + Preconditions.checkArgument( + !assertFull || subtask.hasReceivedAll(), + "Trying to commit incomplete batch of committables subtask=%s, manager=%s", + subtask.getSubtaskId(), + this); + } + Collection> drainFinished() { return subtasksCommittableManagers.values().stream() .flatMap(subtask -> subtask.drainCommitted().stream()) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index c24721884f680..e63ec66c41bf5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -101,7 +101,8 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { final CommittableWithLineage first = new CommittableWithLineage<>("1", 1L, 1); testHarness.processElement(new StreamRecord<>(first)); - testHarness.notifyOfCompletedCheckpoint(1); + assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); assertThat(testHarness.getOutput()).isEmpty(); assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index b9c65c7c1674f..fcc2f559043b6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -34,6 +34,7 @@ import java.util.HashMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; class CheckpointCommittableManagerImplTest { @@ -73,20 +74,20 @@ void testCommit() throws IOException, InterruptedException { final Committer committer = new NoOpCommitter(); // Only commit fully received committables - assertThat(checkpointCommittables.commit(committer)) - .hasSize(1) - .satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(3)); + assertThatCode(() -> checkpointCommittables.commit(committer)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); // Even on retry - assertThat(checkpointCommittables.commit(committer)).isEmpty(); + assertThatCode(() -> checkpointCommittables.commit(committer)) + .hasMessageContaining("Trying to commit incomplete batch of committables"); // Add missing committable checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 1L, 2)); // Commit all committables assertThat(checkpointCommittables.commit(committer)) - .hasSize(2) + .hasSize(3) .extracting(CommittableWithLineage::getCommittable) - .containsExactlyInAnyOrder(4, 5); + .containsExactlyInAnyOrder(3, 4, 5); } @Test