From 066af1ef47e00ccc067340a34994fe7ea36d513e Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Fri, 5 Oct 2018 14:43:36 +0300 Subject: [PATCH 01/11] KAFKA-7477: Enhancement of close logic. --- .../org/apache/kafka/streams/KafkaStreams.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 5fb89598507c8..299021f1f0b05 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -219,11 +219,7 @@ private boolean waitOnState(final State targetState, final long waitMs) { long elapsedMs = 0L; while (state != targetState) { if (waitMs == 0) { - try { - stateLock.wait(); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration - } + return false; } else if (waitMs > elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { @@ -824,7 +820,7 @@ public void close() { * threads to join. * A {@code timeout} of 0 means to wait forever. * - * @param timeout how long to wait for the threads to shutdown + * @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately. * @param timeUnit unit of time used for timeout * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached * before all threads stopped @@ -833,7 +829,11 @@ public void close() { */ @Deprecated public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); + final long timeoutMillis = timeUnit.toMillis(timeout); + if (timeoutMillis < 0) + throw new IllegalArgumentException("Timeout can't be negative."); + + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMillis); if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN @@ -890,7 +890,7 @@ public void run() { shutdownThread.start(); } - if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) { + if (waitOnState(State.NOT_RUNNING, timeoutMillis)) { log.info("Streams client stopped completely"); return true; } else { From da86a48ee3ae66d8e7aae2a393aca5b50bbb5868 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Fri, 5 Oct 2018 15:28:49 +0300 Subject: [PATCH 02/11] KAFKA-7477: Enhancement of close logic. --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 299021f1f0b05..8b2d75edb0753 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -126,7 +126,7 @@ public class KafkaStreams { private static final String JMX_PREFIX = "kafka.streams"; - private static final int DEFAULT_CLOSE_TIMEOUT = 0; + private static final int DEFAULT_CLOSE_TIMEOUT = 30; // processId is expected to be unique across JVMs and to be used // in userData of the subscription request to allow assignor be aware From fd65d4f343205b8865b2e0249decc7535c06b78e Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sat, 6 Oct 2018 08:45:52 +0300 Subject: [PATCH 03/11] KAFKA-7477: Code review fixes. --- .../apache/kafka/streams/KafkaStreams.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 8b2d75edb0753..f92b2df628329 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -126,7 +126,7 @@ public class KafkaStreams { private static final String JMX_PREFIX = "kafka.streams"; - private static final int DEFAULT_CLOSE_TIMEOUT = 30; + private static final int DEFAULT_CLOSE_TIMEOUT = 0; // processId is expected to be unique across JVMs and to be used // in userData of the subscription request to allow assignor be aware @@ -213,14 +213,18 @@ public boolean isValidTransition(final State newState) { private final Object stateLock = new Object(); protected volatile State state = State.CREATED; - private boolean waitOnState(final State targetState, final long waitMs) { + private boolean waitOnState(final State targetState, final long waitMs, final boolean newSemantics) { final long begin = time.milliseconds(); synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { - if (waitMs == 0) { - return false; - } else if (waitMs > elapsedMs) { + if (!newSemantics && waitMs == 0) { + try { + stateLock.wait(); + } catch (final InterruptedException e) { + // it is ok: just move on to the next iteration + } + } else if (waitMs >= elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); @@ -829,11 +833,17 @@ public void close() { */ @Deprecated public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - final long timeoutMillis = timeUnit.toMillis(timeout); - if (timeoutMillis < 0) - throw new IllegalArgumentException("Timeout can't be negative."); + return close(timeUnit.toMillis(timeout), false); + } - log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMillis); + /** + * @param timeoutMs how long to wait for the threads to shutdown. + * @param newSemantics If {@code true} new semantics used. + * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached + * before all threads stopped + */ + private synchronized boolean close(final long timeoutMs, final boolean newSemantics) { + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN @@ -890,7 +900,7 @@ public void run() { shutdownThread.start(); } - if (waitOnState(State.NOT_RUNNING, timeoutMillis)) { + if (waitOnState(State.NOT_RUNNING, timeoutMs, newSemantics)) { log.info("Streams client stopped completely"); return true; } else { @@ -912,7 +922,12 @@ public void run() { */ public synchronized boolean close(final Duration timeout) throws IllegalArgumentException { ApiUtils.validateMillisecondDuration(timeout, "timeout"); - return close(timeout.toMillis(), TimeUnit.MILLISECONDS); + + final long timeoutMs = timeout.toMillis(); + if (timeoutMs < 0) + throw new IllegalArgumentException("Timeout can't be negative."); + + return close(timeoutMs, true); } /** From 88dae830bce6266dd9fc6d932ed5a30ce8e1158d Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sat, 6 Oct 2018 09:13:53 +0300 Subject: [PATCH 04/11] KAFKA-7477: Tests added. --- .../kafka/streams/KafkaStreamsTest.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index abc4cb90b7d62..46a3dd73d843d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -548,6 +548,33 @@ public void shouldCleanupOldStateDirs() throws InterruptedException { } } + @Test + public void shouldThrowOnNegativeTimeoutForClose() { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + try { + streams.close(Duration.ofMillis(-1L)); + fail("should not accept negative close parameter"); + } catch (final IllegalArgumentException e) { + // expected + } finally { + streams.close(); + } + } + + @Test + public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L))); + + th.start(); + + try { + th.join(30_000L); + } finally { + streams.close(); + } + } + private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException { final File taskDir = new File(appDir, "0_0"); TestUtils.waitForCondition( From c1cb772277e612941a665d042c35bafebae179c4 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 7 Oct 2018 10:02:54 +0300 Subject: [PATCH 05/11] KAFKA-7477: Fixed for a `waitOnState` method --- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f92b2df628329..a8b628d5b5f55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -218,13 +218,15 @@ private boolean waitOnState(final State targetState, final long waitMs, final bo synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { - if (!newSemantics && waitMs == 0) { + if (newSemantics && waitMs == 0) + return false; + else if (!newSemantics && waitMs == 0) { try { stateLock.wait(); } catch (final InterruptedException e) { // it is ok: just move on to the next iteration } - } else if (waitMs >= elapsedMs) { + } else if (waitMs > elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); From a55748813e6f4d1aa1ae08d00a33e0c9c88e0621 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 7 Oct 2018 10:05:19 +0300 Subject: [PATCH 06/11] KAFKA-7477: Fixed for a `waitOnState` method --- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index a8b628d5b5f55..f44eefdb9c91b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -218,9 +218,10 @@ private boolean waitOnState(final State targetState, final long waitMs, final bo synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { - if (newSemantics && waitMs == 0) - return false; - else if (!newSemantics && waitMs == 0) { + if (waitMs == 0) { + if (newSemantics) + return false; + try { stateLock.wait(); } catch (final InterruptedException e) { From d3685600b6f35d0cfa963ef99f4026ce2aa8db90 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 7 Oct 2018 10:09:13 +0300 Subject: [PATCH 07/11] KAFKA-7477: Fixed for a `waitOnState` method --- .../src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 46a3dd73d843d..b9d542bc9b698 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -570,6 +570,7 @@ public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException { try { th.join(30_000L); + assertFalse(th.isAlive()); } finally { streams.close(); } From fa7a93dc1b368cebd79267e83b10fad509260e91 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 8 Oct 2018 23:27:17 +0300 Subject: [PATCH 08/11] KAFKA-7477: `waitOnState` improvement --- .../apache/kafka/streams/KafkaStreams.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f44eefdb9c91b..1c67b9dbd9082 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -213,21 +213,14 @@ public boolean isValidTransition(final State newState) { private final Object stateLock = new Object(); protected volatile State state = State.CREATED; - private boolean waitOnState(final State targetState, final long waitMs, final boolean newSemantics) { + private boolean waitOnState(final State targetState, final long waitMs) { final long begin = time.milliseconds(); synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { if (waitMs == 0) { - if (newSemantics) - return false; - - try { - stateLock.wait(); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration - } - } else if (waitMs > elapsedMs) { + return false; + } else if (waitMs >= elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); @@ -836,16 +829,23 @@ public void close() { */ @Deprecated public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - return close(timeUnit.toMillis(timeout), false); + long timeoutMs = timeUnit.toMillis(timeout); + + if (timeoutMs < 0) { + timeoutMs = 0; + } else if (timeoutMs == 0) { + timeoutMs = Long.MAX_VALUE; + } + + return close(timeoutMs); } /** * @param timeoutMs how long to wait for the threads to shutdown. - * @param newSemantics If {@code true} new semantics used. * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached * before all threads stopped */ - private synchronized boolean close(final long timeoutMs, final boolean newSemantics) { + private synchronized boolean close(final long timeoutMs) { log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); if (!setState(State.PENDING_SHUTDOWN)) { @@ -903,7 +903,7 @@ public void run() { shutdownThread.start(); } - if (waitOnState(State.NOT_RUNNING, timeoutMs, newSemantics)) { + if (waitOnState(State.NOT_RUNNING, timeoutMs)) { log.info("Streams client stopped completely"); return true; } else { @@ -930,7 +930,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument if (timeoutMs < 0) throw new IllegalArgumentException("Timeout can't be negative."); - return close(timeoutMs, true); + return close(timeoutMs); } /** From c82c5ac9b273f008222addb66d1c646e822a8645 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 8 Oct 2018 23:32:03 +0300 Subject: [PATCH 09/11] KAFKA-7477: `waitOnState` improvement --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 1c67b9dbd9082..764619c519b83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -220,7 +220,7 @@ private boolean waitOnState(final State targetState, final long waitMs) { while (state != targetState) { if (waitMs == 0) { return false; - } else if (waitMs >= elapsedMs) { + } else if (waitMs > elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); From d2a64d538843edddcfa0b01dd036bca384cfe85a Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Tue, 9 Oct 2018 00:35:38 +0300 Subject: [PATCH 10/11] KAFKA-7477: code review fixes. --- .../org/apache/kafka/streams/KafkaStreams.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 764619c519b83..dbfa545cd8373 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -831,6 +831,9 @@ public void close() { public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { long timeoutMs = timeUnit.toMillis(timeout); + log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " + + "Please, consider update your code.", timeoutMs); + if (timeoutMs < 0) { timeoutMs = 0; } else if (timeoutMs == 0) { @@ -840,14 +843,7 @@ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { return close(timeoutMs); } - /** - * @param timeoutMs how long to wait for the threads to shutdown. - * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached - * before all threads stopped - */ - private synchronized boolean close(final long timeoutMs) { - log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); - + private boolean close(final long timeoutMs) { if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped @@ -927,8 +923,11 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument ApiUtils.validateMillisecondDuration(timeout, "timeout"); final long timeoutMs = timeout.toMillis(); - if (timeoutMs < 0) + if (timeoutMs < 0) { throw new IllegalArgumentException("Timeout can't be negative."); + } + + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); return close(timeoutMs); } From d055d3e380704470a874bc477902c3a649298717 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Tue, 9 Oct 2018 00:40:02 +0300 Subject: [PATCH 11/11] KAFKA-7477: code review fixes. --- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index dbfa545cd8373..d419ff5087034 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -218,9 +218,7 @@ private boolean waitOnState(final State targetState, final long waitMs) { synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { - if (waitMs == 0) { - return false; - } else if (waitMs > elapsedMs) { + if (waitMs > elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); @@ -825,7 +823,7 @@ public void close() { * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached * before all threads stopped * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}. - * @deprecated Use {@link #close(Duration)} instead + * @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`. */ @Deprecated public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {