Skip to content

Commit

Permalink
KAFKA-15417: move outerJoinBreak-flags out of the loop (#15510)
Browse files Browse the repository at this point in the history
Follow up PR for #14426 to fix a bug introduced by the previous PR.

Cf #14426 (comment)

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
VictorvandenHoven authored Apr 2, 2024
1 parent b3116f4 commit ee61bb7
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ public void init(final ProcessorContext<K, VOut> context) {
@SuppressWarnings("unchecked")
@Override
public void process(final Record<K, V1> record) {

final long inputRecordTimestamp = record.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

if (outer && record.key() == null && record.value() != null) {
Expand Down Expand Up @@ -193,7 +195,6 @@ public void process(final Record<K, V1> record) {
}
}

@SuppressWarnings("unchecked")
private void emitNonJoinedOuterRecords(
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store,
final Record<K, V1> record) {
Expand Down Expand Up @@ -223,43 +224,35 @@ private void emitNonJoinedOuterRecords(
try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
TimestampedKeyAndJoinSide<K> prevKey = null;

boolean outerJoinLeftWindowOpen = false;
boolean outerJoinRightWindowOpen = false;
while (it.hasNext()) {
boolean outerJoinLeftBreak = false;
boolean outerJoinRightBreak = false;
if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
// if windows are open for both joinSides we can break since there are no more candidates to emit
break;
}
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
final LeftOrRightValue<V1, V2> value = next.value;
final K key = timestampedKeyAndJoinSide.getKey();
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
sharedTimeTracker.minTime = timestamp;

// Skip next records if window has not closed
// Continue with the next outer record if window for this joinSide has not closed yet
// There might be an outer record for the other joinSide which window has not closed yet
// We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
if (timestampedKeyAndJoinSide.isLeftSide()) {
outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
} else {
outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
}
if (outerJoinLeftBreak && outerJoinRightBreak) {
break; // there are no more candidates to emit on left-outerJoin-side and
// right-outerJoin-side
} else {
continue; // there are possibly candidates left on the other outerJoin-side
}
}

final VOut nullJoinedValue;
if (isLeftSide) {
nullJoinedValue = joiner.apply(key,
value.getLeftValue(),
value.getRightValue());
} else {
nullJoinedValue = joiner.apply(key,
(V1) value.getRightValue(),
(V2) value.getLeftValue());
// We continue with the next outer record
continue;
}


final K key = timestampedKeyAndJoinSide.getKey();
final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
context().forward(
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
);
Expand All @@ -272,7 +265,6 @@ private void emitNonJoinedOuterRecords(
// we do not use delete() calls since it would incur extra get()
store.put(prevKey, null);
}

prevKey = timestampedKeyAndJoinSide;
}

Expand All @@ -283,7 +275,24 @@ private void emitNonJoinedOuterRecords(
}
}

private long getOuterJoinLookBackTimeMs(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
@SuppressWarnings("unchecked")
private VOut getNullJoinedValue(
final K key,
final LeftOrRightValue<V1, V2> leftOrRightValue) {
// depending on the JoinSide fill in the joiner key and joiner values
if (isLeftSide) {
return joiner.apply(key,
leftOrRightValue.getLeftValue(),
leftOrRightValue.getRightValue());
} else {
return joiner.apply(key,
(V1) leftOrRightValue.getRightValue(),
(V2) leftOrRightValue.getLeftValue());
}
}

private long getOuterJoinLookBackTimeMs(
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
// depending on the JoinSide we fill in the outerJoinLookBackTimeMs
if (timestampedKeyAndJoinSide.isLeftSide()) {
return windowsAfterMs; // On the left-JoinSide we look back in time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,10 @@ public void testGracePeriod() {
// joined records because the window has ended, but will not produce non-joined records because the window has not closed.
// w1 = { 0:A0 (ts: 0) }
// w2 = { 1:a1 (ts: 0) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
// --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
inputTopic2.pipeInput(0, "a0", 101L);
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 101) }
// --> w2 = { 1:a1 (ts: 0), 0:a0 (ts: 101) }
inputTopic1.pipeInput(1, "A1", 101L);
inputTopic2.pipeInput(0, "a0", 101L);
processor.checkAndClearProcessResult();

// push a dummy item to the any stream after the window is closed; this should produced all expired non-joined records because
Expand All @@ -511,14 +511,109 @@ public void testGracePeriod() {
// w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
// --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
inputTopic2.pipeInput(0, "dummy", 211);
inputTopic2.pipeInput(0, "dummy", 112);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "null+a1", 0L),
new KeyValueTimestamp<>(0, "A0+null", 0L)
);
}
}

@Test
public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);

joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
);
joined.process(supplier);

final Collection<Set<String>> copartitionGroups =
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();

assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
final TestInputTopic<Integer, String> inputTopic1 =
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<Integer, String> inputTopic2 =
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

// push one item to the primary stream; this should not produce any items because there are no matching keys
// and window has not ended
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0 (ts: 29) }
// --> w2 = {}
inputTopic1.pipeInput(0, "A0", 29L);
processor.checkAndClearProcessResult();

// push another item to the primary stream; this should not produce any items because there are no matching keys
// and window has not ended
// w1 = { 0:A0 (ts: 29) }
// w2 = {}
// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
// --> w2 = {}
inputTopic1.pipeInput(1, "A1", 30L);
processor.checkAndClearProcessResult();

// push one item to the other stream; this should not produce any items because there are no matching keys
// and window has not ended
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
// w2 = {}
// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
// --> w2 = { 2:a2 (ts: 31) }
inputTopic2.pipeInput(2, "a2", 31L);
processor.checkAndClearProcessResult();

// push another item to the other stream; this should produce no inner joined-items because there are no matching keys
// and window has not ended
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
// w2 = { 2:a2 (ts: 31) }
// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
// --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36) }
inputTopic2.pipeInput(3, "a3", 36L);
processor.checkAndClearProcessResult();

// push another item to the other stream; this should produce no inner joined-items because there are no matching keys
// and should produce a right-join-item because before window has ended
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
// w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36) }
// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
// --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37) }
inputTopic2.pipeInput(4, "a4", 37L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(2, "null+a2", 31L)
);

// push another item to the other stream; this should produce no inner joined-items because there are no matching keys
// and should produce a left-join-item because after window has ended
// and should produce two right-join-items because before window has ended
// w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
// w2 = { 2:a0 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37) }
// --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
// --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37), 5:a5 (ts: 50) }
inputTopic2.pipeInput(5, "a5", 50L);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(0, "A0+null", 29L),
new KeyValueTimestamp<>(3, "null+a3", 36L),
new KeyValueTimestamp<>(4, "null+a4", 37L)
);
}
}

@Test
public void testOuterJoinWithInMemoryCustomSuppliers() {
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
Expand Down Expand Up @@ -727,7 +822,7 @@ public void testWindowing() {
}

@Test
public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};

Expand Down Expand Up @@ -814,7 +909,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
}

@Test
public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
public void testShouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};

Expand Down Expand Up @@ -906,7 +1001,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
* behavior so that we can make decisions about defining it in the future.
*/
@Test
public void shouldForwardCurrentHeaders() {
public void testShouldForwardCurrentHeaders() {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -1331,7 +1426,7 @@ public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params)
}

@Test
public void shouldJoinWithNonTimestampedStore() {
public void testShouldJoinWithNonTimestampedStore() {
final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers();
final StreamJoined<Integer, String, String> streamJoined =
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
Expand Down

0 comments on commit ee61bb7

Please sign in to comment.