
KAFKA-10847: Fix spurious results on left/outer stream-stream joins #10462

Merged
merged 12 commits into apache:trunk from fix_left_joins on Apr 29, 2021

Conversation

@spena (Contributor) commented Apr 1, 2021:

Fixes the issue with https://issues.apache.org/jira/browse/KAFKA-10847.

To fix the above problem, the left/outer stream-stream join processor uses a buffer to hold non-joined records until the window closes, so that they are not emitted if a join is found within the join window. If a record's window closes and no join was found, the record is emitted and processed by the subsequent topology processor.

A new time-ordered window store is used to temporarily hold records that do not yet have a join, keeping the record keys ordered by time. The KStreamStreamJoin has a reference to this new store. For every non-joined record seen, the processor writes it to this new state store without processing it. When a joined record is seen, the processor deletes it from the new state store to prevent further processing.

Records that were never joined by the end of the window + grace period are emitted to the next topology processor. Stream time is used to check the expiry time, so results are deterministic. The KStreamStreamJoin checks for expired records and emits them every time a new record is processed in the join processor.

The new state store is shared between the left and right join nodes. The store serializes the record keys as a combined key of <joinSide-recordKey>; this combination makes it possible to delete a buffered record from the other join side when a join is found. Two new serdes are created for this: KeyAndJoinSideSerde, which serializes a boolean value specifying the side where the key is found, and ValueOrOtherValueSerde, which serializes either V1 or V2 based on where the key was found.
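As a rough illustration only (not the PR's exact code), the combined key can be laid out as a single leading byte that flags the join side, followed by the serialized key bytes; this matches the rawKey/rawValue helpers discussed below, which strip one leading byte:

// Sketch: serialize a combined key as {join-side byte}{raw key bytes};
// the flag records which side of the join the key came from
static byte[] serializeCombinedKey(final boolean leftSide, final byte[] rawKey) {
    final byte[] data = new byte[rawKey.length + 1];
    data[0] = (byte) (leftSide ? 1 : 0);                  // join-side flag
    System.arraycopy(rawKey, 0, data, 1, rawKey.length);  // key payload
    return data;
}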

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@spena force-pushed the fix_left_joins branch 2 times, most recently from c03261a to 9b6a93d (April 7, 2021 15:37)
@guozhangwang (Contributor) left a comment:

One pass on the first commit.

*/
public class KeyAndJoinSide<K> {
    private final K key;
    private final boolean thisJoin;
Contributor:

nit: instead of using this/other, maybe we can use left/right to be more aligned with join terminology.

Contributor Author:

I changed it to leftJoin. But it seems you suggested adding two boolean variables, one for left and another for right?


@Override
public String toString() {
    return "<" + thisJoin + "," + key + ">";
Contributor:

If we go with left / right, then we can update here as well to print left/right instead of true/false, i.e.: <left/right -> key>.

Contributor Author:

Done

static byte[] rawKey(final byte[] data) {
    final int rawValueLength = data.length - 1;

    return ByteBuffer
Contributor:

We had some benchmarks before revealing that System.arraycopy(data, 1, newBytes, 0, rawValueLength); generally has better performance than ByteBuffer.
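For illustration, here is a sketch of the rawKey helper rewritten with System.arraycopy, assuming (as the data.length - 1 arithmetic implies) that the first byte stores the join side:

static byte[] rawKey(final byte[] data) {
    final int rawKeyLength = data.length - 1;
    final byte[] rawKey = new byte[rawKeyLength];
    // skip the leading join-side byte and copy the remaining key bytes
    System.arraycopy(data, 1, rawKey, 0, rawKeyLength);
    return rawKey;
}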

Contributor Author:

Done

}

static byte[] rawKey(final byte[] data) {
    final int rawValueLength = data.length - 1;
Contributor:

nit: rawKeyLength?

Contributor Author:

Done

* join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
* which is found in the left topic, or V2 value if it is found in the right topic.
*/
public class ValueOrOtherValue<V1, V2> {
Contributor:

Ditto: use left / right value terminology, i.e. maybe we can rename this class as LeftOrRightValue

Contributor Author:

Done


@Override
public String toString() {
    return "<" + thisValue + "," + otherValue + ">";
Contributor:

Ditto: better print as left/right: value.

Contributor Author:

Done

static byte[] rawValue(final byte[] joinedValues) {
    final int rawValueLength = joinedValues.length - 1;

    return ByteBuffer
Contributor:

Ditto: use array copy, since ByteBuffer#put uses a while-loop of public abstract ByteBuffer put(byte b) which is less efficient.

Contributor Author:

Done

private final V2 otherValue;

private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
    this.thisValue = thisValue;
@guozhangwang (Contributor) commented Apr 7, 2021:

Could we add an assertion that at most one of these two fields is non-null?
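A sketch of the kind of guard being suggested (the merged code later adds an equivalent check to the LeftOrRightValue constructor, shown further down in this review):

private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
    if (thisValue != null && otherValue != null) {
        throw new IllegalArgumentException("Only one of thisValue and otherValue can be non-null");
    }
    this.thisValue = thisValue;
    this.otherValue = otherValue;
}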

Contributor Author:

Done

return null;
}

final byte[] rawThisValue = (data.getThisValue() != null) ? thisSerializer.serialize(topic, data.getThisValue()) : null;
Contributor:

We can simplify this a bit, since we know only one field would be not null; we can just declare a single rawValue here, and then the lines 46-54 can also be simplified.

Contributor Author:

Done

@guozhangwang (Contributor) left a comment:

Made a pass on the second commit; thanks for the added test coverage!!

Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter || rightOuter) {
    final String outerJoinSuffix = "-shared-outer-join-store";
    final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
Contributor:

Using newProcessorName would bump up the suffix index inside the builder by one, and hence all downstream processor names / store name suffixes would be shifted by one, which would break compatibility. I'd suggest we do not use that function to generate the name in any case; instead (see the sketch below):

  1. if userProvidedBaseStoreName is provided, then use userProvidedBaseStoreName + outerJoinSuffix;
  2. otherwise, we piggy-back on the suffix index of joinThisGeneratedName. E.g. if joinThisGeneratedName is KSTREAM-OUTERTHIS-0000X then this store name is KSTREAM-OUTERSHARED-0000X as well.
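A hypothetical sketch of option 2 (variable names are assumptions; the merged PR derives the suffix with a similar substring call, quoted later in this review):

final String outerJoinSuffix = "-shared-outer-join-store";
// piggy-back on the numeric suffix of e.g. "KSTREAM-OUTERTHIS-0000000004"
// so that no new index is allocated from the builder
final String outerJoinStoreName = userProvidedBaseStoreName == null
    ? KStreamImpl.OUTERSHARED_NAME + joinThisGeneratedName.substring(KStreamImpl.OUTERTHIS_NAME.length())
    : userProvidedBaseStoreName + outerJoinSuffix;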

Contributor Author:

Done

Duration.ofMillis(windows.size())
),
new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
Contributor:

Note that we used this/other because we have two joiners / join stores, so the relation is vice-versa: from the left side's point of view, V1 is this and V2 is other; from the right side's point of view, V2 is this and V1 is other.

However, here we only have one extra store, so we can just name them as left and right.

Contributor Author:

Are you talking about the ValueOrOtherValueSerde -> LeftOrRightValueSerde? If so, then Done

retentionMs,
segmentInterval,
windowSizeMs,
false,
Contributor:

Hm.. why do we not want to retain duplicates?

Contributor Author:

I had issues with duplicates and forgot to investigate. I just did another round of investigation, but I still get issues with them. The problem is that I cannot delete any key when duplicates are used. This happens in any window store, not just the time-ordered window store.

The problem I found is:

  1. Add two duplicates with key = 0 and time = 0:
     put(0, "A0", 0)    # this adds the key with seqNum = 0
     put(0, "A0-0", 0)  # this adds the key with seqNum = 1
  2. Delete key = 0 and time = 0:
     put(0, null, 0)    # this attempts to delete with seqNum = 2, which does not exist

Initially I didn't think supporting duplicates was necessary, but I just wrote a test case with the old semantics and duplicates are processed, so I need to support them. Do you know if deleting duplicates has never been supported? Or am I missing some API or workaround?

Member:

For existing windowed stores with duplicates enabled, we never call delete() but only rely on retention-based deletion -- thus, it's totally possible that it was never supported.

outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
}

// Time shared between joins to keep track of the maximum stream time
Contributor:

Since this is only accessed single-threaded, using an AtomicLong feels a bit overkill. We could probably maintain a long maxObservedStreamTime in this class, and pass an ObservedStreamTime interface to the two joiners that just has a setter / getter to read and write the local variable.
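A minimal sketch of such an interface (names are assumptions; the merged code does expose an advance() method, as seen later in this review):

interface MaxObservedStreamTime {
    long get();
    void advance(long timestamp); // set to max(current value, timestamp)
}

// a plain long field is enough because access is single-threaded
class LocalMaxObservedStreamTime implements MaxObservedStreamTime {
    private long maxObservedStreamTime = Long.MIN_VALUE;

    @Override
    public long get() {
        return maxObservedStreamTime;
    }

    @Override
    public void advance(final long timestamp) {
        maxObservedStreamTime = Math.max(maxObservedStreamTime, timestamp);
    }
}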

Contributor Author:

Done

Member:

One disadvantage compared to using context.streamTime() would be, that MaxObservedStreamTime would be reset to zero on rebalance/restart. (Or we need to add additional code to preserve it...)

Contributor:

It's extremely subtle, but we cannot use context.streamTime() because of the time-delay effects of upstream record caches. This was the cause of a severe bug in suppress that went undetected until after it was released.

For example: if we have a record cache upstream of this join, it will delay the propagation of records (and their accompanying timestamps) by some time amount D. Say we ingest some record with timestamp T. If we reference the context's stream time, our processor will think it is at time T, when it is really at time T - D, leading it to behave wrongly, such as enforcing the grace period prematurely, which will manifest to users as data loss.

Contributor:

@mjsax is correct that there is a bug that the processor-local stream time gets reset on rebalance/restart. It would be good to fix it, but with the current architecture, the only correct solution is to persist the processor-local stream time. Another approach we've discussed is to remove the time-delay effect of the record cache.


private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
private final boolean outer;
private final Optional<String> outerJoinWindowName;
private final AtomicLong maxObservedStreamTime;
private final boolean thisJoin;
Contributor:

Ditto here, we can rename it to leftJoin / rightJoin to indicate if this joiner is for left or right.

Contributor Author:

Are you suggesting using two boolean variables?

@@ -92,6 +113,10 @@ public void process(final K key, final V1 value) {
return;
}

// maxObservedStreamTime is updated and shared between left and right sides, so we can
// process a non-join record immediately if it is late
final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
Contributor:

nit: we can move inputRecordTimestamp up and use it here.

Contributor Author:

Done

outerJoinWindowStore.ifPresent(store -> {
    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
    // if the current record is late, then there is no need to check for expired records
    if (inputRecordTimestamp == maxStreamTime) {
Contributor:

Not a comment to be addressed in this PR, but for future optimizations right after it: currently we are likely to trigger this function every time, since stream time advances on most records in production, while the grace period is only considered inside maybeEmitOuterExpiryRecords. As a result we may invoke RocksDB many times unnecessarily, only to find that the condition on line 198 is satisfied immediately.

A possible heuristic is that, in line 198 below, before we break we remember the difference previousObservedExpirationGap = e.key.window().end() + joinGraceMs - maxStreamTime, and we also remember the maxStreamTime from when maybeEmitOuterExpiryRecords was last triggered. Then we would only trigger the function if maxStreamTime - previousMaxStreamTimeWhenEmitTriggered >= previousObservedExpirationGap. In this way we would trigger the function, and hence RocksDB's starting-position search, much less often; a sketch follows below.

cc @mjsax @vvcephei WDYT?
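A hedged sketch of that heuristic (field and method names are assumptions, not the PR's actual API):

private long previousObservedExpirationGap = 0L;
private long previousMaxStreamTimeWhenEmitTriggered = 0L;

// only run the RocksDB range scan when stream time has advanced far enough
// that the earliest previously-unexpired record could now be expired
private boolean mayNeedToEmitExpiredRecords(final long maxStreamTime) {
    return maxStreamTime - previousMaxStreamTimeWhenEmitTriggered >= previousObservedExpirationGap;
}

// inside maybeEmitOuterExpiryRecords, when the first non-expired entry is found:
//     previousObservedExpirationGap = entry.key.window().end() + joinGraceMs - maxStreamTime;
//     previousMaxStreamTimeWhenEmitTriggered = maxStreamTime;
//     break;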

// Emit the record by joining with a null value. But the order varies depending whether
// this join is using a reverse joiner or not. Also whether the returned record from the
// outer window store is a V1 or V2 value.
if (thisJoin) {
Contributor:

Here if we refactor to left / right then this logic can be simplified as well since we would only care whether the deserialized key/value are left or right.

Contributor Author:

Done

}
}

// Delete the key from tne outer window store now it is emitted
Contributor:

Typo on comment.

Contributor Author:

Done

}

// Delete the key from tne outer window store now it is emitted
store.put(e.key.key(), null, e.key.window().start());
Contributor:

This reminds me about the test coverage: maybe we should also test that store.put / delete can be triggered while the iterator is open, and if the put / deleted elements would not be reflected from the iterator.

Contributor Author:

I'm not sure if this is possible. If I put or delete a future record that the iterator hasn't seen, then the iterator will reflect that change if I continue iterating until the deleted or new record appears. This is how the RocksDB iterator works, isn't it? When asked for the next record, it only reads one record from the store. If subsequent records are deleted or new, then the iterator will see the update in the next next() call.

Or what test case do you have in mind?

Contributor:

Sorry, my bad, I misread the code and thought you were deleting records while inside the

try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {

block. Please ignore this comment.

@guozhangwang (Contributor) left a comment:

Made a pass on the third commit.

@@ -161,7 +181,7 @@ public void process(final K key, final V1 value) {
 //
 // the condition below allows us to process the late record without the need
 // to hold it in the temporary outer store
-if (timeTo < maxStreamTime) {
+if (internalOuterJoinFixDisabled || timeTo < maxStreamTime) {
Contributor:

We can refactor it as:

if (!outerJoinWindowStore.isPresent() || timeTo < maxStreamTime) {
    context().forward(key, joiner.apply(key, value, null));
} else {

}

Then internalOuterJoinFixDisabled can just be a local variable instead of a class field.

Contributor Author:

Done

@guozhangwang (Contributor) left a comment:

One meta comment for simplifying the logic.

final KeyValue<Long, V2> otherRecord = iter.next();
final long otherRecordTimestamp = otherRecord.key;

// Emit expired records before the joined record to keep time ordering
Contributor:

I think we can refactor the logic here as follows:

  1. Suppose the received record's timestamp is T and the current stream time is T' >= T, and we found one or more matching records from the other side, with timestamps T1' <= T2' <= T3' etc. The joined records would then have the timestamps max(T, T1'), max(T, T2'), ..., which are non-decreasing.

  2. After we get all the joined records, we do not call context.forward() yet, but just cache them locally.

  3. We then range-query the expired-records store and generate the joined records (also deleting the records); again we do not call context.forward() yet, but just cache them locally.

  4. We merge-sort these two sorted-by-timestamp lists, and then call context.forward() on the sorted join result records to emit.

In this way we do not need the following complex logic (a rough sketch of step 4 follows).
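A self-contained sketch of step 4: merge two timestamp-sorted result lists so records are forwarded in timestamp order (generic helper; names are assumptions):

import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

static <T> List<T> mergeByTimestamp(final List<T> a, final List<T> b, final ToLongFunction<T> timestamp) {
    final List<T> merged = new ArrayList<>(a.size() + b.size());
    int i = 0;
    int j = 0;
    // both inputs are already sorted by timestamp, so one linear merge pass suffices
    while (i < a.size() && j < b.size()) {
        if (timestamp.applyAsLong(a.get(i)) <= timestamp.applyAsLong(b.get(j))) {
            merged.add(a.get(i++));
        } else {
            merged.add(b.get(j++));
        }
    }
    while (i < a.size()) {
        merged.add(a.get(i++));
    }
    while (j < b.size()) {
        merged.add(b.get(j++));
    }
    return merged;
}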

Contributor:

cc @mjsax as well, LMK WDYT.

Member:

I guess that's possible, but if the join result is large, we could run into memory issues buffering all join results?

Also, sorting could be expensive, and we can actually avoid it and still guarantee that results are emitted in timestamp order:

  • we know that the left/outer join results would have the smallest timestamps, and thus we can emit those first (given that we use a timestamp-sorted store anyway, we just scan the store from old to new and emit)
  • for the inner join results, we get the output sorted by timestamp, too, because for the join key, data is sorted in timestamp order in the store

Contributor:

The trickiness, as demonstrated in the current PR though, is that if we do the expiration first we may get records that match the current processing record, which need to be skipped from deleting/emitting before the join.

I think it is still possible to simplify the current logic without naive buffering. Because:

  1. The current processing record's timestamp T is no larger than the updated max stream time T';
  2. The current processing record's matching records' smallest timestamp would be (T - window-size);
  3. The expired records' largest timestamp would be (T' - window-size - grace-period), where grace-period >= 0.

In other words, if the current processing record's timestamp T is smaller than T' (i.e. it's a late record and hence did not advance the stream time), then all records within [T - window-size, T' - window-size - grace-period], assuming T - window-size < T' - window-size - grace-period, would have already been expired and emitted, and hence won't be found and matched; if the current processing record's timestamp T == T' (i.e. it is not a late record), then T - window-size is always >= T' - window-size - grace-period, which means that all joined records' timestamps are later than the expired timestamps.

That means, if we do the expiration first based on (T' - window-size - grace-period), the newly expired records' timestamps are all smaller than any joined record's timestamp generated later for that processing record. Hence it is safe to just blindly expire them all without the "except" logic.

Contributor:

Just to be more concrete here: based on the above analysis, I think we can move the expiration out of the loop that fetches any records to join (https://github.com/apache/kafka/pull/10462/files#diff-6ca18143cc0226e6d1e4d5180ff81596a72d53639ca5184bf1238350265382a6R154).

Member:

I agree (even if your train of thought seems to be complex...) that we can blindly expire left/outer join results outside of the loop.

To rephrase: if we process a record with timestamp T, there are two cases:

In-order record:
We can expire everything with a timestamp smaller than T - windowSize - gracePeriod.

Out-of-order record:
There is nothing to expire to begin with, because we could only expire up to T - windowSize - gracePeriod, but this value is smaller than streamTime - windowSize - gracePeriod, and we already expired everything up to that larger value previously.

I.e., we don't need the method emitExpiredNonJoinedOuterRecordsExcept at all, and can move emitExpiredNonJoinedOuterRecords(); before the while loop.

// Observed time is AtomicLong because this time is shared between the left and side processor nodes. However,
// this time is not updated in parallel, so we can call get() several times without worry about getting different
// times.
private final AtomicLong maxObservedStreamTime;
@mjsax (Member) commented Apr 8, 2021:

Do we need to maintain it manually? Could we use context.streamTime() instead?

Note that context.streamTime() might be slightly different because we advance it for every input record. Thus, if there is a filter before the join, the join might not get all records, and so its locally observed stream-time could differ from the task stream-time.

It's a small semantic impact/difference, and it's unclear to me whether we should prefer processor-local stream-time or task stream-time?

\cc @guozhangwang @vvcephei

Contributor Author:

I think context.currentStreamTimeMs() should work. I wasn't aware I could get the new stream time from it. I don't see any problems, as I only need the stream time to expire the records.

Contributor:

We had a similar debate about using per-task stream time vs. per-processor stream time when it comes down to expiration. I'm concerned that if we go with task stream time here, we would have inconsistent behavior compared with other scenarios?

Member:

Agreed, but I am wondering if we should/want to switch to the new context.streamTime() everywhere? -- It's a totally open question, but might be worth exploring.

Contributor:

I personally was on the side of always using task stream time everywhere, but more people feel that we should use processor stream time :P Anyway, all I'm trying to say is that we need to make an educated decision here, and whether we conclude that 1) we rely on task time here but still use processor time in other expiration logic, 2) we rely on processor time in all logic, or 3) we rely on task time in all logic, we should have a good rationale for whichever we choose.

@spena force-pushed the fix_left_joins branch 3 times, most recently from afd9f36 to 3e444a4 (April 12, 2021 20:34)
@@ -118,20 +142,47 @@
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);

Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter || rightOuter) {
final String outerJoinSuffix = "-shared-outer-join-store";
Member:

Should we use -shared-left-join-store and -shared-right-join-store for left/outer join?

Contributor Author:

There is only one store shared between left/right joins. I can use only one name.

Member:

Sorry for the confusion. What I meant was: if it's a leftJoin() we could use -shared-left-join-store, and if it's an outerJoin() we could use -shared-outer-join-store?

+ joinThisGeneratedName.substring(
      rightOuter
          ? KStreamImpl.OUTERTHIS_NAME.length()
          : KStreamImpl.JOINTHIS_NAME.length());
Member:

Not sure if I understand? Why use "outer" or "this" here? If the store is shared, neither one seems to make sense? Overall, naming of processors and stores is tricky... Can we actually add a corresponding test that compares the generated and expected TopologyDescription for this case?

Contributor Author:

I initially generated a name with a new index for the shared store. However, it seemed this was going to cause incompatibilities in the topology because the new index kept increasing. Instead, now I just get the index from one of the current join stores. Why doesn't it make sense? Is there another way to get an index? Or do I really need to append an index at the end of the shared store name?

Member:

I agree that we should not use one more index, to avoid compatibility issues... Maybe the question is really (just for my better understanding): what would the name be, i.e., could you give a concrete example (with and without a Named parameter)? That is also why I asked for a test using TopologyDescription -- it makes it easier to wrap my head around.


),
new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
Time.SYSTEM
Member:

Should we pass a Time reference here to allow us to mock time in tests if necessary?

Contributor Author:

It would require more changes just to allow that. The KStreamImplJoin constructor, which we could overload to pass a Time mock object, is only used by the KStreamImplJoinImpl class. The tests use the StreamsBuilder to create the joins, and it does not accept a Time object.

Also, the Stores class, which is called by KStreamImplJoin, does not mock it. Maybe because of the same code changes required just for that?

Member:

Ok. Maybe good enough as-is. (Fair point that we don't mock it in other stores either -- maybe there was never any demand to be able to mock it. As you said, we could change it as follow up if needed.)

@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
return builder;
}

@SuppressWarnings("unchecked")
private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
Member:

The method name is confusing. We are building the "shared" store, in contrast to "this" and "other", which we use to refer to the two "main" stores of the left and right input processors.

Contributor Author:

Done

}

@SuppressWarnings("unchecked")
private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
Member:

nit: line too long

should be

private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store,
                                              final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {

Contributor Author:

Done

final To timestamp = To.all().withTimestamp(e.key.window().start());

final R nullJoinedValue;
if (isLeftJoin) {
Member:

Do we need this (as we pass in a ReverseJoiner in the "other" join processor anyway)?

Contributor Author:

Yes. The ReverseJoiner does not work well in the shared-store scenario. I need to cast the value and otherValue based on which join side we are on.

For instance, I would get an error if I removed the (V1) and (V2) casts. When using the casts, I now need to specify which value is the right one: on the left side it is (V1) record.value.getLeftValue(), on the right side it is (V2) record.value.getLeftValue().

nullJoinedValue = joiner.apply(key,
                        (V1) record.value.getLeftValue(),
                        (V2) record.value.getRightValue());

private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
while (it.hasNext()) {
final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next();
Member:

e is not a good variable name

Contributor Author:

Done

private final boolean leftJoin;

private KeyAndJoinSide(final boolean leftJoin, final K key) {
this.key = Objects.requireNonNull(key, "key is null");
Member:

nit: "key cannot be null"

Contributor Author:

Done

* @return a new {@link LeftOrRightValue} instance
*/
public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
Objects.requireNonNull(leftValue, "leftValue is null");
Member:

nit: "leftValue cannot be null" (similar below)

Contributor Author:

Done

@guozhangwang (Contributor) left a comment:

Quick look on the serde part. LGTM except two minor comments.

private final V2 rightValue;

private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
    this.leftValue = leftValue;
Contributor:

I still feel it is better to add a check here, to make sure that at most one of leftValue/rightValue is non-null.

Contributor Author:

Done

*/
public class KeyAndJoinSide<K> {
    private final K key;
    private final boolean leftJoin;
Contributor:

nit: the term leftJoin may be a bit confusing, as it could mean the join type; maybe just calling it leftSide is better? Ditto for the fields in LeftOrRightValue, and also isLeftJoin -> isLeftSide in the impl join classes.

Contributor Author:

Done

@@ -486,6 +486,327 @@ public void testJoin() {
}
}

@Test
public void testImprovedLeftJoin() {
Member:

Should this test not be added to KStreamKStreamLeftJoinTest.java ?

}

@Test
public void testImprovedFullOuterJoin() {
Member:

I am just realizing, that we don't have a KStreamKStreamOuterJoinTest class -- we should add one and move this test there.

}

@Test
public void testOuterJoinEmitsNonJoinedRecordsAfterWindowCloses() {
Member:

As above.

new KeyValueTimestamp<>(0, "A0+null", 1));
}
}

@Test
public void testOuterJoin() {
Member:

We should move this test into KStreamKStreamOuterJoinTest class

@spena force-pushed the fix_left_joins branch 3 times, most recently from ff263aa to 6bda027 (April 19, 2021 15:21)
@mjsax mjsax added the streams label Apr 19, 2021
@guozhangwang (Contributor) left a comment:

Reviewed again, thanks for the great test coverage!

@@ -118,20 +142,47 @@
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);

Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter || rightOuter) {
final String outerJoinSuffix = leftOuter ? "-shared-left-outer-join" : "-shared-outer-join";
Contributor:

I think @mjsax's comment about naming is just to align with the join type; note that "leftOuter" would always be true once we pass if (leftOuter || rightOuter), since "leftOuter == false && rightOuter == true" is not a case. Also, to align with the terms "-outer-this-join" / "-this-join", I think it would be:

rightOuter ? "-outer-shared-join" : "-left-shared-join";

Member:

That was the intent of my comment, but if you look into the newly added tests in TopologyTest.java it might not matter too much, as we also have some "weird" naming in existing code -- and to stay backward compatible, we cannot really change the naming:

inner-join: (store names)
 - KSTREAM-JOINTHIS-0000000004-store
 - KSTREAM-JOINOTHER-0000000005-store

left-join: (store names)
 - KSTREAM-JOINTHIS-0000000004-store
 - KSTREAM-OUTEROTHER-0000000005-store

(Ideally we should have named both KSTREAM-LEFTTHIS-0000000004-store and KSTREAM-LEFTOTHER-0000000005-store...)

outer-join: (store names)
 - KSTREAM-OUTERTHIS-0000000004-store
 - KSTREAM-OUTEROTHER-0000000005-store

@@ -60,20 +83,44 @@
}

private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
Contributor:

nit: I think this function can just be inlined now?

Contributor Author:

Done


@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
otherWindow = context.getStateStore(otherWindowName);
Contributor:

nit: not introduced by this PR, but let's rename it to otherWindowStore for naming consistency.

Contributor Author:

Done

@@ -118,20 +142,47 @@
final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);

Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
if (leftOuter || rightOuter) {
Contributor:

If the INTERNAL_ENABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX config is set to false, we would end up creating a store but not using it at all.

So I think we need to also check this config here during plan generation: if it is false, leave the storeBuilder as an empty Optional.

Contributor Author:

I did not find how to get the configs from KStreamImplJoin. I tracked the configs being passed from the StreamTask down to the processor when processing the record, so I ended up checking the flag at that point. I could refactor the code to pass the configs when constructing the joins, but that requires more changes in different places, and I'm not sure whether it would make things incompatible. Any ideas?

}

private boolean internalOuterJoinFixEnabled(final Map<String, Object> configs) {
Contributor:

See my other comment above: I think we should move this to KStreamImplJoin to decide whether or not to create the store builder, and here we can just rely on whether the passed-in outerJoinWindowName is empty or not to get the store (if it is not empty, then the store must have been created).

//
// the condition below allows us to process the late record without the need
// to hold it in the temporary outer store
if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
Contributor:

I think the motivation is that if the current record's timestamp is too small (i.e. it is too late), then it should not be added into the book-keeping store but can be "expired" immediately. But I also feel the condition seems a bit off here: for the record to be "too late", its timestamp just needs to be smaller than the expiration boundary, which is observed-stream-time - join-after - grace-period, right?

private final V2 rightValue;

private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
if (leftValue != null && rightValue != null) {
Contributor:

Hmm.. I thought it could be possible that both sides are null? E.g. for a left value where the value itself is null (we do not filter null values in a stream at the moment, right? @mjsax).

@mjsax (Member) commented Apr 20, 2021:

We do filter both null keys and null values: null keys because we cannot really compute the join, and null values because a null would be a delete in RocksDB and thus we cannot store a null value to begin with.

Thus, it's not possible that both values are null.

import static java.time.Duration.ofMillis;
import static org.junit.Assert.assertEquals;

public class KStreamKStreamOuterJoinTest {
Contributor:

Thanks for the added tests! I'm wondering if we can add a case for both left/outer joins where the input streams have consecutive records with the exact same key / timestamp --- i.e. they are "duplicates" --- and check that 1) we do not de-duplicate them, 2) when a join is not found in time, we expire / delete them together, with two emitted results, and 3) when a join is found in time, we emit two join results.

Also, could we have a test case that, when a single record causes both expiration and join results, the emitted records are still sorted in order?

I raise these only because I have not yet found these two cases in the test suite; if there's already coverage, please ignore this comment.

Contributor Author:

Most of them were already there. I just updated them to add the 3) scenario.

KStreamKStreamLeftJoinTest
- testLeftJoinDuplicates()
- testLeftJoinDuplicatesWithFixDisabled()
- testOrdering()

KStreamKStreamOuterJoinTest
- testOuterJoinDuplicates()
- testOuterJoinDuplicatesWithFixDisabled()
- testOrdering()

spena added 5 commits April 20, 2021 16:20
These serdes are used by the shared state store in the stream-stream joins
  - Records are not emitted if the current stream time is higher than the retention period
  - Keeps time ordering by emitting expired records before the joined record is emitted
@guozhangwang (Contributor) left a comment:

There is still a comment regarding whether we can drop a record early, instead of checking within the while loop. @spena please try to address it in the follow-up PR.

//
// the condition below allows us to process the late record without the need
// to hold it in the temporary outer store
if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
Contributor:

@spena it seems there are a few different conditions we can consider here (see the sketch below):

  1. record time < stream time - window length - grace length: the record is too late; we should drop it up front and also record it with droppedRecordsSensorOrExpiredWindowRecordDropSensor.

  2. record time >= stream time - window length - grace length, but < stream time: the record is still late, but joinable; since the stream time would not be advanced, we would not have to check and emit non-joined records, but just try to join this record with the other window. Note that, like @mjsax said, for the returned matching record we also need to check whether the other record's time is >= stream time - window length - grace length or not.

  3. record time >= stream time: we would first try to emit non-joined records, and then try to join this record.
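A hedged sketch of those three branches (tryJoinWithOtherWindow and the exact boundary variables are assumptions; emitExpiredNonJoinedOuterRecords and droppedRecordsSensor do appear in the PR):

final long expiryBoundary = maxStreamTime - windowSizeMs - graceMs;
if (inputRecordTimestamp < expiryBoundary) {
    // case 1: too late to ever join -- drop up front and record the drop sensor
    droppedRecordsSensor.record();
} else if (inputRecordTimestamp < maxStreamTime) {
    // case 2: late but still joinable -- stream time did not advance,
    // so skip the expiry scan and only attempt the join
    tryJoinWithOtherWindow(key, value);
} else {
    // case 3: in-order record -- emit expired non-joined records first,
    // then attempt the join
    emitExpiredNonJoinedOuterRecords();
    tryJoinWithOtherWindow(key, value);
}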


final long inputRecordTimestamp = context().timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

maxObservedStreamTime.advance(inputRecordTimestamp);

Contributor:

@spena @mjsax I left another comment below regarding the time. Please LMK if you think that makes sense.

final KeyValue<Long, V2> otherRecord = iter.next();
final long otherRecordTimestamp = otherRecord.key;
Contributor:

@spena just a ping to make sure you get this in the follow-up PR.

@guozhangwang merged commit bf359f8 into apache:trunk on Apr 29, 2021