-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9929: Support reverse iterator on KeyValueStore #9137
Conversation
This PR is ready for review, covering related feedback from #8976 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for splitting this up! Mostly minor comments on the KVStore PR, some general notes about the Streams style conventions:
- always use braces for
if
/else
/etc and put each condition on its own line - We're trying to move away from using
@Test(expected)
in the tests in favor ofassertThrows
. A lot of the tests haven't been migrated over yet, but just a heads up
* Order is not guaranteed as bytes lexicographical ordering might not represent key order. | ||
* | ||
* @param from The first key that could be in the range, where iteration ends. | ||
* @param to The last key that could be in the range, where iteration starts from. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems a bit tricky to say that to is the variable where iteration starts from 😉 But I can see it both ways, so being clear in the javadocs is good enough for me
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
Outdated
Show resolved
Hide resolved
iter.seek(from.get()); | ||
rawLastKey = to.get(); | ||
} | ||
if (rawLastKey == null) { | ||
throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: RawToKey
--> RawLastKey
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also it should be from
for the reverse case, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right. I've moved this into each condition to set a correct exception message.
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
Outdated
Show resolved
Hide resolved
...c/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
Outdated
Show resolved
Hide resolved
...c/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
Outdated
Show resolved
Hide resolved
b4c44eb
to
3a6db89
Compare
@ableegoldman this should be ready for another review :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did a quick survey of everywhere that implements range()
and all()
to see if there might be anywhere else we need to add reverseRange
and reverseAll
. These guys seem to be missing their reverse counterparts:
KeyValueStoreReadWriteDecorator
KeyValueToTimestampedKeyValueByteStoreAdapter
Segment
(this one is not strictly necessary until the WindowStore PR though)
The fact that you've had to add these to about 25 different classes already really highlights the mess of the store hierarchy 😬
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
Outdated
Show resolved
Hide resolved
@@ -374,7 +412,7 @@ public Bytes peekNextKey() { | |||
if (next == null) { | |||
return allDone(); | |||
} else { | |||
if (comparator.compare(next.key.get(), upperBoundKey) <= 0) { | |||
if (comparator.compare(next.key.get(), lastKey) <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to branch on reverse
here too, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of unrelated, but WDYT about renaming RocksDBDualCFIterator
to RocksDBDualCFAllIterator
or something on the side? I feel like these iterators could be cleaned up a bit in general to be more understandable -- for example, it's weird that we do the iterator#seek
-ing in the actual all()
method but for range queries we do the seeking inside the iterator constructor.
Just thinking out loud though, we can do some followup refactoring once this is merged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. I will continue the current approoach and create an issue to follow up this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
Show resolved
Hide resolved
@@ -188,7 +188,55 @@ public void testPutGetRange() { | |||
} | |||
|
|||
@Test | |||
public void testPutGetRangeWithDefaultSerdes() { | |||
public void testPutGetReverseRange() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add some tests to RocksDBTimestampedStoreTest
. I thought it would extend AbstractKeyValueStoreTest
and thus benefit from everything you added here, but doesn't seem to be the case :/
Personally I found the RocksDBDualCFIterator
logic a bit difficult to follow even before the reverse iteration, so it would be nice to have some tests specifically covering reverse iterators over multi-column-family timestamped stores
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized that, I also thought that path was tested. Good catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ableegoldman I have extended RocksDBTimestampedStoreTest
to use reverseAll
and reverseRange
as part of the current tests.
Unfortunately, AbstractKeyValueStoreTest
tests do not fit with the creation path of Timestamped stores as pre inserted data is required.
Will add this to the same JIRA ticket to consider when refactoring iterators and tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that makes sense I guess. Looks like the new additions to RocksDBTimestampedStoreTest
cover the cross-column family code path so that's good enough for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! Things LGTM now, I'll ping a committer to give this another pass and get it merged.
Let me know when the WindowStore PR is ready and I'll start reviewing the next one 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @jeqo ! I made a partial pass today. It looks really good overall.
A high-level comment: in several places in this code, we have a internal boolean flag to represent the direction. It seems odd that in these places, true = reverse
. It seems like the "natural" iteration order is forward, so we should have blocks like if (forward) /*forward logic*/ else /*reverse logic*/
. But what we have here is if (not reverse) /*forward logic*/ else /*reverse logic*/
. It's not exactly hard to understand, but still it seems to be creating unnecessary hoops for people to jump through when they are reading the code.
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " | ||
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + | ||
"Note that the built-in numerical serdes do not follow this for negative numbers"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This warning seems to miss the most likely scenario, that the user just passed the arguments in the wrong order.
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @jeqo , I've completed my review. It looks good to me! Thanks so much for the KIP and PR.
Please let me know what you want to do about my prior comments, if anything :)
@@ -58,7 +62,7 @@ public synchronized boolean hasNext() { | |||
return allDone(); | |||
} else { | |||
next = getKeyValue(); | |||
iter.next(); | |||
advanceIterator.accept(iter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clever!
218c678
to
b124cb7
Compare
@vvcephei thanks for your feedback! Inverting flag actually make things more readable. Changing to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jeqo !
It looks like java 14 only failed while attempting to download some docs from Oracle, and java 8 failed on this unrelated test: |
Implements KIP-617 on WindowStore that depends on #9137. Testing strategy: extend existing tests to validate reverse operations are supported. Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
Implements KIP-617 on KeyValueStore.
Testing strategy: extend existing tests to validate reverse operations are supported.
Committer Checklist (excluded from commit message)