KAFKA-7477: Improve Streams close timeout semantics #5747

Merged
merged 13 commits into from
Oct 9, 2018
28 changes: 23 additions & 5 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -213,12 +213,15 @@ public boolean isValidTransition(final State newState) {
     private final Object stateLock = new Object();
     protected volatile State state = State.CREATED;

-    private boolean waitOnState(final State targetState, final long waitMs) {
+    private boolean waitOnState(final State targetState, final long waitMs, final boolean newSemantics) {
         final long begin = time.milliseconds();
         synchronized (stateLock) {
             long elapsedMs = 0L;
             while (state != targetState) {
                 if (waitMs == 0) {
+                    if (newSemantics)
+                        return false;

Nit: every conditional needs to have braces (per the code style)

Not a nit: I find this logic a little difficult to follow. Contrary to what @mjsax suggested, wouldn't it be pretty straightforward to map the old semantics onto the new ones like this?

  • negative numbers => 0
  • 0 => Long.MAX_VALUE
  • all other arguments stay the same

Then, the old close method could just transform its arguments and call the new method, with no need to have this "new semantics" flag and an early return in the middle of the loop.
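
The mapping proposed above could be sketched roughly as follows. This is only an illustration of the suggestion, not the code that was merged: the class `LegacyTimeoutMapping` and the method `toNewSemantics` are hypothetical names, and the sketch assumes (as the mapping implies) that a negative timeout under the old semantics meant "do not wait".

```java
// Hypothetical helper illustrating the mapping proposed in the review comment.
// Neither the class nor the method name comes from the Kafka code base.
final class LegacyTimeoutMapping {

    private LegacyTimeoutMapping() { }

    /**
     * Translates a timeout given under the old close() semantics
     * (assumed: negative = do not wait, 0 = wait forever) into the new
     * semantics (0 = do not wait, Long.MAX_VALUE = wait effectively forever).
     */
    static long toNewSemantics(final long oldTimeoutMs) {
        if (oldTimeoutMs < 0) {
            return 0L;
        } else if (oldTimeoutMs == 0) {
            return Long.MAX_VALUE;
        } else {
            return oldTimeoutMs;
        }
    }

    public static void main(final String[] args) {
        System.out.println(toNewSemantics(-1L));    // prints 0
        System.out.println(toNewSemantics(0L));     // prints 9223372036854775807
        System.out.println(toNewSemantics(5_000L)); // prints 5000
    }
}
```

With a translation like this, the deprecated overload would only need to convert its argument and delegate, so the callee can work with a single set of timeout rules.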


@vvcephei I like your proposal. Thanks!
I changed the PR accordingly.
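
Once the deprecated overload translates its arguments up front, the wait loop no longer needs a flag: a timeout of 0 falls out of the ordinary elapsed-time check. The sketch below shows that idea on a standalone class. `StateWaiter`, `markDone`, and `waitUntilDone` are hypothetical names, `System.nanoTime()` stands in for the `time` field used in the diff, and giving up on interruption is an assumption of this sketch rather than the behavior of `KafkaStreams`.

```java
// Hypothetical standalone sketch; not the KafkaStreams implementation.
final class StateWaiter {
    private final Object stateLock = new Object();
    private boolean done = false; // guarded by stateLock

    /** Marks the target state as reached and wakes up all waiters. */
    void markDone() {
        synchronized (stateLock) {
            done = true;
            stateLock.notifyAll();
        }
    }

    /**
     * Waits up to waitMs milliseconds for markDone() to be called.
     * waitMs == 0 only checks the current state; Long.MAX_VALUE waits
     * effectively forever.
     */
    boolean waitUntilDone(final long waitMs) {
        final long beginNs = System.nanoTime();
        synchronized (stateLock) {
            long elapsedMs = 0L;
            while (!done) {
                if (waitMs <= elapsedMs) {
                    // Timeout used up; for waitMs == 0 this happens at once.
                    return false;
                }
                try {
                    // The remaining time is always positive here, so this
                    // never turns into an unbounded wait(0).
                    stateLock.wait(waitMs - elapsedMs);
                } catch (final InterruptedException e) {
                    // Assumption of this sketch: restore the flag and give up.
                    Thread.currentThread().interrupt();
                    return false;
                }
                elapsedMs = (System.nanoTime() - beginNs) / 1_000_000L;
            }
            return true;
        }
    }

    public static void main(final String[] args) {
        final StateWaiter waiter = new StateWaiter();
        System.out.println(waiter.waitUntilDone(0L)); // prints false
        waiter.markDone();
        System.out.println(waiter.waitUntilDone(0L)); // prints true
    }
}
```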


                     try {
                         stateLock.wait();
                     } catch (final InterruptedException e) {
@@ -824,7 +827,7 @@ public void close() {
      * threads to join.
      * A {@code timeout} of 0 means to wait forever.
      *
-     * @param timeout how long to wait for the threads to shutdown
+     * @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately.
      * @param timeUnit unit of time used for timeout
      * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached
      * before all threads stopped
@@ -833,7 +836,17 @@ public void close() {
      */
     @Deprecated
     public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
+        return close(timeUnit.toMillis(timeout), false);
+    }
+
+    /**
+     * @param timeoutMs how long to wait for the threads to shutdown.
+     * @param newSemantics If {@code true} new semantics used.
+     * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached
+     * before all threads stopped
+     */
+    private synchronized boolean close(final long timeoutMs, final boolean newSemantics) {
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);

         if (!setState(State.PENDING_SHUTDOWN)) {
             // if transition failed, it means it was either in PENDING_SHUTDOWN
@@ -890,7 +903,7 @@ public void run() {
             shutdownThread.start();
         }

-        if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
+        if (waitOnState(State.NOT_RUNNING, timeoutMs, newSemantics)) {
             log.info("Streams client stopped completely");
             return true;
         } else {
@@ -912,7 +925,12 @@ public void run() {
      */
     public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
         ApiUtils.validateMillisecondDuration(timeout, "timeout");
-        return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        final long timeoutMs = timeout.toMillis();
+        if (timeoutMs < 0)
+            throw new IllegalArgumentException("Timeout can't be negative.");
+
+        return close(timeoutMs, true);
     }

     /**
@@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws InterruptedException {
         }
     }

+    @Test
+    public void shouldThrowOnNegativeTimeoutForClose() {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        try {
+            streams.close(Duration.ofMillis(-1L));
+            fail("should not accept negative close parameter");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));
+
+        th.start();
+
+        try {
+            th.join(30_000L);
+            assertFalse(th.isAlive());
+        } finally {
+            streams.close();
+        }
+    }
+
     private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(