Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7277: Migrate Streams API to Duration instead of longMs times #5682

Merged
merged 21 commits into from
Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0293466
KAFKA-7277: Migration to Duration and Instant in public API.
nizhikov Sep 20, 2018
6724276
Merge branch 'trunk' into KAFKA-7277
nizhikov Sep 22, 2018
9730461
KAFKA-7277: Call in tests and examples are changed to Duration versions.
nizhikov Sep 22, 2018
c7b3c9a
KAFKA-7277: Call in tests and examples are changed to Duration versions.
nizhikov Sep 23, 2018
c4f2335
KAFKA-7277: Tests fixed. All deprecated methods removed from tests, n…
nizhikov Sep 24, 2018
a6d47d4
KAFKA-7277: All new method executed via deprecated.
nizhikov Sep 25, 2018
b44aed9
KAFKA-7277: Code review imrpovements
nizhikov Sep 25, 2018
52c5a84
Merge branch 'trunk' into KAFKA-7277
nizhikov Sep 25, 2018
4e42ed4
KAFKA-7277: Fixed compilation after merge with trunk.
nizhikov Sep 25, 2018
e5b9e5f
KAFKA-7277: Code review fixes:
nizhikov Sep 26, 2018
527f6fe
KAFKA-7277: Code review fixes:
nizhikov Sep 27, 2018
6b25162
Merge branch 'trunk' into KAFKA-7277
nizhikov Sep 29, 2018
d63b729
KAFKA-7277: Code review fixes:
nizhikov Sep 29, 2018
8d40a40
Merge branch 'trunk' into KAFKA-7277
nizhikov Oct 1, 2018
5dbe795
KAFKA-7277: Code review fixes:
nizhikov Oct 1, 2018
518a914
Merge branch 'trunk' into KAFKA-7277
nizhikov Oct 2, 2018
6eb48e3
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-7277
nizhikov Oct 3, 2018
f241d3a
Merge branch 'trunk' of github.com:apache/kafka into KAFKA-7277
nizhikov Oct 4, 2018
4e8b65f
KAFKA-7277: Duration -> Instant in ReadOnlyWindowStore.
nizhikov Oct 4, 2018
a583259
KAFKA-7277: Duration -> Instant in ReadOnlyWindowStore. Tests passed.
nizhikov Oct 4, 2018
5210f9f
KAFKA-7277: Javadoc fixed.
nizhikov Oct 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
Expand Down Expand Up @@ -207,7 +207,7 @@ public static void main(final String[] args) {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>()))
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
.count()
.toStream()
.map((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -87,7 +88,7 @@ public static void main(final String[] args) throws Exception {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.temperature;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -30,7 +31,6 @@

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Demonstrates, using the high-level KStream DSL, how to implement an IoT demo application
Expand Down Expand Up @@ -88,7 +88,7 @@ public static void main(final String[] args) {
// to group and reduce them, a key is needed ("temp" has been chosen)
.selectKey((key, value) -> "temp")
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
.reduce((value1, value2) -> {
if (Integer.parseInt(value1) > Integer.parseInt(value2))
return value1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.wordcount;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
Expand Down Expand Up @@ -63,7 +64,7 @@ public Processor<String, String> get() {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");

Expand Down
20 changes: 20 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;

import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -36,6 +37,7 @@
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
Expand Down Expand Up @@ -826,7 +828,9 @@ public void close() {
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @deprecated Use {@link #close(Duration)} instead
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));

Expand Down Expand Up @@ -894,6 +898,22 @@ public void run() {
}
}

/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* A {@code timeout} of 0 means to wait forever.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang @bbejeck @vvcephei Should we change this to "zero or negative" ? Comparing the implementation, passing in a negative timestamp will "expire" the timeout even without checking the state transition at all and return immediately (with false) even if the state transition was successful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with those semantics, maybe we can make a Jira.

Thinking about it more, though, wouldn't it make more sense to:

  • reject negative numbers
  • make 0 just signal and return immediately (after checking the state once)
  • if I want to wait "forever", I can use ofYears(1) or ofMillis(Long.MAX_VALUE) or some other intuitively "long enough to be forever" value instead of a magic value.

Regardless, I agree the current behavior is a little weird, and I'd be in favor of a Jira/KIP to revise it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I'm ok with the semantics.

But my first instinct of a timeout of 0 implies shutdown immediately with no wait and blocking forever takes a value of Long.MAX_VALUE.

In short, I'm +1 as well on revising the behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we should fix this in 2.1 -- I am suggestion this, because we could use the new semantics for close(Duration) only and stay with old semantics for close(long, TimeUnit) -- if we don't so this, it would be an backward incompatible change and thus we could only do it in 3.0.0..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nizhikov Are you willing for address 7477 in this PR? If not, it's also fine and we do a follow up PR. However, it should be part of the KIP description. Could you update the KIP accordingly?

Copy link
Collaborator Author

@nizhikov nizhikov Oct 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax I'll take care of KAFKA-7477 in follow up PR.

KIP-358 updated. Please, see, "Proposed Changes" section.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you start working on KAFKA-7477 already @nizhikov ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can understand thus fix is trivial.
I'm planning to provide PR in a 24 hour after this PR finish. Is it OK?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't call it trivial, but sure, this sound good! Thanks a lot. Just want to make sure we get it on time to not miss code freeze deadline. Thanks a lot!

*
* @param timeout how long to wait for the threads to shutdown
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(State, State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeout, "timeout");
return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

/**
* Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
* data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

public final class ApiUtils {
private ApiUtils() {
}

/**
* Validates that milliseconds from {@code duration} can be retrieved.
* @param duration Duration to check.
* @param name Name of params for an error message.
* @return Milliseconds from {@code duration}.
*/
public static long validateMillisecondDuration(final Duration duration, final String name) {
try {
if (duration == null)
throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't be null.");

return duration.toMillis();
} catch (final ArithmeticException e) {
throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
}
}

/**
* Validates that milliseconds from {@code instant} can be retrieved.
* @param instant Instant to check.
* @param name Name of params for an error message.
* @return Milliseconds from {@code instant}.
*/
public static long validateMillisecondInstant(final Instant instant, final String name) {
try {
if (instant == null)
throw new IllegalArgumentException("[" + name + "] shouldn't be null.");

return instant.toEpochMilli();
} catch (final ArithmeticException e) {
throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.time.Duration;
Expand Down Expand Up @@ -110,42 +111,93 @@ private JoinWindows(final long beforeMs,
*
* @param timeDifferenceMs join window interval in milliseconds
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative
* @deprecated Use {@link #of(Duration)} instead.
*/
@Deprecated
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
// This is a static factory method, so we initialize grace and retention to the defaults.
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, null, DEFAULT_RETENTION_MS);
}

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
* @param timeDifference join window interval
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
*/
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
return of(timeDifference.toMillis());
}

/**
* Changes the start window boundary to {@code timeDifferenceMs} but keep the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} earlier than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but it's absolute value must not be larger than current window "after"
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than current window "after"
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window start time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #before(Duration)} instead.
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@Deprecated
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, afterMs, grace, maintainDurationMs, segments);
}

/**
* Changes the start window boundary to {@code timeDifference} but keep the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifference} earlier than the timestamp of the record from the primary stream.
* {@code timeDifference} can be negative but its absolute value must not be larger than current window "after"
* value (which would result in a negative window size).
*
* @param timeDifference relative window start time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
return before(timeDifference.toMillis());
}

/**
* Changes the end window boundary to {@code timeDifferenceMs} but keep the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} later than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but it's absolute value must not be larger than current window "before"
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than current window "before"
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window end time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #after(Duration)} instead
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@Deprecated
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(beforeMs, timeDifferenceMs, grace, maintainDurationMs, segments);
}

/**
* Changes the end window boundary to {@code timeDifference} but keep the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifference} later than the timestamp of the record from the primary stream.
* {@code timeDifference} can be negative but its absolute value must not be larger than current window "before"
* value (which would result in a negative window size).
*
* @param timeDifference relative window end time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
return after(timeDifference.toMillis());
}

/**
* Not supported by {@code JoinWindows}.
* Throws {@link UnsupportedOperationException}.
Expand All @@ -163,20 +215,22 @@ public long size() {
}

/**
* Reject late events that arrive more than {@code millisAfterWindowEnd}
* Reject late events that arrive more than {@code afterWindowEnd}
* after the end of its window.
*
* Lateness is defined as (stream_time - record_timestamp).
*
* @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
* @param afterWindowEnd The grace period to admit late-arriving events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows grace(final long millisAfterWindowEnd) {
if (millisAfterWindowEnd < 0) {
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
nizhikov marked this conversation as resolved.
Show resolved Hide resolved
ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
if (afterWindowEnd.toMillis() < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new JoinWindows(beforeMs, afterMs, Duration.ofMillis(millisAfterWindowEnd), maintainDurationMs, segments);
return new JoinWindows(beforeMs, afterMs, afterWindowEnd, maintainDurationMs, segments);
}

@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
Expand All @@ -192,7 +246,7 @@ public long gracePeriodMs() {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
* @deprecated since 2.1. Use {@link JoinWindows#grace(long)} instead.
* @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead.
*/
@SuppressWarnings("deprecation")
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -243,12 +244,14 @@ public Materialized<K, V, S> withCachingDisabled() {
* from window-start through window-end, and for the entire grace period.
*
* @return itself
* @throws IllegalArgumentException if retention is negative or can't be represented as {@code long milliseconds}
*/
public Materialized<K, V, S> withRetention(final long retentionMs) {
if (retentionMs < 0) {
public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(retention, "retention");
if (retention.toMillis() < 0) {
throw new IllegalArgumentException("Retention must not be negative.");
}
retention = Duration.ofMillis(retentionMs);
this.retention = retention;
return this;
}
}
Loading