API for stopping streaming tasks early #16310

Merged
13 commits merged on May 14, 2024
docs/api-reference/supervisor-api.md (55 additions, 0 deletions)
@@ -3550,6 +3550,61 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
</details>

### Hand off task groups for a supervisor early

Triggers early handoff for the specified task groups of a supervisor. This is a best-effort API and makes no guarantee that the handoff will actually be executed.

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`

#### Sample request

The following example shows how to hand off task groups `1`, `2`, and `3` for a Kafka supervisor named `social_media`.

<Tabs>

<TabItem value="3" label="cURL">


```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/taskGroups/handoff" \
--header 'Content-Type: application/json' \
--data-raw '["1", "2", "3"]'
```

Contributor (suggested change): replace `--data-raw '["1", "2", "3"]'` with `--data-raw '{"taskGroupIds": [1, 2, 3]}'`.
I think this is the expected format for a REST API. Also, it looks like taskGroupIds are integers, so I don't think quotes are needed around the numbers.

</TabItem>
<TabItem value="4" label="HTTP">


```HTTP
POST /druid/indexer/v1/supervisor/social_media/taskGroups/handoff HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json

[1, 2, 3]
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
  "id": "social_media",
  "taskGroupIds": [
    1,
    2,
    3
  ]
}
```
</details>
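
Not part of the documentation change itself: a minimal Java client sketch of the same call, for reference. It assumes Java 11+ and uses the same `ROUTER_IP:ROUTER_PORT` placeholders as the examples above; the expected status codes are taken from the resource code later in this PR.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HandoffTaskGroupsExample
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder host, as in the docs above.
    String url = "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/taskGroups/handoff";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(url))
        .header("Content-Type", "application/json")
        // Task group ids are integers, matching the HTTP example above.
        .POST(HttpRequest.BodyPublishers.ofString("[1, 2, 3]"))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    // Expect 200 with the supervisor id and submitted task group ids on success,
    // 400 for an empty list, or 404 if the supervisor does not exist.
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```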

### Shut down a supervisor

Shuts down a supervisor. This endpoint is deprecated and will be removed in future releases. Use the equivalent [terminate](#terminate-a-supervisor) endpoint instead.
@@ -137,6 +137,16 @@ public Optional<SupervisorStateManager.State> getSupervisorState(String id)
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
}

public boolean handoffTaskGroupsEarly(String id, List<Integer> taskGroupIds)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
if (supervisor == null || supervisor.lhs == null) {
return false;
}
supervisor.lhs.handoffTaskGroupsEarly(taskGroupIds);
return true;
}

public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
@@ -30,6 +30,7 @@
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -395,6 +396,43 @@ public Response shutdown(@PathParam("id") final String id)
return terminate(id);
}

/**
* This method will immediately try to handoff the list of task group ids for the given supervisor.
* This is a best effort API and makes no guarantees of execution, e.g. if a non-existent task group id
* is passed to it, the API call will still succeed.
*/
@POST
@Path("/{id}/taskGroups/handoff")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response handoffTaskGroups(@PathParam("id") final String id, final List<Integer> taskGroupIds)
{
if (taskGroupIds == null || taskGroupIds.isEmpty()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty"))
.build();

}
return asLeaderWithSupervisorManager(
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
Contributor:
In the success case, why do we return the request parameters back in the response? We might as well just return an empty 200 OK response.

Alternatively, we could return the taskGroupIds which were actually marked for early hand-off. In the case where some of the requested taskGroupIds are non-existent or not actively reading, the returned set of taskGroupIds could differ from the requested one, thus telling the caller which ones will actually be handed off.
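
A minimal sketch of that alternative success path, assuming a hypothetical `handoffTaskGroupsEarlyAndReport` method on `SupervisorManager` that returns the group ids it actually marked. The PR's `handoffTaskGroupsEarly` only returns a boolean; the names here are illustrative.

```java
// Hypothetical success path: echo back only the task group ids the supervisor actually
// marked for early handoff, rather than the raw request parameters.
// handoffTaskGroupsEarlyAndReport() is assumed for this sketch and does not exist in the PR.
final Set<Integer> handedOff = manager.handoffTaskGroupsEarlyAndReport(id, taskGroupIds);
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", handedOff)).build();
```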

} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))
.build();
}
}
catch (NotImplementedException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor [%s] does not support early handoff", id)))
.build();
}
}
);
}

@POST
@Path("/{id}/terminate")
@Produces(MediaType.APPLICATION_JSON)
@@ -203,6 +203,8 @@ public class TaskGroup
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action

Boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly

TaskGroup(
int groupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
@@ -266,6 +268,16 @@ Set<String> taskIds()
return tasks.keySet();
}

void setShutdownEarly()
{
shutdownEarly = true;
}

Boolean getShutdownEarly()
{
return shutdownEarly;
}

Contributor:
Should these methods be renamed to setHandoffEarly() to correspond to the API and the method in SupervisorManager / Supervisor classes?

@VisibleForTesting
public String getBaseSequenceName()
{
@@ -657,6 +669,39 @@ public String getType()
}
}

private class HandoffTaskGroupsNotice implements Notice
{
final List<Integer> taskGroupIds;
private static final String TYPE = "handoff_task_group_notice";

HandoffTaskGroupsNotice(
@Nonnull final List<Integer> taskGroupIds
)
{
this.taskGroupIds = taskGroupIds;
}

@Override
public void handle()
{
for (Integer taskGroupId : taskGroupIds) {
Contributor:
We should add an info log line here saying that we are now going to handoff these task groups early. Otherwise, there is no way to know that such a request was even received by the supervisor.

We can probably also add a log line in SupervisorManager to indicate when the request was received. The log here can be used to identify when the notice is actually handled.
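
A rough sketch of the logging being suggested here; the placements and messages are illustrative, not code from the PR:

```java
// Illustrative only: log lines suggested by this review comment.
// In SupervisorManager.handoffTaskGroupsEarly(), when the request is received:
log.info("Received request to hand off task groups [%s] early for supervisor [%s].", taskGroupIds, id);

// In HandoffTaskGroupsNotice.handle(), once the notice is actually processed:
log.info("Handing off task groups [%s] early.", taskGroupIds);
```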

TaskGroup taskGroup = activelyReadingTaskGroups.getOrDefault(taskGroupId, null);
if (taskGroup == null) {
log.info("Tried to stop task group that wasn't actively reading.");
continue;
}

taskGroup.setShutdownEarly();
}
}

@Override
public String getType()
{
return TYPE;
}
}

protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
@@ -1932,6 +1977,12 @@ private boolean isTaskInPendingCompletionGroups(String taskId)
return false;
}

@Override
public void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
addNotice(new HandoffTaskGroupsNotice(taskGroupIds));
}

private void discoverTasks() throws ExecutionException, InterruptedException
{
int taskCount = 0;
@@ -3143,14 +3194,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

- if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+ if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
@johnImply (May 4, 2024):
This looks like it is still subject to the stopTaskCount limit ... for my use case I would want to be able to force this rollover without any other restrictions. If you have a use case that requires that it be subject to stopTaskCount, then can you add an optional parameter (e.g. "force=true") that will allow us to choose between the modes? Thanks.

Contributor Author:
Can't you just set stopTaskCount=0 to not have that config apply?

@johnImply (May 6, 2024):
Changing stopTaskCount requires a Supervisor change, which defeats the whole purpose ;)

My clusters generally run with stopTaskCount=1, which works very well ... but for high taskCount jobs it is much more likely that one task will be cycling at any given time, which would render this command useless ...

Furthermore, I may have to cycle more than one task, depending on how many of the tasks are running on the node being cycled ... that makes it that much more likely that I will go above the stopTaskCount level.

... unless this command queues up the tasks for cycling, but I didn't think that logic is in the code. So what happens now if you are at the stopTaskCount limit? Is the stop-early command ignored?

Contributor Author (@georgew5656, May 6, 2024):
In this implementation it would respect the stopTaskCount and not stop the task until a "stop slot" is available.

I guess an alternate implementation could be to just short-circuit stopTaskCount and always stop tasks that have been stopped early; let me think about that a bit.

I feel like having a config is too complicated, imo; it should be one behavior or the other.

Contributor Author:
Thinking about this some more, I think it should probably just ignore stopTaskCount since the operator has manually requested the stop.

Contributor:
Please add a comment in the code explaining why we chose to ignore stopTaskCount when shutdownEarly is set to true. I think it makes sense to not respect it in this case, since stopTaskCount was meant to reduce task spikes during normal operation, and shutdownEarly is in response to an API call which is not a "normal" operation.

Contributor:
If the task group is marked for early shutdown, we should log it.
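
For illustration, the optional parameter floated above might have looked like the sketch below. This is hypothetical and not part of this PR, which instead always ignores stopTaskCount for operator-requested handoffs, as the code that follows shows:

```java
// Hypothetical "force" flag on the endpoint; not part of this PR.
@POST
@Path("/{id}/taskGroups/handoff")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response handoffTaskGroups(
    @PathParam("id") final String id,
    @QueryParam("force") @DefaultValue("false") final boolean force,
    final List<Integer> taskGroupIds
)
{
  // An assumed overload that threads `force` down so checkTaskDuration() skips the
  // stopTaskCount check only when explicitly requested; it does not exist in the PR.
  return asLeaderWithSupervisorManager(
      manager -> manager.handoffTaskGroupsEarly(id, taskGroupIds, force)
                 ? Response.ok().build()
                 : Response.status(Response.Status.NOT_FOUND).build()
  );
}
```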

// Stop this task group if it has run longer than the configured duration,
// as long as the pending task groups are less than the configured stop task count.
// If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
if (pendingCompletionTaskGroups.values()
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
- < ioConfig.getMaxAllowedStops()) {
+ < ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
@@ -240,6 +240,27 @@ public void testGetSupervisorStatus()
verifyAll();
}

@Test
public void testHandoffTaskGroupsEarly()
{
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
"id1", new TestSupervisorSpec("id1", supervisor1)
);

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1));

replayAll();

manager.start();

Assert.assertTrue(manager.handoffTaskGroupsEarly("id1", ImmutableList.of(1)));
Assert.assertFalse(manager.handoffTaskGroupsEarly("id2", ImmutableList.of(1)));

verifyAll();
}

@Test
public void testStartAlreadyStarted()
{
@@ -1209,6 +1209,133 @@
Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}

@Test(timeout = 10_000L)
public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, InterruptedException
{
DateTime startTime = DateTimes.nowUtc();
SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
1,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L),
null
)
{
};

EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice (Code scanning / CodeQL, test): Deprecated method or constructor invocation. Invoking SeekableStreamSupervisorSpec.getDataSchema should be avoided because it has been deprecated.
EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
{
@Override
public Duration getEmissionDuration()
{
return new Period("PT2S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

Map<String, Object> context = new HashMap<>();
context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
"id1",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
0,
Collections.singletonMap("0", "10"),
Collections.singletonMap("0", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"0"
);

final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);

Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));

EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();

EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
.andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();

EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
.andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
.anyTimes();

ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, partitionOffset);

EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();

// The task should only be paused/resumed in one of the runInternal commands, after handoffTaskGroupsEarly has been called.
EasyMock.expect(indexTaskClient.resumeAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.once();
EasyMock.expect(indexTaskClient.pauseAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.once();
taskQueue.shutdown("id1", "All tasks in group[%s] failed to transition to publishing state", 0);

replayAll();

SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();
supervisor.runInternal();
supervisor.handoffTaskGroupsEarly(ImmutableList.of(0));

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
supervisor.runInternal();
verifyAll();
}

@Test
public void testEmitBothLag() throws Exception
{