Skip to content

Commit

Permalink
KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeft…
Browse files Browse the repository at this point in the history
…Side (apache#15601)

The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input.

Reviewers: Greg Harris <[email protected]>, Bruno Cadonna <[email protected]>
  • Loading branch information
raminqaf authored May 29, 2024
1 parent 897cab2 commit b73f479
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();

final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
true,
final KStreamKStreamJoinLeftSide<K, V1, V2, VOut> joinThis = new KStreamKStreamJoinLeftSide<>(
otherWindowStore.name(),
internalWindows,
joiner,
Expand All @@ -182,8 +181,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
sharedTimeTrackerSupplier
);

final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamJoin<>(
false,
final KStreamKStreamJoinRightSide<K, V1, V2, VOut> joinOther = new KStreamKStreamJoinRightSide<>(
thisWindowStore.name(),
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
Expand All @@ -43,7 +42,7 @@
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;

class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

private final String otherWindowName;
Expand All @@ -55,28 +54,22 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final long windowsAfterMs;

private final boolean outer;
private final boolean isLeftSide;
private final Optional<String> outerJoinWindowName;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner;

private final TimeTrackerSupplier sharedTimeTrackerSupplier;

KStreamKStreamJoin(final boolean isLeftSide,
final String otherWindowName,
KStreamKStreamJoin(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final long joinBeforeMs,
final long joinAfterMs,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
this.isLeftSide = isLeftSide;
this.otherWindowName = otherWindowName;
if (isLeftSide) {
this.joinBeforeMs = windows.beforeMs;
this.joinAfterMs = windows.afterMs;
} else {
this.joinBeforeMs = windows.afterMs;
this.joinAfterMs = windows.beforeMs;
}
this.joinBeforeMs = joinBeforeMs;
this.joinAfterMs = joinAfterMs;
this.windowsAfterMs = windows.afterMs;
this.windowsBeforeMs = windows.beforeMs;
this.joinGraceMs = windows.gracePeriodMs();
Expand All @@ -87,15 +80,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
}

@Override
public Processor<K, V1, K, VOut> get() {
return new KStreamKStreamJoinProcessor();
}

private class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
private WindowStore<K, V2> otherWindowStore;
protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> {
private WindowStore<K, VOther> otherWindowStore;
private Sensor droppedRecordsSensor;
private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>> outerJoinStore = Optional.empty();
private InternalProcessorContext<K, VOut> internalProcessorContext;
private TimeTracker sharedTimeTracker;

Expand All @@ -122,13 +110,10 @@ public void init(final ProcessorContext<K, VOut> context) {
}
}

@SuppressWarnings("unchecked")
@Override
public void process(final Record<K, V1> record) {
public void process(final Record<K, VThis> record) {

final long inputRecordTimestamp = record.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

Expand All @@ -144,26 +129,11 @@ public void process(final Record<K, V1> record) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
}

boolean needOuterJoin = outer;
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
final KeyValue<Long, V2> otherRecord = iter.next();
final long otherRecordTimestamp = otherRecord.key;

outerJoinStore.ifPresent(store -> {
// use putIfAbsent to first read and see if there's any values for the key,
// if yes delete the key, otherwise do not issue a put;
// we may delete some values with the same key early but since we are going
// range over all values of the same key even after failure, since the other window-store
// is only cleaned up by stream time, so this is okay for at-least-once.
store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null);
});

context().forward(
record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value))
.withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
}
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
try (final WindowStoreIterator<VOther> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
final boolean needOuterJoin = outer && !iter.hasNext();
iter.forEachRemaining(otherRecord -> emitInnerJoin(record, otherRecord, inputRecordTimestamp));

if (needOuterJoin) {
// The maxStreamTime contains the max time observed in both sides of the join.
Expand All @@ -187,17 +157,24 @@ public void process(final Record<K, V1> record) {
context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
} else {
sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
outerJoinStore.ifPresent(store -> store.put(
TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
LeftOrRightValue.make(isLeftSide, record.value())));
putInOuterJoinStore(record);
}
}
}
}

private void emitNonJoinedOuterRecords(
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store,
final Record<K, V1> record) {
protected abstract TimestampedKeyAndJoinSide<K> makeThisKey(final K key, final long inputRecordTimestamp);

protected abstract LeftOrRightValue<VLeft, VRight> makeThisValue(final VThis thisValue);

protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp);

protected abstract VThis getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);

protected abstract VOther getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);

private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store,
final Record<K, VThis> record) {

// calling `store.all()` creates an iterator what is an expensive operation on RocksDB;
// to reduce runtime cost, we try to avoid paying those cost
Expand All @@ -221,26 +198,24 @@ private void emitNonJoinedOuterRecords(
// reset to MAX_VALUE in case the store is empty
sharedTimeTracker.minTime = Long.MAX_VALUE;

try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> it = store.all()) {
TimestampedKeyAndJoinSide<K> prevKey = null;

boolean outerJoinLeftWindowOpen = false;
boolean outerJoinRightWindowOpen = false;
while (it.hasNext()) {
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp();
if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
// if windows are open for both joinSides we can break since there are no more candidates to emit
break;
}
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
sharedTimeTracker.minTime = timestamp;

// Continue with the next outer record if window for this joinSide has not closed yet
// There might be an outer record for the other joinSide which window has not closed yet
// We rely on the <timestamp><left/right-boolean><key> ordering of KeyValueIterator
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) {
if (timestampedKeyAndJoinSide.isLeftSide()) {
outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
} else {
Expand All @@ -249,13 +224,9 @@ private void emitNonJoinedOuterRecords(
// We continue with the next outer record
continue;
}

final K key = timestampedKeyAndJoinSide.getKey();
final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
context().forward(
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
);

final LeftOrRightValue<VLeft, VRight> leftOrRightValue = nextKeyValue.value;
forwardNonJoinedOuterRecords(record, timestampedKeyAndJoinSide, leftOrRightValue);

if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
// blind-delete the previous key from the outer window store now it is emitted;
Expand All @@ -275,20 +246,22 @@ private void emitNonJoinedOuterRecords(
}
}

@SuppressWarnings("unchecked")
private VOut getNullJoinedValue(
final K key,
final LeftOrRightValue<V1, V2> leftOrRightValue) {
// depending on the JoinSide fill in the joiner key and joiner values
if (isLeftSide) {
return joiner.apply(key,
leftOrRightValue.getLeftValue(),
leftOrRightValue.getRightValue());
} else {
return joiner.apply(key,
(V1) leftOrRightValue.getRightValue(),
(V2) leftOrRightValue.getLeftValue());
}
private void forwardNonJoinedOuterRecords(final Record<K, VThis> record,
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
final K key = timestampedKeyAndJoinSide.getKey();
final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
final VThis thisValue = getThisValue(leftOrRightValue);
final VOther otherValue = getOtherValue(leftOrRightValue);
final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);
context().forward(
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
);
}

private boolean isOuterJoinWindowOpen(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
return sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime;
}

private long getOuterJoinLookBackTimeMs(
Expand All @@ -301,6 +274,31 @@ private long getOuterJoinLookBackTimeMs(
}
}

private void emitInnerJoin(final Record<K, VThis> thisRecord, final KeyValue<Long, VOther> otherRecord,
final long inputRecordTimestamp) {
outerJoinStore.ifPresent(store -> {
// use putIfAbsent to first read and see if there's any values for the key,
// if yes delete the key, otherwise do not issue a put;
// we may delete some values with the same key early but since we are going
// range over all values of the same key even after failure, since the other window-store
// is only cleaned up by stream time, so this is okay for at-least-once.
final TimestampedKeyAndJoinSide<K> otherKey = makeOtherKey(thisRecord.key(), otherRecord.key);
store.putIfAbsent(otherKey, null);
});

context().forward(
thisRecord.withValue(joiner.apply(thisRecord.key(), thisRecord.value(), otherRecord.value))
.withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
}

private void putInOuterJoinStore(final Record<K, VThis> thisRecord) {
outerJoinStore.ifPresent(store -> {
final TimestampedKeyAndJoinSide<K> thisKey = makeThisKey(thisRecord.key(), thisRecord.timestamp());
final LeftOrRightValue<VLeft, VRight> thisValue = makeThisValue(thisRecord.value());
store.put(thisKey, thisValue);
});
}

@Override
public void close() {
sharedTimeTrackerSupplier.remove(context().taskId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;

import java.util.Optional;

class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> extends KStreamKStreamJoin<K, VLeft, VRight, VOut, VLeft, VRight> {

KStreamKStreamJoinLeftSide(final String otherWindowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super VLeft, ? super VRight, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
final TimeTrackerSupplier sharedTimeTrackerSupplier) {
super(otherWindowName, windows, joiner, outer, outerJoinWindowName, windows.beforeMs, windows.afterMs,
sharedTimeTrackerSupplier);
}

@Override
public Processor<K, VLeft, K, VOut> get() {
return new KStreamKStreamJoinLeftProcessor();
}

private class KStreamKStreamJoinLeftProcessor extends KStreamKStreamJoinProcessor {

@Override
public TimestampedKeyAndJoinSide<K> makeThisKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeLeft(key, timestamp);
}

@Override
public LeftOrRightValue<VLeft, VRight> makeThisValue(final VLeft thisValue) {
return LeftOrRightValue.makeLeftValue(thisValue);
}

@Override
public TimestampedKeyAndJoinSide<K> makeOtherKey(final K key, final long timestamp) {
return TimestampedKeyAndJoinSide.makeRight(key, timestamp);
}

@Override
public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
}

@Override
public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue();
}
}
}
Loading

0 comments on commit b73f479

Please sign in to comment.