-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9929: Support backward iterator on SessionStore #9139
Conversation
e7a774e
to
07c31b9
Compare
07c31b9
to
18fb88e
Compare
cc @ableegoldman and team, this should be ready to review. Thanks! |
3a045e5
to
5c5a04b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @jeqo -- finally got around to giving this a first pass, thanks for being so patient 😄
A few minor notes, mostly things we've covered in the KV/window store PRs already. I'm still going through the tests but things look good overall
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
Show resolved
Hide resolved
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() { | |||
setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); | |||
|
|||
current.close(); | |||
current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); | |||
if (forward) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we're going to need some additional changes in this class similar to what we had in CachingWindowStore. Definitely at least in getNextSegmentIterator()
. Let's make sure to have some cross-segment test coverage here as well, especially because the iteration logic of session store range queries is the hardest to wrap your head around out of all the stores (at least, it is for me)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the reasons these missing pieces are not throwing exceptions seems to be that Caching Store tests are only covering or InMemory or Persistent underlying stores. This means only one path of the caching logic is followed (e.g. persistent() ? iteratorWraper : otherIterator
)
I rename the test classes to be explicit about what backing store is covered. We could probably cover this as another issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh yeah good catch. Kind of weird that we're inconsistent with which underlying store type is used in the existing caching store tests, too. But thanks for adding the in-memory + persistent flavors for the SessionStore, I think it sounds reasonable to file a ticket to follow up on the Window/KV flavors and clean up the caching tests
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
Outdated
Show resolved
Hide resolved
5c5a04b
to
6909a44
Compare
this.keyFrom = keyFrom; | ||
this.keyTo = keyTo; | ||
this.latestSessionStartTime = latestSessionStartTime; | ||
this.lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); | ||
this.segmentInterval = cacheFunction.getSegmentInterval(); | ||
this.forward = forward; | ||
|
||
this.currentSegmentId = cacheFunction.segmentId(earliestSessionEndTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I think that for the reverse case, this should be initialized to cacheFunction.segmentId(maxObservedTimestamp)
and lastSegmentId
should be initialized to this (segmentId(earliestSessionEndTime)
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right! great catch! forgot to align this with WindowStore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few more minor things, should definitely be able to merge this by feature freeze 👍
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
Outdated
Show resolved
Hide resolved
public KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it probably doesn't matter since we presumably aren't using these backward methods of the ReadOnlySessionStoreStub, but it seems like it might result in some tricky NPEs to debug if ever someone does try to use it in a test. If you don't feel like implementing it I think it's fine to just throw UnsupportedOperationException and say that you'll have to implement this to use it.
Or just copy the code from the forward direction and flip it 🤷♀️ Same goes for all the methods in here that return null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mixed choice here. hope is good enough
3c8cd51
to
a875cf1
Compare
Retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for the reverse iterator KIP 😄
test this please |
Alright Jenkins is being wonky yet again, but I checked out this branch and ran the tests locally to verify. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @jeqo !
I had a few complaints about formatting and one about the exception message. Since we're down to the limit, I'll go ahead and add a commit addressing these concerns, test it for myself (thanks for your efforts there, @ableegoldman ), and merge it.
Thanks so much for this PR.
default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, | ||
final long earliestSessionEndTime, | ||
final long latestSessionStartTime) { | ||
throw new UnsupportedOperationException("Moved from SessionStore"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't matter to users whether this method was moved from another interface or not. They just need to know why they're getting the exception. I.e., we just need to tell them that the store implementation they selected didn't implement the method.
throw new UnsupportedOperationException("Moved from SessionStore"); | |
throw new UnsupportedOperationException("This API is not supported by this implementation of ReadOnlySessionStore."); |
We should say the exact same thing in all default implementations. Right now, they're a bit inconsistent.
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), | ||
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)) | ||
cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key, earliestSessionEndTime)), | ||
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a lot of unnecessary whitespace changes in this PR. You don't need to back them all out right now, but in the future, please clean up the diff before submitting a PR. These extra changes make it harder to review.
if (forward) entries = nextKeyEntry.getValue().descendingMap().entrySet(); | ||
else entries = nextKeyEntry.getValue().entrySet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code style discourages inline conditionals. It's more maintainable to always use blocks.
Ok, I've pushed a couple of commits addressing my feedback. I absolutely share the desire to clean up bad formatting in the codebase, but when PRs are this extensive, I'd suggest eliminating absolutely all changes that aren't directly related to the change. The extra whitespace changes, etc., just add noise that makes it harder for reviewers to do their job. Plus, it increases the probability of merge conflicts. I'd encourage sending a separate PR applying formatting changes or other style fixes. This is what I do myself. Anyway, I rolled back the "extra" stuff. I also applied a couple of extra formatting changes to make the new code itself comply with the style guidelines. And it looks like Jenkins finally woke up, so I'll let it go ahead and finish running. |
Oh, ironically, there is a merge conflict with trunk, and it's due to one of the only "extra" changes I left in: renaming CachingStoreTest to CachingStoreTest and in the Session variant, making I didn't un-rename that class, but I did restore |
Jenkins is running again, so I'll let it finish and merge in the morning. |
Ok, it's merged. I'm waiting on permission from Bill before cherry-picking to 2.7 |
Implements KIP-617 for `SessionStore` Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]>
cherry-picked to 2.7. Thanks @jeqo ! |
Implements KIP-617 for `SessionStore` Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]>
* Updating trunk versions after cutting branch for 2.7 * KAFKA-9929: Support backward iterator on SessionStore (apache#9139) Implements KIP-617 for `SessionStore` Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]> * MINOR: remove unused scala files from core module (apache#9296) Reviewers: Mickael Maison <[email protected]>, Lee Dongjin <[email protected]> * MINOR: correct package of LinuxIoMetricsCollector (apache#9271) Reviewers: Mickael Maison <[email protected]>, Lee Dongjin <[email protected]> * KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (apache#9393) In this PR, I have addressed the review comments from @chia7712 in apache#9001 which were provided after apache#9001 was merged. The changes are made mainly to KafkaAdminClient: Improve error message in updateFeatures api when feature name is empty. Propagate top-level error message in updateFeatures api. Add an empty-parameter variety for describeFeatures api. Minor documentation updates to @param and @return to make these resemble other apis. Reviewers: Chia-Ping Tsai [email protected], Jun Rao [email protected] * KAFKA-10271: Performance regression while fetching a key from a single partition (apache#9020) StreamThreadStateStoreProvider excessive loop over calling internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation to the caller, especially when store has many partitions. Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]> Co-authored-by: Jorge Esteban Quilcate Otoya <[email protected]> Co-authored-by: Chia-Ping Tsai <[email protected]> Co-authored-by: Kowshik Prakasam <[email protected]> Co-authored-by: Dima Reznik <[email protected]>
…apache#11337) In apache#9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order. Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
…apache#11337) In apache#9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order. Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
Depends on #9138
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)