From ee61bb721eecb0404929f125fe43392f3d024453 Mon Sep 17 00:00:00 2001 From: Victor van den Hoven Date: Tue, 2 Apr 2024 15:46:54 +0200 Subject: [PATCH] KAFKA-15417: move outerJoinBreak-flags out of the loop (#15510) Follow up PR for https://github.com/apache/kafka/pull/14426 to fix a bug introduced by the previous PR. Cf https://github.com/apache/kafka/pull/14426#discussion_r1518681146 Reviewers: Matthias J. Sax --- .../kstream/internals/KStreamKStreamJoin.java | 65 +++++----- .../KStreamKStreamOuterJoinTest.java | 111 ++++++++++++++++-- 2 files changed, 140 insertions(+), 36 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 124386b9bc3ae..b8b48ff2c4df6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -125,9 +125,11 @@ public void init(final ProcessorContext context) { @SuppressWarnings("unchecked") @Override public void process(final Record record) { + final long inputRecordTimestamp = record.timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); if (outer && record.key() == null && record.value() != null) { @@ -193,7 +195,6 @@ public void process(final Record record) { } } - @SuppressWarnings("unchecked") private void emitNonJoinedOuterRecords( final KeyValueStore, LeftOrRightValue> store, final Record record) { @@ -223,43 +224,35 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; + boolean outerJoinLeftWindowOpen = false; + boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { - boolean 
outerJoinLeftBreak = false; - boolean outerJoinRightBreak = false; + if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { + // if windows are open for both joinSides we can break since there are no more candidates to emit + break; + } final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; - final LeftOrRightValue value = next.value; - final K key = timestampedKeyAndJoinSide.getKey(); final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestamp; - // Skip next records if window has not closed + // Continue with the next outer record if window for this joinSide has not closed yet + // There might be an outer record for the other joinSide which window has not closed yet + // We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { - outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side + outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { - outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side + outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side } - if (outerJoinLeftBreak && outerJoinRightBreak) { - break; // there are no more candidates to emit on left-outerJoin-side and - // right-outerJoin-side - } else { - continue; // there are possibly candidates left on the other outerJoin-side - } - } - - final VOut nullJoinedValue; - if (isLeftSide) { - nullJoinedValue = joiner.apply(key, - value.getLeftValue(), - value.getRightValue()); - } else { - nullJoinedValue = joiner.apply(key, - (V1) value.getRightValue(), - (V2) value.getLeftValue()); + // We continue with 
the next outer record + continue; } - + + final K key = timestampedKeyAndJoinSide.getKey(); + final LeftOrRightValue leftOrRightValue = next.value; + final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); context().forward( record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) ); @@ -272,7 +265,6 @@ private void emitNonJoinedOuterRecords( // we do not use delete() calls since it would incur extra get() store.put(prevKey, null); } - prevKey = timestampedKeyAndJoinSide; } @@ -283,7 +275,24 @@ private void emitNonJoinedOuterRecords( } } - private long getOuterJoinLookBackTimeMs(final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide) { + @SuppressWarnings("unchecked") + private VOut getNullJoinedValue( + final K key, + final LeftOrRightValue leftOrRightValue) { + // depending on the JoinSide fill in the joiner key and joiner values + if (isLeftSide) { + return joiner.apply(key, + leftOrRightValue.getLeftValue(), + leftOrRightValue.getRightValue()); + } else { + return joiner.apply(key, + (V1) leftOrRightValue.getRightValue(), + (V2) leftOrRightValue.getLeftValue()); + } + } + + private long getOuterJoinLookBackTimeMs( + final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide) { // depending on the JoinSide we fill in the outerJoinLookBackTimeMs if (timestampedKeyAndJoinSide.isLeftSide()) { return windowsAfterMs; // On the left-JoinSide we look back in time diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java index 28a5f1488fbce..279c21ef61a40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java @@ -499,10 +499,10 @@ public void testGracePeriod() { // joined records because the window has ended, but will not produce 
non-joined records because the window has not closed. // w1 = { 0:A0 (ts: 0) } // w2 = { 1:a1 (ts: 0) } - // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } - // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } - inputTopic2.pipeInput(0, "a0", 101L); + // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 101) } + // --> w2 = { 1:a1 (ts: 0), 0:a0 (ts: 101) } inputTopic1.pipeInput(1, "A1", 101L); + inputTopic2.pipeInput(0, "a0", 101L); processor.checkAndClearProcessResult(); // push a dummy item to the any stream after the window is closed; this should produced all expired non-joined records because @@ -511,7 +511,7 @@ public void testGracePeriod() { // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) } - inputTopic2.pipeInput(0, "dummy", 211); + inputTopic2.pipeInput(0, "dummy", 112); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(1, "null+a1", 0L), new KeyValueTimestamp<>(0, "A0+null", 0L) @@ -519,6 +519,101 @@ public void testGracePeriod() { } } + @Test + public void testEmitAllNonJoinedResultsForAsymmetricWindow() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream1; + final KStream stream2; + final KStream joined; + final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + ); + joined.process(supplier); + + final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) { + final TestInputTopic inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockApiProcessor processor = supplier.theCapturedProcessor(); + + // push one item to the primary stream; this should not produce any items because there are no matching keys + // and window has not ended + // w1 = {} + // w2 = {} + // --> w1 = { 0:A0 (ts: 29) } + // --> w2 = {} + inputTopic1.pipeInput(0, "A0", 29L); + processor.checkAndClearProcessResult(); + + // push another item to the primary stream; this should not produce any items because there are no matching keys + // and window has not ended + // w1 = { 0:A0 (ts: 29) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = {} + inputTopic1.pipeInput(1, "A1", 30L); + processor.checkAndClearProcessResult(); + + // push one item to the other stream; this should not produce any items because there are no matching keys + // and window has not ended + // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31) } + inputTopic2.pipeInput(2, "a2", 31L); + processor.checkAndClearProcessResult(); + + // push another item to the other stream; this should produce no inner joined-items because there are no matching keys + // and window has not ended + // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // w2 = { 2:a2 (ts: 31) } + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36) } + inputTopic2.pipeInput(3, "a3", 36L); + processor.checkAndClearProcessResult(); + + // push another item to the other stream; this should produce no inner joined-items because there are no matching keys + // and should produce a right-join-item because
before window has ended + // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36) } + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37) } + inputTopic2.pipeInput(4, "a4", 37L); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "null+a2", 31L) + ); + + // push another item to the other stream; this should produce no inner joined-items because there are no matching keys + // and should produce a left-join-item because after window has ended + // and should produce two right-join-items because before window has ended + // w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37) } + // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) } + // --> w2 = { 2:a2 (ts: 31), 3:a3 (ts: 36), 4:a4 (ts: 37), 5:a5 (ts: 50) } + inputTopic2.pipeInput(5, "a5", 50L); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 29L), + new KeyValueTimestamp<>(3, "null+a3", 36L), + new KeyValueTimestamp<>(4, "null+a4", 37L) + ); + } + } + @Test public void testOuterJoinWithInMemoryCustomSuppliers() { final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)); @@ -727,7 +822,7 @@ public void testWindowing() { } @Test - public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { + public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { final StreamsBuilder builder = new StreamsBuilder(); final int[] expectedKeys = new int[] {0, 1, 2, 3}; @@ -814,7 +909,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { } @Test - public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { + public void testShouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { final StreamsBuilder builder = new StreamsBuilder(); final int[] expectedKeys = new int[] {0, 1, 2, 3}; @@ -906,7 +1001,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { * behavior so that we can make
decisions about defining it in the future. */ @Test - public void shouldForwardCurrentHeaders() { + public void testShouldForwardCurrentHeaders() { final StreamsBuilder builder = new StreamsBuilder(); final KStream stream1; @@ -1331,7 +1426,7 @@ public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) } @Test - public void shouldJoinWithNonTimestampedStore() { + public void testShouldJoinWithNonTimestampedStore() { final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers(); final StreamJoined streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())