Skip to content

Commit

Permalink
[FLINK-36368] Optimize committers with UC disabled
Browse files Browse the repository at this point in the history
Without UCs, a committer doesn't need to do anything on #processInput except collecting. It emits only on notifyCheckpointCompleted (or endInput for batch).

We can also harden some contracts:
* NotifyCheckpointCompleted can assert that all committables are received.
* Emit committables downstream only if all committables are finished.
  • Loading branch information
AHeise committed Oct 8, 2024
1 parent c18fea3 commit a29a943
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private void commitAndEmitCheckpoints() throws IOException, InterruptedException
private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
throws IOException, InterruptedException {
Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
if (emitDownstream && !committed.isEmpty()) {
if (emitDownstream && committableManager.isFinished()) {
int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
output.collect(
Expand All @@ -204,13 +204,6 @@ private void retryWithDelay() {
@Override
public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
committableCollector.addMessage(element.getValue());

// in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the
// committables
long checkpointId = element.getValue().getCheckpointIdOrEOI();
if (checkpointId <= lastCompletedCheckpointId) {
commitAndEmitCheckpoints();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -168,13 +169,33 @@ public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committ
return committed;
}

Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean assertFull) {
return subtasksCommittableManagers.values().stream()
.filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll())
.peek(subtask -> assertReceivedAll(assertFull, subtask))
.flatMap(SubtaskCommittableManager::getPendingRequests)
.collect(Collectors.toList());
}

/**
* Sinks don't use unaligned checkpoints, so we receive all committables of a given upstream
* task before the respective barrier. Thus, when the barrier reaches the committer, all
* committables of a specific checkpoint must have been received. Committing happens even later
* on notifyCheckpointComplete.
*
* <p>Note that by transitivity, the assertion also holds for committables of subsumed
* checkpoints.
*
* <p>This assertion will fail in case of bugs in the writer or in the pre-commit topology if
* present.
*/
private void assertReceivedAll(boolean assertFull, SubtaskCommittableManager<CommT> subtask) {
Preconditions.checkArgument(
!assertFull || subtask.hasReceivedAll(),
"Trying to commit incomplete batch of committables subtask=%s, manager=%s",
subtask.getSubtaskId(),
this);
}

Collection<CommittableWithLineage<CommT>> drainFinished() {
return subtasksCommittableManagers.values().stream()
.flatMap(subtask -> subtask.drainCommitted().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class CheckpointCommittableManagerImplTest {
Expand Down Expand Up @@ -73,20 +74,20 @@ void testCommit() throws IOException, InterruptedException {

final Committer<Integer> committer = new NoOpCommitter();
// Only commit fully received committables
assertThat(checkpointCommittables.commit(committer))
.hasSize(1)
.satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(3));
assertThatCode(() -> checkpointCommittables.commit(committer))
.hasMessageContaining("Trying to commit incomplete batch of committables");

// Even on retry
assertThat(checkpointCommittables.commit(committer)).isEmpty();
assertThatCode(() -> checkpointCommittables.commit(committer))
.hasMessageContaining("Trying to commit incomplete batch of committables");

// Add missing committable
checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 1L, 2));
// Commit all committables
assertThat(checkpointCommittables.commit(committer))
.hasSize(2)
.hasSize(3)
.extracting(CommittableWithLineage::getCommittable)
.containsExactlyInAnyOrder(4, 5);
.containsExactlyInAnyOrder(3, 4, 5);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", 1L, 1);
testHarness.processElement(new StreamRecord<>(first));

testHarness.notifyOfCompletedCheckpoint(1);
assertThatCode(() -> testHarness.notifyOfCompletedCheckpoint(1))
.hasMessageContaining("Trying to commit incomplete batch of committables");

assertThat(testHarness.getOutput()).isEmpty();
assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
Expand Down

0 comments on commit a29a943

Please sign in to comment.