KAFKA-10847: Fix spurious results on left/outer stream-stream joins #10462

Merged: 12 commits, Apr 29, 2021
checkstyle/suppressions.xml (4 changes: 2 additions & 2 deletions)

@@ -167,7 +167,7 @@
files="StreamsMetricsImpl.java"/>

<suppress checks="NPathComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl).java"/>
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin).java"/>

<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
@@ -211,7 +211,7 @@
files="Murmur3Test.java"/>

<suppress checks="MethodLength"
files="KStreamSlidingWindowAggregateTest.java"/>
files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest).java"/>

<suppress checks="ClassFanOutComplexity"
files="StreamTaskTest.java"/>
KStreamImpl.java

@@ -93,6 +93,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K

static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";

static final String OUTERSHARED_NAME = "KSTREAM-OUTERSHARED-";

static final String SOURCE_NAME = "KSTREAM-SOURCE-";

static final String SINK_NAME = "KSTREAM-SINK-";
KStreamImplJoin.java

@@ -17,7 +17,9 @@

package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -27,23 +29,45 @@
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyAndJoinSideSerde;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;

class KStreamImplJoin {

private final InternalStreamsBuilder builder;
private final boolean leftOuter;
private final boolean rightOuter;

static class MaxObservedStreamTime {
private long maxObservedStreamTime = ConsumerRecord.NO_TIMESTAMP;

public void advance(final long streamTime) {
maxObservedStreamTime = Math.max(streamTime, maxObservedStreamTime);
}

public long get() {
return maxObservedStreamTime;
}
}

KStreamImplJoin(final InternalStreamsBuilder builder,
final boolean leftOuter,
@@ -118,20 +142,47 @@ public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);

Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter) {
final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";

// Get the suffix index of the joinThisGeneratedName to build the outer join store name.
final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
+ joinThisGeneratedName.substring(
rightOuter
? KStreamImpl.OUTERTHIS_NAME.length()
: KStreamImpl.JOINTHIS_NAME.length());
Member:

Not sure if I understand. Why use "outer" or "this" here? If the store is shared, neither one seems to make sense. Overall, naming of processors and stores is tricky. Can we actually add a corresponding test that compares the generated and expected TopologyDescription for this case?

Contributor Author:

I initially generated a name with a new index for the shared store. However, it seemed this was going to cause incompatibilities in the topology because the new index kept increasing. Instead, I now just get the index from one of the current join stores. Why doesn't it make sense? Is there another way to get an index? Or do I really need to append an index at the end of the shared store name?

Member:

I agree that we should not use one more index, to avoid compatibility issues... Maybe the question is really (just for my better understanding): what would the name be? That is, could we give a concrete example (with and without a Named parameter)? That is also why I asked for a test using TopologyDescription -- it makes it easier to wrap my head around.
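For concreteness, a sketch of the names this diff would produce; the numeric index is hypothetical (the code reuses whatever index joinThisGeneratedName already carries), and the trailing "-store" is appended by sharedOuterJoinWindowStoreBuilder below:

// Hypothetical name examples for the shared outer-join store (index value is illustrative).
// Without a user-provided name, for a left join:
//   joinThisGeneratedName       = "KSTREAM-JOINTHIS-0000000004"
//   outerJoinStoreGeneratedName = "KSTREAM-OUTERSHARED-0000000004"
//   physical store name         = "KSTREAM-OUTERSHARED-0000000004-store"
// With a user-provided base store name "my-join":
//   outerJoinStoreName          = "my-join-left-shared-join"   // "-outer-shared-join" for a full outer join
//   physical store name         = "my-join-left-shared-join-store"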


final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;

outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
}

// Shared between the two join processors to track the maximum observed stream time
Contributor:

Since this is only accessed single-threaded, using an AtomicLong feels a bit overkill. We could probably maintain the "long maxObservedStreamTime" in this class, and pass an ObservedStreamTime interface to the two joiners which just has a setter/getter to read and write the local variable.

Contributor Author:

Done

Member:

One disadvantage compared to using context.streamTime() would be that MaxObservedStreamTime is reset to zero on rebalance/restart. (Or we need to add additional code to preserve it...)

Contributor:

It's extremely subtle, but we cannot use context.streamTime() because of the time-delay effects of upstream record caches. This was the cause of a severe bug in suppress that went undetected until after it was released.

For example: if we have a record cache upstream of this join, it will delay the propagation of records (and their accompanying timestamps) by some time amount D. Say we ingest a record with timestamp T. If we reference the context's stream time, our processor will think it is at time T when it is really at time T - D, leading it to behave wrongly, such as enforcing the grace period prematurely, which manifests to users as data loss.

Contributor:

@mjsax is correct that there is a bug where the processor-local stream time gets reset on rebalance/restart. It would be good to fix it, but with the current architecture, the only correct solution is to persist the processor-local stream time. Another approach we've discussed is to remove the time-delay effect of the record cache.
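A minimal numeric sketch of the cache-delay hazard described above (the delay D and all timestamps are illustrative values, not taken from the PR):

// Hypothetical illustration: an upstream record cache delays records by D = 4_000 ms.
long taskStreamTime    = 10_000L; // context.streamTime(): already advanced by records still held in the cache
long processorSeenTime =  6_000L; // newest timestamp this processor has actually received (T - D)
long windowCloseTime   =  7_000L; // a window that should still accept matches at this processor

boolean closedByContextTime  = windowCloseTime < taskStreamTime;    // true  -> premature expiry, i.e. data loss
boolean closedByObservedTime = windowCloseTime < processorSeenTime; // false -> correct behavior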

final MaxObservedStreamTime maxObservedStreamTime = new MaxObservedStreamTime();

final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
true,
otherWindowStore.name(),
windows.beforeMs,
windows.afterMs,
windows.gracePeriodMs(),
joiner,
- leftOuter
+ leftOuter,
outerJoinWindowStore.map(StoreBuilder::name),
maxObservedStreamTime
);

final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(
false,
thisWindowStore.name(),
windows.afterMs,
windows.beforeMs,
windows.gracePeriodMs(),
AbstractStream.reverseJoinerWithKey(joiner),
- rightOuter
+ rightOuter,
outerJoinWindowStore.map(StoreBuilder::name),
maxObservedStreamTime
);

final PassThrough<K1, R> joinMerge = new PassThrough<>();
@@ -149,6 +200,7 @@ public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
.withOtherWindowStoreBuilder(otherWindowStore)
.withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
.withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
.withOuterJoinWindowStoreBuilder(outerJoinWindowStore)
.withValueJoiner(joiner)
.withNodeName(joinMergeName);

@@ -211,6 +263,66 @@ private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(fin
return builder;
}

@SuppressWarnings("unchecked")
private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName,
final JoinWindows windows,
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
persistentTimeOrderedWindowStore(
storeName + "-store",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size())
),
new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
Time.SYSTEM
Member:

Should we pass a Time reference here to allow us to mock time in tests if necessary?

Contributor Author:

It would require more changes just to allow that. The KStreamImplJoin constructor, where we could overload to pass a Time mock object, is only used by the KStreamImplJoinImpl class. The tests use the StreamsBuilder to create the joins, and it does not accept a Time object.

Also, the Stores class, which is called by KStreamImplJoin, does not mock it. Maybe because the same code changes would be required just for that?

Member:

Ok, maybe good enough as-is. (Fair point that we don't mock it in other stores either -- maybe there was never any demand to be able to mock it. As you said, we could change it as a follow-up if needed.)

);
if (streamJoinedInternal.loggingEnabled()) {
builder.withLoggingEnabled(streamJoinedInternal.logConfig());
} else {
builder.withLoggingDisabled();
}

return builder;
}

// This method has the same code as Stores.persistentWindowStore(). But TimeOrderedWindowStore is
// a non-public API, so we need to keep the duplicated code until it becomes public.
private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
final Duration retentionPeriod,
final Duration windowSize) {
Objects.requireNonNull(storeName, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);

final long segmentInterval = Math.max(retentionMs / 2, 60_000L);

if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (segmentInterval < 1L) {
throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ storeName + " must be no smaller than its window size. Got size=["
+ windowSizeMs + "], retention=[" + retentionMs + "]");
}

return new RocksDbWindowBytesStoreSupplier(
storeName,
retentionMs,
segmentInterval,
windowSizeMs,
true,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIME_ORDERED_WINDOW_STORE);
}

private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
KStreamKStreamJoin.java

@@ -19,17 +19,24 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.function.Predicate;

import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;

class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@@ -38,20 +45,33 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private final String otherWindowName;
private final long joinBeforeMs;
private final long joinAfterMs;
private final long joinGraceMs;

private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
private final boolean outer;
private final Optional<String> outerJoinWindowName;
private final boolean isLeftSide;

private final KStreamImplJoin.MaxObservedStreamTime maxObservedStreamTime;

- KStreamKStreamJoin(final String otherWindowName,
+ KStreamKStreamJoin(final boolean isLeftSide,
+ final String otherWindowName,
final long joinBeforeMs,
final long joinAfterMs,
final long joinGraceMs,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner,
- final boolean outer) {
+ final boolean outer,
final Optional<String> outerJoinWindowName,
final KStreamImplJoin.MaxObservedStreamTime maxObservedStreamTime) {
this.isLeftSide = isLeftSide;
this.otherWindowName = otherWindowName;
this.joinBeforeMs = joinBeforeMs;
this.joinAfterMs = joinAfterMs;
this.joinGraceMs = joinGraceMs;
this.joiner = joiner;
this.outer = outer;
this.outerJoinWindowName = outerJoinWindowName;
this.maxObservedStreamTime = maxObservedStreamTime;
}

@Override
@@ -60,21 +80,24 @@ public Processor<K, V1> get() {
}

private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
Contributor:

nit: I think this function can just be inlined now?

Contributor Author:

Done

windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();

- private WindowStore<K, V2> otherWindow;
+ private WindowStore<K, V2> otherWindowStore;
private StreamsMetricsImpl metrics;
private Sensor droppedRecordsSensor;
private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+ otherWindowStore = context.getStateStore(otherWindowName);
outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
}


@Override
public void process(final K key, final V1 value) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
@@ -98,18 +121,92 @@ key, value, context().topic(), context().partition(), context().offset()
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

- try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
maxObservedStreamTime.advance(inputRecordTimestamp);

Member:

Side improvement: I think we should skip late records directly and also record them in TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor.

Contributor:

@spena @mjsax I left another comment below regarding the time. Please LMK if you think that makes sense.

// Emit all non-joined records whose window has closed
if (inputRecordTimestamp == maxObservedStreamTime.get()) {
outerJoinWindowStore.ifPresent(store -> emitNonJoinedOuterRecords(store));
}

+ try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(key, timeFrom, timeTo)) {
while (iter.hasNext()) {
needOuterJoin = false;
final KeyValue<Long, V2> otherRecord = iter.next();
final long otherRecordTimestamp = otherRecord.key;
Member:

Side improvement: at the moment, windowStore does not guarantee a strict retention time; i.e., even if the retention time has passed, it may still contain expired data and return it. Thus, we should have an additional check, if otherRecordTimestamp < stream-time - windowSize - gracePeriod, and drop the "other record" in this case, to get a strict time bound (we don't need to report this in any metric).

We should extend our tests accordingly.

Contributor Author:

Interesting. Is this a current bug with the old join semantics?

Member:

Yes, it is a bug in the current implementation...

Member:

Seems this comment has not been addressed yet. Or do you not want to add this additional fix in this PR?

Contributor:

@spena just a ping to make sure you get this in the follow-up PR.
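A sketch of the strict-bound check proposed in this thread; it is not part of the merged diff, and it approximates the window size as joinBeforeMs + joinAfterMs, which is an assumption of the sketch:

// Sketch (not in this PR): enforce a strict time bound on fetched other-side records,
// since window-store retention is not strict and the store may still return expired entries.
final long expiryBoundary = maxObservedStreamTime.get() - (joinBeforeMs + joinAfterMs) - joinGraceMs;
if (otherRecordTimestamp < expiryBoundary) {
    continue; // expired other-side record: drop without joining (no metric needed, per the discussion)
}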


outerJoinWindowStore.ifPresent(store -> {
// Delete the joined record from the non-joined outer window store
store.put(KeyAndJoinSide.make(!isLeftSide, key), null, otherRecordTimestamp);
});

context().forward(
key,
joiner.apply(key, value, otherRecord.value),
- To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+ To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
}

if (needOuterJoin) {
- context().forward(key, joiner.apply(key, value, null));
// The maxObservedStreamTime contains the max time observed on both sides of the join.
// Having access to the time observed on the other join side fixes the following
// problem:
//
// Say we have a window size of 5 seconds
// 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
//    The record is not processed yet, and is added to the outer-join store
// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
//    The record is not processed yet, and is added to the outer-join store
// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
//    It is time to look at the expired records. T10 and T2 should be emitted, but
//    because T2 was late, it is not fetched by the window store, so it is not processed
//
// See the KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
//
// The condition below allows us to process the out-of-order records without the need
// to hold them in the temporary outer store
if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
Member:

Not sure if I understand the second condition?

Contributor:

I think the motivation is that if the current record's timestamp is too small (i.e., it is too late), then it should not be added to the book-keeping store but can be "expired" immediately. But I also feel the condition seems a bit off here: for the record to be "too late", its timestamp just needs to be smaller than the expiration boundary, which is observed-stream-time - join-after - grace-period, right?

Member:

Well, while we should have a check like this, it seems it should go at the top of this method, next to the key/value null check. We should also add a corresponding lateRecordDropSensor (cf. KStreamWindowAggregate.java).

We can also return from process() early: if we have a late record, we know that stream time does not advance, and thus we don't need to emit anything downstream.

Contributor:

@spena seems there are a few different conditions we can consider here (sketched as code after this hunk):

  1. record time < stream time - window length - grace length: the record is too late; we should drop it up front and also record it in the droppedRecordsSensorOrExpiredWindowRecordDropSensor.

  2. record time >= stream time - window length - grace length, but < stream time: the record is still late but joinable; since stream time would not be advanced, we would not have to check and emit non-joined records, but just try to join this record with the other window. Note that, as @mjsax said, for each returned matching record we also need to check whether the other record's time >= stream time - window length - grace length or not.

  3. record time >= stream time: we would first try to emit non-joined records, and then try to join this record.

context().forward(key, joiner.apply(key, value, null));
} else {
outerJoinWindowStore.ifPresent(store -> store.put(
KeyAndJoinSide.make(isLeftSide, key),
LeftOrRightValue.make(isLeftSide, value),
inputRecordTimestamp));
}
}
}
}
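A sketch of the three-way classification enumerated in the thread above; it is not part of the merged diff, it approximates "window length" as joinBeforeMs + joinAfterMs, and it reuses the existing droppedRecordsSensor in place of the proposed expired-window sensor:

// Sketch (not in this PR): classify the input record at the top of process().
final long streamTime = maxObservedStreamTime.get();
final long expiryBoundary = streamTime - (joinBeforeMs + joinAfterMs) - joinGraceMs;

if (inputRecordTimestamp < expiryBoundary) {
    // Case 1: too late to join at all -> drop up front and record the drop.
    droppedRecordsSensor.record();
    return;
} else if (inputRecordTimestamp < streamTime) {
    // Case 2: late but still joinable -> stream time does not advance, so skip the
    // emit-non-joined pass and only probe the other window store for matches.
} else {
    // Case 3: stream time advances -> first emit closed non-joined records from the
    // shared store, then probe the other window store.
}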

@SuppressWarnings("unchecked")
private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
while (it.hasNext()) {
final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();

final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
final LeftOrRightValue value = record.value;

// Stop iterating if the window has not closed yet; the store is time-ordered, so all remaining windows are still open
if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= maxObservedStreamTime.get()) {
break;
}

final K key = windowedKey.key().getKey();
final long time = windowedKey.window().start();

final R nullJoinedValue;
if (isLeftSide) {
nullJoinedValue = joiner.apply(key,
(V1) value.getLeftValue(),
(V2) value.getRightValue());
} else {
nullJoinedValue = joiner.apply(key,
(V1) value.getRightValue(),
(V2) value.getLeftValue());
}

context().forward(key, nullJoinedValue, To.all().withTimestamp(time));

// Delete the key from the outer window store now that it has been emitted
store.put(record.key.key(), null, record.key.window().start());
}
}
}
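A worked example of the close condition above, with hypothetical numbers:

// Hypothetical numbers for the window-close check in emitNonJoinedOuterRecords:
// joinAfterMs = 5_000, joinGraceMs = 1_000, maxObservedStreamTime.get() = 12_000.
//   window start 5_000: 5_000 + 5_000 + 1_000 = 11_000 <  12_000 -> closed; emit with null for the missing side.
//   window start 7_000: 7_000 + 5_000 + 1_000 = 13_000 >= 12_000 -> still open; stop iterating (the store is time-ordered).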