Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track cancellable tasks by parent ID #76186

Merged

Conversation

DaveCTurner
Copy link
Contributor

Today when cancelling a task with its descendants we perform a linear
scan through all the tasks looking for the few that have the right
parent ID. With potentially hundreds of thousands of tasks this takes
quite some time, particularly if there are many tasks to cancel.

This commit introduces a second map that tracks the tasks by their
parent ID so that it's super-cheap to find the descendants that need to
be cancelled.

Closes #75316

Today when cancelling a task with its descendants we perform a linear
scan through all the tasks looking for the few that have the right
parent ID. With potentially hundreds of thousands of tasks this takes
quite some time, particularly if there are many tasks to cancel.

This commit introduces a second map that tracks the tasks by their
parent ID so that it's super-cheap to find the descendants that need to
be cancelled.

Closes elastic#75316
@DaveCTurner DaveCTurner added >bug :Distributed Coordination/Task Management Issues for anything around the Tasks API - both persistent and node level. v7.14.1 v7.15.0 v8.0.0-alpha1 labels Aug 5, 2021
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. label Aug 5, 2021
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner
Copy link
Contributor Author

My biggest concern with this right now is the can_match phase of a search which I believe will make a child task of the same parent for every shard on a node. That might be hundreds of shards, and updating the new map is O(N^2) in the number of children. It might be ok since these tasks are handled directly on the transport worker thread which naturally limits the concurrency (to 6 by default) but that seems pretty implicit. It'd be awfully nice if we batched these up at the node level instead tho.

@DaveCTurner
Copy link
Contributor Author

DaveCTurner commented Aug 7, 2021

I wrote a simple benchmark:

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.benchmark.tasks;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.CancellableTasksTracker;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;

@Warmup(iterations = 5)
@Measurement(iterations = 7)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
@Fork(value = 1)
public class CancellableTasksTrackerBenchmark {

    @Param({"30000", "3000", "300", "30", "3"})
    private int parentCount;

    @Param({"1000", "100", "10", "1"})
    private int childCountPerParent;

    private List<BenchmarkTask> benchmarkTasks;
    private CancellableTasksTracker<String> cancellableTasksTracker;
    private TaskId targetParent;

    @Setup
    public void createTasks() {
        final int taskCount = parentCount * childCountPerParent;
        benchmarkTasks = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            benchmarkTasks.add(new BenchmarkTask(i % parentCount, i / parentCount, i));
        }
        Randomness.shuffle(benchmarkTasks);
        cancellableTasksTracker = new CancellableTasksTracker<>(String.class, String[]::new);
        targetParent = new TaskId("node", parentCount / 2);
    }

    @Benchmark
    public long trackTasks() {
        for (BenchmarkTask benchmarkTask : benchmarkTasks) {
            cancellableTasksTracker.put(benchmarkTask.task, benchmarkTask.payload);
        }

        final long count = cancellableTasksTracker.getByParent(targetParent).count();

        if (count != childCountPerParent) {
            throw new AssertionError("wrong count: " + count);
        }

        for (BenchmarkTask benchmarkTask : benchmarkTasks) {
            cancellableTasksTracker.remove(benchmarkTask.task);
        }

        return count;
    }

    private static class BenchmarkTask {
        final int parentIndex;
        final int childIndex;
        final Task task;
        final String payload;

        public BenchmarkTask(int parentIndex, int childIndex, long taskId) {
            this.parentIndex = parentIndex;
            this.childIndex = childIndex;
            this.task = new CancellableTask(taskId, "test", "action", "", new TaskId("node", parentIndex), emptyMap());
            this.payload = "task-" + taskId + "-child-" + childIndex + "-parent-" + parentIndex;
        }
    }
}

These times track adding and then removing all the tasks; even with 30000 parent tasks each with 1000 child tasks the mean overhead per task is ~2.2µs.

(childCountPerParent)  (parentCount)  Mode  Cnt            Score            Error  Units  Mean overhead
                 1000          30000  avgt    7  67980271397.429 ± 4994967681.178  ns/op        2266 ns
                 1000           3000  avgt    7   4569715417.524 ± 2111582392.913  ns/op        1523 ns
                 1000            300  avgt    7    246249754.634 ±    2054156.379  ns/op         820 ns
                 1000             30  avgt    7     21136582.188 ±    4286645.660  ns/op         704 ns
                 1000              3  avgt    7      1602687.020 ±      67016.293  ns/op         534 ns
                  100          30000  avgt    7   3136513434.357 ±   92213951.212  ns/op        1045 ns
                  100           3000  avgt    7    120281883.393 ±    4909672.536  ns/op         400 ns
                  100            300  avgt    7      8515642.306 ±     442334.003  ns/op         283 ns
                  100             30  avgt    7       587387.451 ±      27483.477  ns/op         195 ns
                  100              3  avgt    7        47650.830 ±        587.080  ns/op         158 ns
                   10          30000  avgt    7    142939788.670 ±    9858893.675  ns/op         476 ns
                   10           3000  avgt    7      5624404.751 ±     386408.517  ns/op         187 ns
                   10            300  avgt    7       425179.307 ±      13136.734  ns/op         141 ns
                   10             30  avgt    7        37709.674 ±       2988.665  ns/op         125 ns
                   10              3  avgt    7         3535.776 ±        173.179  ns/op         117 ns
                    1          30000  avgt    7      4452360.723 ±     494622.243  ns/op         148 ns
                    1           3000  avgt    7       391214.089 ±      18580.306  ns/op         130 ns
                    1            300  avgt    7        46769.278 ±      16795.350  ns/op         155 ns
                    1             30  avgt    7         3648.374 ±        468.714  ns/op         121 ns
                    1              3  avgt    7          385.357 ±         30.390  ns/op         128 ns

@DaveCTurner
Copy link
Contributor Author

I reverted the "optimisation" for the one-item case because the same benchmark indicates it's actually slightly slower, even in the runs when there's only one child task per parent.

(childCountPerParent)  (parentCount)  Mode  Cnt            Score            Error  Units  Mean overhead
                 1000          30000  avgt    7  67134033515.714 ± 5033933634.050  ns/op        2238 ns
                 1000           3000  avgt    7   4236755545.619 ±   61234713.992  ns/op        1412 ns
                 1000            300  avgt    7    232844652.583 ±    4661204.334  ns/op         776 ns
                 1000             30  avgt    7     18675622.451 ±     560322.362  ns/op         623 ns
                 1000              3  avgt    7      1530250.913 ±      14278.994  ns/op         510 ns
                  100          30000  avgt    7   2956281942.714 ±   62127970.363  ns/op         985 ns
                  100           3000  avgt    7    104351606.728 ±    1645461.830  ns/op         348 ns
                  100            300  avgt    7      7655082.583 ±     103660.312  ns/op         255 ns
                  100             30  avgt    7       521039.197 ±      18967.814  ns/op         174 ns
                  100              3  avgt    7        43946.254 ±        266.907  ns/op         146 ns
                   10          30000  avgt    7    145160690.114 ±    3963524.892  ns/op         484 ns
                   10           3000  avgt    7      5117603.806 ±     187243.301  ns/op         171 ns
                   10            300  avgt    7       421472.832 ±      11077.558  ns/op         140 ns
                   10             30  avgt    7        36384.819 ±        814.380  ns/op         121 ns
                   10              3  avgt    7         3518.711 ±        177.121  ns/op         117 ns
                    1          30000  avgt    7      4274559.294 ±      70630.509  ns/op         142 ns
                    1           3000  avgt    7       389427.491 ±       8577.194  ns/op         130 ns
                    1            300  avgt    7        36697.487 ±        666.284  ns/op         122 ns
                    1             30  avgt    7         3421.890 ±        103.902  ns/op         114 ns
                    1              3  avgt    7          366.648 ±          3.457  ns/op         122 ns

@DaveCTurner
Copy link
Contributor Author

DaveCTurner commented Aug 8, 2021

The version in 76ed889 (== 03cf0a9) is a slight improvement at larger child counts and ~no difference otherwise.

(childCountPerParent)  (parentCount)  Mode  Cnt            Score            Error  Units  Mean overhead
                 1000          30000  avgt    7  57538573339.000 ± 1104612283.253  ns/op        1918 ns
                 1000           3000  avgt    7   4003685036.000 ±  198740962.106  ns/op        1335 ns
                 1000            300  avgt    7    219989120.589 ±    8365899.700  ns/op         733 ns
                 1000             30  avgt    7     16874024.911 ±     193951.882  ns/op         562 ns
                 1000              3  avgt    7      1327812.381 ±      80334.396  ns/op         443 ns
                  100          30000  avgt    7   2852655991.393 ±   57674477.573  ns/op         951 ns
                  100           3000  avgt    7    119850366.740 ±    1403569.814  ns/op         400 ns
                  100            300  avgt    7      7668485.509 ±     136576.566  ns/op         256 ns
                  100             30  avgt    7       472136.967 ±      13905.206  ns/op         157 ns
                  100              3  avgt    7        37112.812 ±        177.624  ns/op         124 ns
                   10          30000  avgt    7    140966921.858 ±    1561421.755  ns/op         470 ns
                   10           3000  avgt    7      5151352.980 ±     280335.160  ns/op         172 ns
                   10            300  avgt    7       397245.162 ±       8335.522  ns/op         132 ns
                   10             30  avgt    7        34949.180 ±        800.483  ns/op         116 ns
                   10              3  avgt    7         3360.763 ±        178.012  ns/op         112 ns
                    1          30000  avgt    7      4236124.666 ±     139988.026  ns/op         141 ns
                    1           3000  avgt    7       415290.801 ±       4607.121  ns/op         138 ns
                    1            300  avgt    7        37496.980 ±        983.645  ns/op         125 ns
                    1             30  avgt    7         3541.204 ±        295.871  ns/op         118 ns
                    1              3  avgt    7          378.129 ±          8.648  ns/op         126 ns

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great. Left a few comments around testing.

}

private final Map<Long, T> byTaskId = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final Map<TaskId, T[]> byParentTaskId = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend TaskAssertions.assertAllCancellableTasksAreCancelled and perhaps even ESIntegTestCase to verify that the two maps are in sync after tests have completed (at least that the byParentId map only contains items that are also in the byTaskId map).

We could consider doing it in an assertion too but doing it concurrently while running might be difficult?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I don't think we can express any true invariants very easily; I added an eventually-true assertion in 9554f12.

tracker.put(task, item);
state.incrementAndGet();

state.incrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not entirely sure we would ever see state==2 in other threads, I would think it would be ok for the JVM to optimize it away.

This is possibly even true for the entire actionThread since there is no synchronization with the watch threads here. The watch threads may only see the the final output. I would advocate a yield should help, but that is outside JMM I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, an earlier iteration had some assertions in between these calls. In theory you're right that it would be legitimate for the compiler to collapse these tests to something fairly trivial, but in practice they do cover many interleavings and the other threads do indeed observe state==2 cases. Just try adding bugs :) I'll add a yield anyway.

watchThread = new Thread(() -> {
awaitStart.run();

for (int i = 0; i < 10; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not always run at least until state.get() ==4 (and then another round)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that this sometimes noticeably added to the test runtime, we spent ages doing this loop before actually doing the useful work. Again, in practice this covers what we care about.

task.join();
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also check the more trivial case where no parent id is set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I misunderstand, I think we already do?

                    rarely() ? TaskId.EMPTY_TASK_ID : randomFrom(parentTaskIds),

private final Map<TaskId, T[]> byParentTaskId = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

/**
* Add an item for the given task. Should only be called once for each task.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a comment that items must be unique per task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ although I'm not sure that's true, I believe we handle duplicates ok?

*/
public class CancellableTasksTracker<T> {

private final T[] empty;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can empty be a constant? It doesn't seems to modified anywhere and it's used in Arrays.copyOf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again I didn't find a way to do so, but you're welcome to show me how :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this have to be a generic, T == CancellableTaskHolder is the only use case we have isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah in production, I didn't want to make CancellableTaskHolder public just for the tests and I don't think it makes a performance difference. I could be persuaded.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes a performance difference. I could be persuaded.

Nah there isn't any difference I think lets leave it as is then IMO :)

Copy link
Member

@original-brownbear original-brownbear left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (dropping the generic may be a nit but I can see how it's more fun to have it for tests)

* Return a collection of all the tracked items. May be large. In the presence of concurrent calls to {@link #put} and {@link #remove}
* it behaves similarly to {@link ConcurrentHashMap#values()}.
*/
public Iterable<T> values() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: might just as well return Collection here and simplify the copying of the result in the tests (the doc talks about a collection anyways :)).

*/
public class CancellableTasksTracker<T> {

private final T[] empty;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this have to be a generic, T == CancellableTaskHolder is the only use case we have isn't it?

@DaveCTurner DaveCTurner merged commit 56f33ce into elastic:master Aug 9, 2021
@DaveCTurner DaveCTurner deleted the 2021-08-05-track-tasks-by-parent branch August 9, 2021 15:11
DaveCTurner added a commit that referenced this pull request Aug 9, 2021
Today when cancelling a task with its descendants we perform a linear
scan through all the tasks looking for the few that have the right
parent ID. With potentially hundreds of thousands of tasks this takes
quite some time, particularly if there are many tasks to cancel.

This commit introduces a second map that tracks the tasks by their
parent ID so that it's super-cheap to find the descendants that need to
be cancelled.

Closes #75316
DaveCTurner added a commit that referenced this pull request Aug 9, 2021
Today when cancelling a task with its descendants we perform a linear
scan through all the tasks looking for the few that have the right
parent ID. With potentially hundreds of thousands of tasks this takes
quite some time, particularly if there are many tasks to cancel.

This commit introduces a second map that tracks the tasks by their
parent ID so that it's super-cheap to find the descendants that need to
be cancelled.

Closes #75316
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>bug :Distributed Coordination/Task Management Issues for anything around the Tasks API - both persistent and node level. Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. v7.14.1 v7.15.0 v8.0.0-alpha2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Performance regression starting from 7.8.0 when too many search requests closes before getting result
7 participants