Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9929: Support backward iterator on WindowStore #9138

Merged
merged 29 commits into from
Sep 2, 2020

Conversation

jeqo
Copy link
Contributor

@jeqo jeqo commented Aug 7, 2020

Depends on #9137

Implements KIP-617 on WindowStore.

Testing strategy: extend existing tests to validate reverse operations are supported.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jeqo jeqo changed the title Backward windowstore KAFKA-9929: Support backward iterator on WindowStore Aug 7, 2020
@jeqo jeqo force-pushed the backward-windowstore branch 2 times, most recently from 756ad2f to 6dc89ff Compare August 11, 2020 12:54
@jeqo jeqo force-pushed the backward-windowstore branch 2 times, most recently from 4b8d0f7 to e0589da Compare August 18, 2020 09:55
@jeqo jeqo marked this pull request as ready for review August 18, 2020 09:56
@jeqo
Copy link
Contributor Author

jeqo commented Aug 18, 2020

@ableegoldman, this PR is ready for review 👍

@jeqo jeqo force-pushed the backward-windowstore branch 2 times, most recently from 9cfecc0 to 07d2f6c Compare August 20, 2020 16:39
Copy link
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part 2 is looking good!
One general question: should we mention that the key ordering will also be reversed? I'm pretty sure that it will be automatically (because of the byte layout). But key ordering isn't really "guaranteed" in the first place, so maybe it's best to not make any claims at all.

*/
@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
// note, this method must be kept if super#fetch(...) is removed
@SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why we have all these ReadOnlyWindowStore methods also declared here in WindowStore? We don't need reverse variations of these I guess? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods were introduced when adding Duration/Instant support #5682.

I don't think these are needed, we can do a similar change as for SessionStore read operations. wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the note, this method must be kept if super#fetch(...) is removed comments are making me nervous, but they could be out of date. Anyways I don't think you need to clean all this up right now, just wondering what's going on here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, given @mjsax 's response on KAFKA-10434 it seems like we actually need to add the reverse variation of these long-based methods to the WindowStore API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I've added the long-based methods to WindowStore to align this.

@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Github won't let me comment up there but on line 418, shouldn't we should have to decrement the currentSegmentId for the reverse case? I'm a little confused because it looks like you have test coverage for the multi-segment case and it seems to pass. Maybe I'm just tired and missing something obvious here..
For example in CachingWindowStoreTest#shouldFetchAndIterateOverKeyBackwardRange the results seem to go across multiple segments, but it looks like we actually do return the record from the largest segment first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will have to double check this. I have inverted the current/last segment for backwards use-case though.

Comment on lines 171 to 173
public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's a little awkward to have the reverse variations accept time parameters as an Instant while the forward versions just use a long. I would have thought we could migrate the long methods to Instant at some point but I see all these note, this method must be kept if super#fetch(...) is removed comments littered throughout the code...so maybe there's a reason for sticking with the long overrides in the innermost store layer?
Did you come across anything that suggested a reason for keeping the long flavors? cc @guozhangwang or @mjsax -- why can't we remove these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only backward compatibility. If it make sense to remove these deprecations as part of this KIP, I'd be happy to help cleaning it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, feel free to just create a followup ticket to see if we can clean things up. No need to block this PR on it

Copy link
Contributor Author

@jeqo jeqo Aug 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.keyFrom = keyFrom;
this.keyTo = keyTo;
this.timeTo = timeTo;
this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
this.forward = forward;

this.segmentInterval = cacheFunction.getSegmentInterval();
this.currentSegmentId = cacheFunction.segmentId(timeFrom);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should start on the largest segment I think (largest segment == farthest advanced in time)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch! I think this hasn't pop up yet in the tests as all tests may be using the same segment.

Will double check to add more tests to validate this.

@jeqo jeqo force-pushed the backward-windowstore branch from 07d2f6c to 0871d19 Compare August 22, 2020 17:06
new HashSet<>(asList(one, two, four, five)),
toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
assertArrayEquals(
new LinkedHashSet<>(asList(one, two, four, five)).toArray(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels kind of ridiculous to convert this from a list to a set to an array all in one line. Maybe we can use assertThat(result, equalTo(expectedResult)) here like we've started to do elsewhere in Streams?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow this is embarrasing haha, looks like I was trying to get tests green without much thinking 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha actually I think I was the one who added the initial conversion from list to set to begin with, so I was clearly asking for trouble

throw new InvalidStateStoreException("Store is not open");
}
final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
for (final long now : data.keySet()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here (and all the backwards ReadOnlyWindowStoreStub methods): I think we are kind of forced to invert the key ordering for the backwards fetch methods as well, even if we don't necessarily want to. Probably users shouldn't be relying on a strict ordering of the keys anyway but we do have to match the ordering of the cache

Copy link
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! One minor fix remaining on the InMemoryWindowStore but besides that this LGTM. Will ping @guozhangwang for a 2nd review & merge

@jeqo
Copy link
Contributor Author

jeqo commented Aug 31, 2020

@ableegoldman key ordering is added to InMemoryWindowStore now

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass over the PR non-testing part. I just have one question regarding the caching segmented range search (cc'ed @ableegoldman as well) that needs some clarification. Otherwise LGTM.

* @return The value or {@code null} if no value is found in the window
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException If {@code null} is used for any key.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this intentional? Also I'd suggest we do not use capitalized If to be consistent with the above line, ditto elsewhere below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, IDE reformatting. I've fixed the capitalized If

ApiUtils.validateMillisecondInstant(timeTo, prepareMillisCheckFailMsgPrefix(timeTo, "timeTo")));
}

default KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom, final long timeTo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are going to remove deprecated overloads with primitive long in the future, I think we do not need to expose a default function here, but just provide a default impl of the function in 204 below as UnsupportedOperation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk, the current defaults make sense to me. If a user has a custom store and wants to use the new backwardFetchAll with both longs and Instants, all they'd have to do is override the long-based backwardFetchAll method (they have to implement the long version no matter what, since this is what gets used internally to Streams). If we just throw UnsupportedOperationException directly from the default implementation of the Instant-based backwardFetchAll, then they would have to override that as well in their custom store. So we should just let the Instant default to the long method so users only have to implement one method instead of two (plus they would have to do the Instant validation themselves, etc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang do you mean the changes proposed in KIP-667? In that KIP we are removing long-based methods only from ReadOnlyWindowStore, but will keep it in WindowStore—unless you're referring to a different proposal.

I do agree with @ableegoldman. Users will have to opt-in to reverse operations when using custom stores, and by default only add implementation for long-based to support both params options.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I was thinking about ReadOnlyWindowStore exposed in IQ only. All good.

* @throws NullPointerException If {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant timeFrom, Instant timeTo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is out of the scope of this PR, but I'd like to point out that the current IQ does not actually obey the ordering when there are multiple local stores hosted on that instance. For example, if there are two stores from two tasks hosting keys {1, 3} and {2,4}, then a range query of key [1,4] would return in the order of 1,3,2,4 but not 1,2,3,4 since it is looping over the stores only. This would be the case for either forward or backward fetches on range-key-range-time.

For single key time range fetch, or course, there's no such issue.

I think it worth documenting this for now until we have a fix (and actually we are going to propose something soon).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, @jeqo noticed that back in the KeyValueStore PR. That's a good point about clarifying this point in the javadocs, not sure if it makes sense to do so on the ReadOnlyWindowStore methods directly vs on some IQ-related method/class? Maybe we can just do a quick followup PR to shore up the documentation here without blocking this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#9137 (comment) old discussion for context.

@guozhangwang is this scenario only possible with composite stores? or are there other implementations where this is an alternative path?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The composite stores are the ones used in IQ. So IQ queries would get impacted by this, for processing, since it is always within a single task there should be no problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -192,4 +192,3 @@ public void close() {
storeIterator.close();
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional? We usually have a newline at file end in case some specific IDEs do not like otherwise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks strange. I can see there is a newline in my IDE. Seems that in trunk it had 2 newlines at the end. I can roll-back this change if needed.

segmentId(timeTo), true
);
final NavigableMap<Long, S> segmentsInRange;
if (forward) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can just call subMap out of the condition and only call descendingMap() based on the condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer this immutable approach as it's the same used in keyValueStore changes, if that's ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG, nit comments are just suggestions not mandatory.

}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comments: I think we do not need to add overloads for primitive types for the newly added APIs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would still need to keep this method: we're not removing all long-based APIs, just the public/IQ methods in ReadOnlyWindowStore. But we still want to keep the long-based methods on WindowStore and all the internal store interfaces for performance reasons.
Maybe once we move everything to use Instant all the way down to the serialization then we can remove these long-based methods. I guess we should consider that when discussing KIP-667, but for the time being at least, we should keep them for internal use

this.currentSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
this.lastSegmentId = cacheFunction.segmentId(timeFrom);

setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, maxObservedTimestamp.get()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this call is different based on the forward boolean? It's not clear to me. cc @ableegoldman @lct45 could you double check?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks right to me -- in the iterator constructor, we would normally start from timeFrom (the minimum time) and advance to the end of the current segment (that's what the "cache key range" defines, the range of the current segment) When iterating backwards, the current segment is actually the largest segment, so the cache key lower range is the current (largest) segment's beginning timestamp, and the upper range is the maximum timestamp of the backwards fetch. Does that make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG

@jeqo jeqo force-pushed the backward-windowstore branch from 4decbb7 to 814ef3b Compare September 2, 2020 09:00
@guozhangwang guozhangwang merged commit 4f06d9e into apache:trunk Sep 2, 2020
guozhangwang pushed a commit that referenced this pull request Sep 13, 2021
…rder (#11292)

When introducing backward iterator for WindowStroe in #9138, we forgot to make "each segment" in reverse order (i.e. in descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.

Currently, in Window store, we store records in [segments -> [records] ].

For example:
window size = 500,
input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window

So, internally, the "a" and "b" will be in the same segment, and "c" in another segments.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".

Before this change, we did have a reverse iterator for segments, but not in "records". So, when doing backwardFetchAll, we'll have the records returned in order: "c", "a", "b", which should be "c", "b", "a" obviously.

Reviewers: Jorge Esteban Quilcate Otoya <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…rder (apache#11292)

When introducing backward iterator for WindowStroe in apache#9138, we forgot to make "each segment" in reverse order (i.e. in descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.

Currently, in Window store, we store records in [segments -> [records] ].

For example:
window size = 500,
input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window

So, internally, the "a" and "b" will be in the same segment, and "c" in another segments.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".

Before this change, we did have a reverse iterator for segments, but not in "records". So, when doing backwardFetchAll, we'll have the records returned in order: "c", "a", "b", which should be "c", "b", "a" obviously.

Reviewers: Jorge Esteban Quilcate Otoya <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants