-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-7277: Migrate Streams API to Duration instead of longMs times #5682
Conversation
Hello, @vvcephei |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @nizhikov ,
Thanks for the PR!
I've made a quick pass and left some comments that I think will apply to basically all of the methods, so I'll stop here.
At a high level, this is looking really good.
Please let me know if you have questions/objections to my feedback.
Thanks,
-John
throw new IllegalArgumentException(name + " shouldn't be null.", e); | ||
} catch (final ArithmeticException e) { | ||
throw new IllegalArgumentException(name + " can't be converted to milliseconds. " + d + | ||
" is negative or too big", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This validation doesn't actually check that it's non-negative. I'm not sure that we want to, though.
Also, I'd use Objects.toString()
on name
just in case it's null.
Finally, it might be nice to wrap the substituted variables with brackets to make the result more readable.
All together, this would yield:
"[" + Objects.toString(name) + "] shouldn't be null.", e);
"[" + Objects.toString(name) + "] is too big to be converted to milliseconds: [" + d + "]", e);
What do you think about this?
(also below)
* @param d Duration to check | ||
* @param name Name of params for an error message. | ||
*/ | ||
public static void validateMillisecondDuration(final Duration d, final String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid single-letter variables: d
-> duration
(also below)
...ams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
Show resolved
Hide resolved
@@ -63,7 +64,7 @@ | |||
@SuppressWarnings("unchecked") | |||
public void init(final ProcessorContext context) { | |||
this.context = context; | |||
this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> { | |||
this.context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME, timestamp -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to add a static import for ofMillis
and STREAMS_TIME
to shorten this line.
* @throws IllegalArgumentException if {@param timeout} is negative or too big | ||
*/ | ||
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException { | ||
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeout.toMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use validateMillisecondDuration
here.
* the timestamp of the record from the primary stream. | ||
* | ||
* @param timeDifference join window interval | ||
* @throws IllegalArgumentException if {@code timeDifference} is negative or too big |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned before, we don't throw IAE
if timeDifference
is negative.
(also below)
@@ -109,11 +110,27 @@ private JoinWindows(final long beforeMs, | |||
* the timestamp of the record from the primary stream. | |||
* | |||
* @param timeDifferenceMs join window interval in milliseconds | |||
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative | |||
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative or too big |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is actually guaranteed not to throw this exception. Since the parameter is a long
of milliseconds, it's not possible to pass a value that is not expressible in milliseconds as a long
.
(also applies elsewhere)
@vvcephei I refactor new methods. So they now call deprecated methods. |
I'll take a look tomorrow. Thanks @nizhikov . |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.streams; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a public package and thus would make this class public API -- don't think we want this (also not part of the KIP). Maybe, we should create a new package org.apache.kafka.streams.internals
and move it there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved
|
||
/** | ||
*/ | ||
public final class ApiUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering, what we gain by introducing this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @mjsax
As discussed in KIP thread we want to wrap NPE and ArithmeticException
into IllegalArgumentException
See @vvcephei suggestion: "I still feel that surfacing the ArithmeticException directly would not be a great experience, so I still advocate for wrapping it in an IllegalArgumentException that explains our upper bound for Duration is "max-long number of milliseconds"
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Outdated
Show resolved
Hide resolved
* ApiUtils moved to internals package. * JavaDoc fixes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @nizhikov ,
I made another pass over this.
Mostly minor comments, with the exception of the one on streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Thanks again,
-John
streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
* Typos fixed. * Error messages improved. * Wrong new method removed * fetch, fetchAll added to WindowStore
Hello @vvcephei. |
Failure unrelated. Retest this, please. |
1 similar comment
Failure unrelated. Retest this, please. |
@vvcephei @mjsax I've checked failed tests locally and it all succeed.
|
Failure unrelated. Retest this, please. |
1 similar comment
Failure unrelated. Retest this, please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @nizhikov ,
I have just one more request (which applies multiple times); otherwise, it looks good to me!
@@ -179,6 +182,7 @@ public synchronized void put(final Bytes key, final byte[] value, final long win | |||
} | |||
|
|||
@Override | |||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These don't need to be marked deprecated, thanks to the non-deprecated definition in WindowStore
now.
This comment applies to all the implementations of WindowStore
in this PR.
* Unneeded @deprecated removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
Thanks @nizhikov
@mjsax Tests passed. What else should be done for merge? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @nizhikov for the patch. I left some general comments but overall LGTM.
/** | ||
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the | ||
* threads to join. | ||
* A {@code timeout} of 0 means to wait forever. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I'm ok with the semantics.
But my first instinct of a timeout of 0
implies shutdown immediately with no wait and blocking forever takes a value of Long.MAX_VALUE
.
In short, I'm +1 as well on revising the behavior.
@@ -361,7 +364,7 @@ public boolean conditionMet() { | |||
final int index = metadata.hostInfo().port(); | |||
final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); | |||
final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); | |||
return store != null && store.fetch(key, from, to) != null; | |||
return store != null && store.fetch(key, ofEpochMilli(from), ofMillis(to - from)) != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me, I have a slight preference for the from
- to
approach, but that's due more to personal preference.
I agree with @vvcephei that either way it's the same semantics just expressed differently.
So overall I'd prefer Instant
, Instant
, but if others want Instant
, Duration
I'm good with that as well.
I've resolved merge conflicts. |
Failure unrelated. Retest this, please. |
* @throws NullPointerException If {@code null} is used for any key. | ||
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} | ||
*/ | ||
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Duration duration) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just talked to @guozhangwang @bbejeck and @vvcephei -- it seems, we are in favor to go with Instant/Instant
types here and also for fetchAll()
. 2.1
branch is not cut, and thus we can still get this into 2.1 -- can you update the PR accordingly? (Sorry for the long delay, but we were quite busy with many other deadline related things...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated (KIP-358)[https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times] and PR according to your proposal.
Please, review.
@@ -361,7 +364,7 @@ public boolean conditionMet() { | |||
final int index = metadata.hostInfo().port(); | |||
final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); | |||
final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); | |||
return store != null && store.fetch(key, from, to) != null; | |||
return store != null && store.fetch(key, ofEpochMilli(from), ofMillis(to - from)) != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just synced up with the others -- seems we are in favor to switch to Instant/Instant
and try to get it into 2.1
(see my other comment).
Hey @nizhikov , I apologise that I didn't think of this earlier, but can you provide default implementations of the new fetch methods in the WindowStore interface? The default implementations would just defer to the long/long methods. This would mean that any implementations would just automatically work with the new APIs. In fact, you could then pull the implementations out of all our window store implementations. Does that make sense? (I'm tempted not to bring it up at this late stage, but I think it would be very nice for store implementers not to have to add the trivial delegate methods.) |
@vvcephei Do you mean we need to add default implementations into |
No, I mean in |
But we have classes that only extends |
Because we deprecate However, I am not even sure how much we gain adding any default implementation. I am not against it, however, only people who implement their own |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick change @nizhikov. The changes LGTM
Sounds good to me. |
Cool. I'll wait until Jenkins passes (and go over the changes -- my initial "skip over review" looked good) and than merge, if @guozhangwang does not raise any concern. |
Many thanks for this feature (and your patience) @nizhikov ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @nizhikov !
@mjsax Tests passed. |
…pache#5682) Reviewers: Johne Roesler <[email protected]>, Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
Public API changed according to KIP-358
Committer Checklist (excluded from commit message)