KAFKA-15326: [7/N] Processing thread non-busy waiting (apache#14180)
Avoid busy waiting for processable tasks. We need to be careful here not to let the task executors sleep while work is available: we have to signal on the condition variable any time a task becomes "processable". Here are the situations in which a task becomes processable:

- Task is unassigned from another TaskExecutor.
- Task state is changed (this should only happen while the task is locked, inside the polling phase).
- When tasks are unlocked.
- When tasks are added.
- New records available.
- A task is resumed.

In summary:

- We should probably lock tasks when they are paused and unlock them when they are resumed, and we should wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR); DefaultTaskManager.signalProcessableTasks is added here for that purpose.
- We need to wake the task executors in DefaultTaskManager.unassignTask, DefaultTaskManager.unlockTasks and DefaultTaskManager.add (a minimal sketch of the resulting await/signal pattern follows below).
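
A minimal, self-contained sketch of the guarded await/signal pattern described above. ProcessableTaskGate and its boolean flag are illustrative only; the real implementation in DefaultTaskManager checks for unassigned, unlocked, processable tasks under its lock instead of a flag.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: a boolean flag stands in for the real check over unassigned,
// unlocked, processable tasks that DefaultTaskManager performs under its lock.
class ProcessableTaskGate {
    private final Lock lock = new ReentrantLock();
    private final Condition tasksCondition = lock.newCondition();
    private boolean processableTaskAvailable = false;

    void awaitProcessableTasks() throws InterruptedException {
        lock.lock();
        try {
            // Check under the lock first, so a signal sent just before we block is not lost.
            if (!processableTaskAvailable) {
                tasksCondition.await(); // the caller's outer event loop re-checks after waking up
            }
        } finally {
            lock.unlock();
        }
    }

    // Called whenever a task may have become processable: unassigned, unlocked,
    // added, resumed, or new records arrived.
    void signalProcessableTasks() {
        lock.lock();
        try {
            processableTaskAvailable = true;
            tasksCondition.signalAll(); // wake every blocked task executor
        } finally {
            lock.unlock();
        }
    }
}

The key point is that every mutation that can make a task processable signals the condition while holding the same lock, so a task executor can never miss a wake-up between its check and its call to await().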


Reviewers: Walker Carlson <[email protected]>, Bruno Cadonna <[email protected]>
lucasbru authored and AnatolyPopov committed Feb 16, 2024
1 parent a4fd1f4 commit 552bccf
Showing 6 changed files with 201 additions and 11 deletions.
6 changes: 6 additions & 0 deletions gradle/spotbugs-exclude.xml
@@ -530,4 +530,10 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="DMI_RANDOM_USED_ONLY_ONCE"/>
</Match>

<Match>
<!-- Suppress a warning about await not being in a loop - we expect the loop to be outside the method. -->
<Class name="org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>

</FindBugsFilter>
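
For context on the suppression above: the await call in awaitProcessableTasks is not wrapped in a local while loop because the re-check happens in the task executor's outer event loop. A rough sketch of that caller shape, with illustrative names only; the real loop lives in DefaultTaskExecutor.runOnce, shown in the next file.

import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;

// Illustrative caller shape only; DefaultTaskExecutor's event loop plays this role.
class ExecutorLoopSketch {
    void runLoop(final TaskManager taskManager, final TaskExecutor executor) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            final StreamTask task = taskManager.assignNextTask(executor);
            if (task == null) {
                // May return without a task being available (spurious or racy wake-up);
                // the surrounding while loop simply re-checks via assignNextTask().
                taskManager.awaitProcessableTasks();
            } else {
                // process the assigned task, then loop again
            }
        }
    }
}
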
DefaultTaskExecutor.java
@@ -94,7 +94,13 @@ private void runOnce(final long nowMs) {
currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this);
}

if (currentTask != null) {
if (currentTask == null) {
try {
taskManager.awaitProcessableTasks();
} catch (final InterruptedException ignored) {
// Can be ignored, the cause of the interruption will be handled in the event loop
}
} else {
boolean progressed = false;

if (taskExecutionMetadata.canProcessTask(currentTask, nowMs) && currentTask.isProcessable(nowMs)) {
DefaultTaskManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;

import java.util.concurrent.locks.Condition;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
@@ -56,6 +57,7 @@ public class DefaultTaskManager implements TaskManager {
private final TasksRegistry tasks;

private final Lock tasksLock = new ReentrantLock();
private final Condition tasksCondition = tasksLock.newCondition();
private final List<TaskId> lockedTasks = new ArrayList<>();
private final Map<TaskId, StreamsException> uncaughtExceptions = new HashMap<>();
private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();
@@ -108,7 +110,7 @@ public StreamTask assignNextTask(final TaskExecutor executor) {

assignedTasks.put(task.id(), executor);

log.info("Assigned {} to executor {}", task.id(), executor.name());
log.debug("Assigned task {} to executor {}", task.id(), executor.name());

return (StreamTask) task;
}
@@ -118,6 +120,41 @@ public StreamTask assignNextTask(final TaskExecutor executor) {
});
}

@Override
public void awaitProcessableTasks() throws InterruptedException {
final boolean interrupted = returnWithTasksLocked(() -> {
for (final Task task : tasks.activeTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds())
) {
log.debug("Await unblocked: returning early from await since a processable task {} was found", task.id());
return false;
}
}
try {
log.debug("Await blocking");
tasksCondition.await();
} catch (final InterruptedException ignored) {
// we interrupt the thread for shutdown and pause.
// we can ignore this exception.
log.debug("Await unblocked: Interrupted while waiting for processable tasks");
return true;
}
log.debug("Await unblocked: Woken up to check for processable tasks");
return false;
});

if (interrupted) {
throw new InterruptedException();
}
}

public void signalProcessableTasks() {
log.debug("Waking up task executors");
executeWithTasksLocked(tasksCondition::signalAll);
}

@Override
public void unassignTask(final StreamTask task, final TaskExecutor executor) {
executeWithTasksLocked(() -> {
@@ -132,7 +169,8 @@ public void unassignTask(final StreamTask task, final TaskExecutor executor) {

assignedTasks.remove(task.id());

log.info("Unassigned {} from executor {}", task.id(), executor.name());
log.debug("Unassigned {} from executor {}", task.id(), executor.name());
tasksCondition.signalAll();
});
}

@@ -188,7 +226,11 @@ public KafkaFuture<Void> lockAllTasks() {

@Override
public void unlockTasks(final Set<TaskId> taskIds) {
executeWithTasksLocked(() -> lockedTasks.removeAll(taskIds));
executeWithTasksLocked(() -> {
lockedTasks.removeAll(taskIds);
log.debug("Waking up task executors");
tasksCondition.signalAll();
});
}

@Override
@@ -202,6 +244,8 @@ public void add(final Set<StreamTask> tasksToAdd) {
for (final StreamTask task : tasksToAdd) {
tasks.addTask(task);
}
log.debug("Waking up task executors");
tasksCondition.signalAll();
});

log.info("Added tasks {} to the task manager to process", tasksToAdd);
@@ -261,12 +305,11 @@ public Map<TaskId, StreamsException> drainUncaughtExceptions() {
return result;
});

log.info("Drained {} uncaught exceptions", returnValue.size());
log.debug("Drained {} uncaught exceptions", returnValue.size());

return returnValue;
}


private void executeWithTasksLocked(final Runnable action) {
tasksLock.lock();
try {
TaskManager.java
@@ -72,8 +72,6 @@ public interface TaskManager {

/**
* Unlock all of the managed active tasks from the task manager. Similar to {@link #unlockTasks(Set)}.
*
* This method does not block, instead a future is returned.
*/
void unlockAllTasks();

@@ -121,4 +119,14 @@ public interface TaskManager {
*/
Map<TaskId, StreamsException> drainUncaughtExceptions();

/**
* Signals that at least one task has become processable, e.g. because it was resumed or new records may be available.
*/
void signalProcessableTasks();

/**
* Blocks until unassigned processable tasks may be available.
*/
void awaitProcessableTasks() throws InterruptedException;

}
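
The signalProcessableTasks hook on this interface is what the commit message earmarks for the StreamThread integration, which is deferred to a separate PR. A purely hypothetical sketch of how a polling phase might use it; PollingPhaseSketch and pollPhase are illustrative names, not existing Kafka Streams code.

import org.apache.kafka.streams.processor.internals.tasks.TaskManager;

// Hypothetical sketch only: the actual StreamThread integration is a separate PR.
class PollingPhaseSketch {
    private final TaskManager taskManager;

    PollingPhaseSketch(final TaskManager taskManager) {
        this.taskManager = taskManager;
    }

    void pollPhase() {
        // ... poll the consumer and hand any new records to the active tasks ...

        // New records may have made tasks processable, so wake every task executor
        // that is currently blocked in awaitProcessableTasks().
        taskManager.signalProcessableTasks();
    }
}
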
DefaultTaskExecutorTest.java
@@ -85,6 +85,16 @@ public void shouldShutdownTaskExecutor() {
assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
}

@Test
public void shouldAwaitProcessableTasksIfNoneAssignable() throws InterruptedException {
assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
when(taskManager.assignNextTask(taskExecutor)).thenReturn(null);

taskExecutor.start();

verify(taskManager, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks();
}

@Test
public void shouldUnassignTaskWhenNotProgressing() {
when(task.isProcessable(anyLong())).thenReturn(false);
DefaultTaskManagerTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
@@ -26,6 +29,7 @@
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.apache.kafka.test.StreamsTestUtils.TaskBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@@ -37,20 +41,25 @@
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DefaultTaskManagerTest {

private final static long VERIFICATION_TIMEOUT = 15000;

private final Time time = new MockTime(1L);
private final StreamTask task = mock(StreamTask.class);
private final TaskId taskId = new TaskId(0, 0, "A");
private final StreamTask task = TaskBuilder.statelessTask(taskId).build();
private final TasksRegistry tasks = mock(TasksRegistry.class);
private final TaskExecutor taskExecutor = mock(TaskExecutor.class);
private final StreamsException exception = mock(StreamsException.class);
@@ -70,9 +79,9 @@ private Properties configProps() {

@BeforeEach
public void setUp() {
when(task.id()).thenReturn(new TaskId(0, 0, "A"));
when(task.isProcessable(anyLong())).thenReturn(true);
when(task.isActive()).thenReturn(true);
when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
when(tasks.task(taskId)).thenReturn(task);
}

@Test
@@ -94,6 +103,114 @@ public void shouldAssignTaskThatCanBeProcessed() {
assertNull(taskManager.assignNextTask(taskExecutor));
}

private class AwaitingRunnable implements Runnable {
private final CountDownLatch awaitDone = new CountDownLatch(1);
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
@Override
public void run() {
while (!shutdownRequested.get()) {
try {
taskManager.awaitProcessableTasks();
} catch (final InterruptedException ignored) {
}
awaitDone.countDown();
}
}

public void shutdown() {
shutdownRequested.set(true);
taskManager.signalProcessableTasks();
}
}

@Test
public void shouldBlockOnAwait() throws InterruptedException {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();

assertFalse(awaitingRunnable.awaitDone.await(100, TimeUnit.MILLISECONDS));

awaitingRunnable.shutdown();
}

@Test
public void shouldReturnFromAwaitOnInterruption() throws InterruptedException {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

awaitingThread.interrupt();

assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));

awaitingRunnable.shutdown();
}

@Test
public void shouldReturnFromAwaitOnSignalProcessableTasks() throws InterruptedException {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

taskManager.signalProcessableTasks();

assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));

awaitingRunnable.shutdown();
}

@Test
public void shouldReturnFromAwaitOnUnassignment() throws InterruptedException {
taskManager.add(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);

final StreamTask task = taskManager.assignNextTask(taskExecutor);
assertNotNull(task);
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

taskManager.unassignTask(task, taskExecutor);

assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));

awaitingRunnable.shutdown();
}

@Test
public void shouldReturnFromAwaitOnAdding() throws InterruptedException {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

taskManager.add(Collections.singleton(task));

assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));

awaitingRunnable.shutdown();
}

@Test
public void shouldReturnFromAwaitOnUnlocking() throws InterruptedException {
taskManager.add(Collections.singleton(task));
taskManager.lockTasks(Collections.singleton(task.id()));
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
verify(tasks, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();

taskManager.unlockAllTasks();

assertTrue(awaitingRunnable.awaitDone.await(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS));

awaitingRunnable.shutdown();
}

@Test
public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
taskManager.add(Collections.singleton(task));
