Skip to content

Commit

Permalink
KAFKA-7277: Migrate Streams API to Duration instead of longMs times (#…
Browse files Browse the repository at this point in the history
…5682)

Reviewers: Johne Roesler <[email protected]>, Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
nizhikov authored and mjsax committed Oct 4, 2018
1 parent 260b07a commit ca641b3
Show file tree
Hide file tree
Showing 100 changed files with 1,311 additions and 536 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation,
Expand Down Expand Up @@ -207,7 +207,7 @@ public static void main(final String[] args) {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
.groupByKey(Serialized.with(Serdes.String(), new JSONSerde<>()))
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(7)).advanceBy(TimeUnit.SECONDS.toMillis(1)))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -87,7 +88,7 @@ public static void main(final String[] args) throws Exception {
})
.map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.windowedBy(TimeWindows.of(7 * 24 * 60 * 60 * 1000L).advanceBy(1000))
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
.count()
.toStream()
.map((key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.temperature;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -30,7 +31,6 @@

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Demonstrates, using the high-level KStream DSL, how to implement an IoT demo application
Expand Down Expand Up @@ -88,7 +88,7 @@ public static void main(final String[] args) {
// to group and reduce them, a key is needed ("temp" has been chosen)
.selectKey((key, value) -> "temp")
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
.windowedBy(TimeWindows.of(Duration.ofSeconds(TEMPERATURE_WINDOW_SIZE)))
.reduce((value1, value2) -> {
if (Integer.parseInt(value1) > Integer.parseInt(value2))
return value1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.examples.wordcount;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
Expand Down Expand Up @@ -63,7 +64,7 @@ public Processor<String, String> get() {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
this.context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");

Expand Down
20 changes: 20 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;

import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -36,6 +37,7 @@
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
Expand Down Expand Up @@ -827,7 +829,9 @@ public void close() {
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @deprecated Use {@link #close(Duration)} instead
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));

Expand Down Expand Up @@ -895,6 +899,22 @@ public void run() {
}
}

/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
* @param timeout how long to wait for the threads to shutdown
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(State, State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeout, "timeout");
return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

/**
* Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
* data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

public final class ApiUtils {
private ApiUtils() {
}

/**
* Validates that milliseconds from {@code duration} can be retrieved.
* @param duration Duration to check.
* @param name Name of params for an error message.
* @return Milliseconds from {@code duration}.
*/
public static long validateMillisecondDuration(final Duration duration, final String name) {
try {
if (duration == null)
throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't be null.");

return duration.toMillis();
} catch (final ArithmeticException e) {
throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
}
}

/**
* Validates that milliseconds from {@code instant} can be retrieved.
* @param instant Instant to check.
* @param name Name of params for an error message.
* @return Milliseconds from {@code instant}.
*/
public static long validateMillisecondInstant(final Instant instant, final String name) {
try {
if (instant == null)
throw new IllegalArgumentException("[" + name + "] shouldn't be null.");

return instant.toEpochMilli();
} catch (final ArithmeticException e) {
throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.time.Duration;
Expand Down Expand Up @@ -110,42 +111,93 @@ private JoinWindows(final long beforeMs,
*
* @param timeDifferenceMs join window interval in milliseconds
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative
* @deprecated Use {@link #of(Duration)} instead.
*/
@Deprecated
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
// This is a static factory method, so we initialize grace and retention to the defaults.
return new JoinWindows(timeDifferenceMs, timeDifferenceMs, null, DEFAULT_RETENTION_MS);
}

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
* i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
* the timestamp of the record from the primary stream.
*
* @param timeDifference join window interval
* @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
*/
public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
return of(timeDifference.toMillis());
}

/**
* Changes the start window boundary to {@code timeDifferenceMs} but keep the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} earlier than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but it's absolute value must not be larger than current window "after"
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than current window "after"
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window start time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #before(Duration)} instead.
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@Deprecated
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, afterMs, grace, maintainDurationMs, segments);
}

/**
* Changes the start window boundary to {@code timeDifference} but keep the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifference} earlier than the timestamp of the record from the primary stream.
* {@code timeDifference} can be negative but its absolute value must not be larger than current window "after"
* value (which would result in a negative window size).
*
* @param timeDifference relative window start time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
return before(timeDifference.toMillis());
}

/**
* Changes the end window boundary to {@code timeDifferenceMs} but keep the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} later than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but it's absolute value must not be larger than current window "before"
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than current window "before"
* value (which would result in a negative window size).
*
* @param timeDifferenceMs relative window end time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
* @deprecated Use {@link #after(Duration)} instead
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
@Deprecated
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(beforeMs, timeDifferenceMs, grace, maintainDurationMs, segments);
}

/**
* Changes the end window boundary to {@code timeDifference} but keep the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifference} later than the timestamp of the record from the primary stream.
* {@code timeDifference} can be negative but its absolute value must not be larger than current window "before"
* value (which would result in a negative window size).
*
* @param timeDifference relative window end time
* @throws IllegalArgumentException if the resulting window size is negative or {@code timeDifference} can't be represented as {@code long milliseconds}
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
return after(timeDifference.toMillis());
}

/**
* Not supported by {@code JoinWindows}.
* Throws {@link UnsupportedOperationException}.
Expand All @@ -163,20 +215,22 @@ public long size() {
}

/**
* Reject late events that arrive more than {@code millisAfterWindowEnd}
* Reject late events that arrive more than {@code afterWindowEnd}
* after the end of its window.
*
* Lateness is defined as (stream_time - record_timestamp).
*
* @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
* @param afterWindowEnd The grace period to admit late-arriving events to a window.
* @return this updated builder
* @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
*/
@SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
public JoinWindows grace(final long millisAfterWindowEnd) {
if (millisAfterWindowEnd < 0) {
public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
if (afterWindowEnd.toMillis() < 0) {
throw new IllegalArgumentException("Grace period must not be negative.");
}
return new JoinWindows(beforeMs, afterMs, Duration.ofMillis(millisAfterWindowEnd), maintainDurationMs, segments);
return new JoinWindows(beforeMs, afterMs, afterWindowEnd, maintainDurationMs, segments);
}

@SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
Expand All @@ -192,7 +246,7 @@ public long gracePeriodMs() {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
* @deprecated since 2.1. Use {@link JoinWindows#grace(long)} instead.
* @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead.
*/
@SuppressWarnings("deprecation")
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -243,12 +244,14 @@ public Materialized<K, V, S> withCachingDisabled() {
* from window-start through window-end, and for the entire grace period.
*
* @return itself
* @throws IllegalArgumentException if retention is negative or can't be represented as {@code long milliseconds}
*/
public Materialized<K, V, S> withRetention(final long retentionMs) {
if (retentionMs < 0) {
public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(retention, "retention");
if (retention.toMillis() < 0) {
throw new IllegalArgumentException("Retention must not be negative.");
}
retention = Duration.ofMillis(retentionMs);
this.retention = retention;
return this;
}
}
Loading

0 comments on commit ca641b3

Please sign in to comment.