Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7277: Migrate Streams API to Duration instead of longMs times #5682

Merged
merged 21 commits into from
Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0293466
KAFKA-7277: Migration to Duration and Instant in public API.
nizhikov Sep 20, 2018
6724276
Merge branch 'trunk' into KAFKA-7277
nizhikov Sep 22, 2018
9730461
KAFKA-7277: Call in tests and examples are changed to Duration versions.
nizhikov Sep 22, 2018
c7b3c9a
KAFKA-7277: Call in tests and examples are changed to Duration versions.
nizhikov Sep 23, 2018
c4f2335
KAFKA-7277: Tests fixed. All deprecated methods removed from tests, n…
nizhikov Sep 24, 2018
a6d47d4
KAFKA-7277: All new method executed via deprecated.
nizhikov Sep 25, 2018
b44aed9
KAFKA-7277: Code review imrpovements
nizhikov Sep 25, 2018
52c5a84
Merge branch 'trunk' into KAFKA-7277
nizhikov Sep 25, 2018
4e42ed4
KAFKA-7277: Fixed compilation after merge with trunk.
nizhikov Sep 25, 2018
e5b9e5f
KAFKA-7277: Code review fixes:
nizhikov Sep 26, 2018
527f6fe
KAFKA-7277: Code review fixes:
nizhikov Sep 27, 2018
6b25162
Merge branch 'trunk' into KAFKA-7277
nizhikov Sep 29, 2018
d63b729
KAFKA-7277: Code review fixes:
nizhikov Sep 29, 2018
8d40a40
Merge branch 'trunk' into KAFKA-7277
nizhikov Oct 1, 2018
5dbe795
KAFKA-7277: Code review fixes:
nizhikov Oct 1, 2018
518a914
Merge branch 'trunk' into KAFKA-7277
nizhikov Oct 2, 2018
6eb48e3
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-7277
nizhikov Oct 3, 2018
f241d3a
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-7277
nizhikov Oct 4, 2018
4e8b65f
KAFKA-7277: Duration -> Instant in ReadOnlyWindowStore.
nizhikov Oct 4, 2018
a583259
KAFKA-7277: Duration -> Instant in ReadOnlyWindowStore. Tests passed.
nizhikov Oct 4, 2018
5210f9f
KAFKA-7277: Javadoc fixed.
nizhikov Oct 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
Expand Down Expand Up @@ -207,7 +207,7 @@ public static void main(final String[] args) {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>()))
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
.count()
.toStream()
.map((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -87,7 +88,7 @@ public static void main(final String[] args) throws Exception {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.temperature;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -30,7 +31,6 @@

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Demonstrates, using the high-level KStream DSL, how to implement an IoT demo application
Expand Down Expand Up @@ -88,7 +88,7 @@ public static void main(final String[] args) {
// to group and reduce them, a key is needed ("temp" has been chosen)
.selectKey((key, value) -> "temp")
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
.reduce((value1, value2) -> {
if (Integer.parseInt(value1) > Integer.parseInt(value2))
return value1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.wordcount;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
Expand Down Expand Up @@ -63,7 +64,7 @@ public Processor<String, String> get() {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
this.context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME, timestamp -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to add a static import for ofMillis and STREAMS_TIME to shorten this line.

try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");

Expand Down
66 changes: 66 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/ApiUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a public package and thus would make this class public API -- don't think we want this (also not part of the KIP). Maybe, we should create a new package org.apache.kafka.streams.internals and move it there?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved


import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

/**
*/
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
public final class ApiUtils {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering, what we gain by introducing this class?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @mjsax

As discussed in KIP thread we want to wrap NPE and ArithmeticException into IllegalArgumentException

See @vvcephei suggestion: "I still feel that surfacing the ArithmeticException directly would not be a great experience, so I still advocate for wrapping it in an IllegalArgumentException that explains our upper bound for Duration is "max-long number of milliseconds"

private ApiUtils() {
}

/**
* Validates that milliseconds from duration {@code d} can be retrieved.
* @param d Duration to check
* @param name Name of params for an error message.
*/
public static void validateMillisecondDuration(final Duration d, final String name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid single-letter variables: d -> duration
(also below)

try {
Objects.requireNonNull(d);

//noinspection ResultOfMethodCallIgnored
d.toMillis();
} catch (final NullPointerException e) {
throw new IllegalArgumentException(name + " shouldn't be null.", e);
} catch (final ArithmeticException e) {
throw new IllegalArgumentException(name + " can't be converted to milliseconds. " + d +
" is negative or too big", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validation doesn't actually check that it's non-negative. I'm not sure that we want to, though.
Also, I'd use Objects.toString() on name just in case it's null.
Finally, it might be nice to wrap the substituted variables with brackets to make the result more readable.
All together, this would yield:

  • "[" + Objects.toString(name) + "] shouldn't be null.", e);
  • "[" + Objects.toString(name) + "] is too big to be converted to milliseconds: [" + d + "]", e);

What do you think about this?
(also below)

}
}

/**
* Validates that milliseconds from instant {@code i} can be retrieved.
* @param i Instant to check
* @param name Name of params for an error message.
*/
public static void validateMillisecondInstant(final Instant i, final String name) {
try {
Objects.requireNonNull(i);

//noinspection ResultOfMethodCallIgnored
i.toEpochMilli();
} catch (final NullPointerException e) {
throw new IllegalArgumentException(name + " shouldn't be null.", e);
} catch (final ArithmeticException e) {
throw new IllegalArgumentException(name + " can't be converted to milliseconds. " + i +
" is negative or too big", e);
}
}
}
24 changes: 21 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;

import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -813,7 +814,7 @@ public void run() {
* This will block until all threads have stopped.
*/
public void close() {
close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
close(Duration.ofSeconds(DEFAULT_CLOSE_TIMEOUT));
}

/**
Expand All @@ -826,9 +827,26 @@ public void close() {
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @deprecated Use {@link #close(Duration)} instead
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
return close(Duration.ofMillis(timeUnit.toMillis(timeout)));
}

/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
* @param timeout how long to wait for the threads to shutdown
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@param timeout} is negative or too big
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeout.toMillis());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use validateMillisecondDuration here.


if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
Expand Down Expand Up @@ -885,7 +903,7 @@ public void run() {
shutdownThread.start();
}

if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
if (waitOnState(State.NOT_RUNNING, timeout.toMillis())) {
log.info("Streams client stopped completely");
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.ApiUtils;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.time.Duration;
Expand Down Expand Up @@ -109,11 +110,27 @@ private JoinWindows(final long beforeMs,
* the timestamp of the record from the primary stream.
*
* @param timeDifferenceMs join window interval in milliseconds
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative or too big
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is actually guaranteed not to throw this exception. Since the parameter is a long of milliseconds, it's not possible to pass a value that is not expressible in milliseconds as a long.
(also applies elsewhere)

* @deprecated Use {@link #of(Duration)} instead.
*/
@Deprecated
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
// This is a static factory method, so we initialize grace and retention to the defaults.
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, null, DEFAULT_RETENTION_MS);
return of(Duration.ofMillis(timeDifferenceMs));
}

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
* @param timeDifference join window interval
* @throws IllegalArgumentException if {@code timeDifference} is negative or too big
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned before, we don't throw IAE if timeDifference is negative.
(also below)

*/
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");

return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), null, DEFAULT_RETENTION_MS);
}

/**
Expand All @@ -124,11 +141,30 @@ public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgument
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window start time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @throws IllegalArgumentException if the resulting window size is negative or too big
* @deprecated Use {@link #before(Duration)} instead.
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@Deprecated
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, afterMs, grace, maintainDurationMs, segments);
return before(Duration.ofMillis(timeDifferenceMs));
}

/**
* Changes the start window boundary to {@code timeDifference} but keep the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifference} earlier than the timestamp of the record from the primary stream.
* {@code timeDifference} can be negative but it's absolute value must not be larger than current window "after"
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
* value (which would result in a negative window size).
*
* @param timeDifference relative window start time
* @throws IllegalArgumentException if the resulting window size is negative or too big
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");

return new JoinWindows(timeDifference.toMillis(), afterMs, grace, maintainDurationMs, segments);
}

/**
Expand All @@ -139,11 +175,30 @@ public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentExc
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window end time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @throws IllegalArgumentException if the resulting window size is negative or too big
* @deprecated Use {@link #after(Duration)} instead
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@Deprecated
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(beforeMs, timeDifferenceMs, grace, maintainDurationMs, segments);
return after(Duration.ofMillis(timeDifferenceMs));
}

/**
* Changes the end window boundary to {@code timeDifference} but keep the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifference} later than the timestamp of the record from the primary stream.
* {@code timeDifference} can be negative but it's absolute value must not be larger than current window "before"
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
* value (which would result in a negative window size).
*
* @param timeDifference relative window end time
* @throws IllegalArgumentException if the resulting window size is negative or too big
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");

return new JoinWindows(beforeMs, timeDifference.toMillis(), grace, maintainDurationMs, segments);
}

/**
Expand All @@ -163,20 +218,19 @@ public long size() {
}

/**
* Reject late events that arrive more than {@code millisAfterWindowEnd}
* Reject late events that arrive more than {@code afterWindowEnd}
* after the end of its window.
*
* Lateness is defined as (stream_time - record_timestamp).
*
* @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
* @param afterWindowEnd The grace period to admit late-arriving events to a window.
* @return this updated builder
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows grace(final long millisAfterWindowEnd) {
if (millisAfterWindowEnd < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new JoinWindows(beforeMs, afterMs, Duration.ofMillis(millisAfterWindowEnd), maintainDurationMs, segments);
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");

return new JoinWindows(beforeMs, afterMs, afterWindowEnd, maintainDurationMs, segments);
}

@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
Expand All @@ -192,7 +246,7 @@ public long gracePeriodMs() {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
* @deprecated since 2.1. Use {@link JoinWindows#grace(long)} instead.
* @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead.
*/
@SuppressWarnings("deprecation")
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -243,12 +244,12 @@ public Materialized<K, V, S> withCachingDisabled() {
* from window-start through window-end, and for the entire grace period.
*
* @return itself
* @throws IllegalArgumentException if retention is negative or too big
*/
public Materialized<K, V, S> withRetention(final long retentionMs) {
if (retentionMs < 0) {
throw new IllegalArgumentException("Retention must not be negative.");
}
retention = Duration.ofMillis(retentionMs);
public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(retention, "retention");

this.retention = retention;
return this;
}
}
Loading