Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API for stopping streaming tasks early #16310

Merged
merged 13 commits into from
May 14, 2024

Conversation

georgew5656
Copy link
Contributor

@georgew5656 georgew5656 commented Apr 18, 2024

Adding a api on the overlord to order a task group to checkpoint and exit early before taskDuration is met. This is helpful when trying to update old middle managers (by ending the existing tasks on them) and also could potentially let druid run streaming tasks indefinitely (since they can do intermediate checkpointing and can always be shutdown manually when needed).

Description

  • Add an API that given a supervisor-id, task-group-id, will tell the tasks in that task group to shutdown the next time the supervisor loop runs.

Fixed the bug ...

Renamed the class ...

Added a forbidden-apis entry ...

I thought integrating this into the checkTaskDuration flow was the best way to to this since it does everything needed during a regular handoff (when taskDuration is exceeded), and the logic we want here is exactly the same as what is done during regular handoff (as opposed to a intermediate handoff or a supervisor pause)

Release note

Add new API for telling streaming task groups to exit early.

Key changed/added classes in this PR
  • SupervisorManager
  • SupervisorResource
  • SeekableStreamSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@@ -3132,7 +3153,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
Copy link

@johnImply johnImply May 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it is still subject to the stopTaskCount limit ... for my use case I would want to be able to force this rollover without any other restrictions. If you have a use case that requires that it be subject to stopTaskCount, then can you add an optional parameter (e.g. "force=true") that will allow us to choose between the modes? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you just set stopTaskCount=0 to not have that config apply?

Copy link

@johnImply johnImply May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing stopTaskCount requires a Supervisor change, which defeats the whole purpose ;)

My clusters generally run with stopTaskCount=1 which works very well ... but for high taskCount jobs it is much more likely that one task will be cycling at any given time, which would render this command useless ...

Furthermore I may have to cycle more than one task, depending on how many of the tasks are running on the node being cycled ... that makes it that much more likely that I will go above the stopTaskCount level.

... unless this command queues up the tasks for cycling, but I didn't think that logic is in the code. So what happens now if you are at the stopTaskCount limit? Is the stop-early command ignored?

Copy link
Contributor Author

@georgew5656 georgew5656 May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this implementation it would respect the stopTaskCount and not stop the task until a "stop slot" is available.

I guess a alternate implementation could be to just short-circuit stopTaskCount and always stop tasks that have been stopped early, let me think about that a bit

i feel like having a config is too complicated imo, it should be one behavior or the other

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about this some more i think it should probably just ignore stopTaskCount since the operator has manually requested the stop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment in the code explaining why we chose to ignore stopTaskCount when shutdownEarly is set to true. I think it makes sense to not respect it in this case, since stopTaskCount was meant to reduce task spikes during normal operation, and shutdownEarly is in response to an API call which is not a "normal" operation.

@georgew5656 georgew5656 changed the title (WIP) Try stopping streaming tasks early API for stopping streaming tasks early May 6, 2024

EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note test

Invoking
SeekableStreamSupervisorSpec.getDataSchema
should be avoided because it has been deprecated.
@@ -395,6 +395,25 @@ public Response shutdown(@PathParam("id") final String id)
return terminate(id);
}

@POST
@Path("/{id}/taskGroups/restart")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Path("/{id}/taskGroups/restart")
@Path("/{id}/taskGroups/handoff")

^ maybe this is a good name to match the function name.

Can you also please add javadocs for this function explaining that it is best effort. I think it is worth calling out that if a user passes in a taskGroup that does not exist, it will not error here.

And some user visible docs here https://github.com/apache/druid/blob/master/docs/api-reference/supervisor-api.md

Comment on lines 98 to 100
default void handoffTaskGroupEarly(int taskGroupId)
{
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
default void handoffTaskGroupEarly(int taskGroupId)
{
}
default void handoffTaskGroupEarly(int taskGroupId) throws DruidException
{
throw DruidException
.forPersona(DruidException.Persona.ADMIN)
.ofCategory(DruidException.Category.UNSUPPORTED)
.build(...);
}

I think the default implementation should indicate in some way that this is not supported. I don't have a strong opinion on if it needs to be via an exception or some other mechanism.

{
return asLeaderWithSupervisorManager(
manager -> {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should handle error case when the supervisor does not support handoffTaskGroupsEarly

We probably also need a null or empty check for taskGroupIds.

@@ -657,6 +669,37 @@ public String getType()
}
}

private class HandoffTaskGroupNotice implements Notice
{
final Integer taskGroupId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final Integer taskGroupId;
final List<Integer> taskGroupId;

I think accepting a List of integers in the notice will make for nicer dev ergonomics

private static final String TYPE = "handoff_task_group_notice";

HandoffTaskGroupNotice(
final Integer taskGroupId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Nonnull since it should be validated elsewhere

Suggested change
final Integer taskGroupId
@Nonnull final List<Integer> taskGroupIds

@@ -3132,7 +3153,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment in the code explaining why we chose to ignore stopTaskCount when shutdownEarly is set to true. I think it makes sense to not respect it in this case, since stopTaskCount was meant to reduce task spikes during normal operation, and shutdownEarly is in response to an API call which is not a "normal" operation.

Copy link
Contributor

@suneet-s suneet-s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Just have one comment about the body of the API.

Also it would be nice to have some sort of integration test for this since this is quite a subtle change. It is ok if that integration test comes in a follow up patch

```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/taskGroups/handoff"
--header 'Content-Type: application/json'
--data-raw '["1", "2", "3"]'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
--data-raw '["1", "2", "3"]'
--data-raw '{"taskGroupIds": [1, 2, 3]}'

I think this is the expected format for a REST API. Also it looks like taskGroupIds are integers, so I don't think quotes are needed around the numbers

georgew5656 and others added 3 commits May 13, 2024 10:21
…blestream/supervisor/SeekableStreamSupervisor.java

Co-authored-by: Suneet Saldanha <[email protected]>
…blestream/supervisor/SeekableStreamSupervisor.java

Co-authored-by: Suneet Saldanha <[email protected]>
@georgew5656 georgew5656 requested a review from suneet-s May 13, 2024 20:49
Copy link
Contributor

@suneet-s suneet-s left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this API 🚀 🚀 🚀

@georgew5656 georgew5656 merged commit c1bf4fe into apache:master May 14, 2024
87 checks passed
Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feature, @georgew5656 ! This would be very useful in upgrade scenarios and for future work of having streaming tasks running perpetually.

Even though this PR has already been merged, I have left some small suggestions.

manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the success case, why do we return the request parameters back in the response? We might as well just return an empty 200 OK response.

Alternatively, we could return the taskGroupIds which were actually marked for early hand-off. In the case where some of the requested taskGroupIds are non-existent or not actively reading, the returned set of taskGroupIds could differ from the requested one, thus telling the caller which ones will actually be handed off.

/** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Why not use UnsupportedOperationException instead?
The error message seems to suggest that this is more of an "unsupported" scenario.

@@ -93,4 +94,10 @@ default Boolean isHealthy()
LagStats computeLagStats();

int getActiveTaskGroupsCount();

/** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some rephrase + Druid javadoc styling

Suggested change
/** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
/**
* Marks the given task groups as ready for segment hand-off irrespective of the task run times.
* In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
* taskGroupIds that are not valid or not actively reading are simply ignored.
*/

Comment on lines +689 to +694
if (taskGroup == null) {
log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
continue;
}

taskGroup.setShutdownEarly();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
if (taskGroup == null) {
log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
continue;
}
taskGroup.setShutdownEarly();
if (taskGroup == null) {
log.info("Tried to stop task group[%d] for supervisor[%s] but it is not actively reading.", taskGroupId, supervisorId);
} else {
taskGroup.setShutdownEarly();
}

@Override
public void handle()
{
for (Integer taskGroupId : taskGroupIds) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add an info log line here saying that we are now going to handoff these task groups early. Otherwise, there is no way to know that such a request was even received by the supervisor.

We can probably also add a log line in SupervisorManager to indicate when the request was received. The log here can be used to identify when the notice is actually handled.

@@ -3143,14 +3194,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task group is marked for early shutdown, we should log it.

@@ -266,6 +268,16 @@ Set<String> taskIds()
return tasks.keySet();
}

void setShutdownEarly()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these methods be renamed to setHandoffEarly() to correspond to the API and the method in SupervisorManager / Supervisor classes?

@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants