Refactor tasks to improve APM support #87917

Merged: 17 commits, Jul 5, 2022

pugnascotia
Contributor

Part of #84369. Split out from #87696. Rework how some work is executed
by creating child tasks for it, so that when traced by APM, it results
in more meaningful parent and child tasks in the UI. It also improves
how Elasticsearch models the work.
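
As a rough illustration of the parent/child relationship described above, here is a minimal sketch. It is not the Elasticsearch `TaskManager` API: `TaskTreeSketch`, `SketchTask`, `Registry`, and the `hypothetical_step_*` action names are all invented for this example. It only shows how registering work under a parent id yields a tree that a tracing UI could render as nested spans.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model; not the Elasticsearch TaskManager API.
final class TaskTreeSketch {

    /** A unit of work; parentId is -1 for a root task. */
    static final class SketchTask {
        final long id;
        final long parentId;
        final String action;

        SketchTask(long id, long parentId, String action) {
            this.id = id;
            this.parentId = parentId;
            this.action = action;
        }
    }

    /** Hands out ids and remembers every registered task. */
    static final class Registry {
        private long nextId = 1;
        private final List<SketchTask> tasks = new ArrayList<>();

        /** Registers a task; pass null as parent to create a root task. */
        SketchTask register(String action, SketchTask parent) {
            long parentId = parent == null ? -1L : parent.id;
            SketchTask task = new SketchTask(nextId++, parentId, action);
            tasks.add(task);
            return task;
        }

        /** Returns the direct children of the given task. */
        List<SketchTask> childrenOf(SketchTask parent) {
            return tasks.stream().filter(t -> t.parentId == parent.id).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        SketchTask root = registry.register("hypothetical_parent", null);
        registry.register("hypothetical_step_a", root);
        registry.register("hypothetical_step_b", root);
        for (SketchTask child : registry.childrenOf(root)) {
            System.out.println(root.action + " -> " + child.action);
        }
    }
}
```

A tracer that maps each task to a span can then use `parentId` to nest child spans under the parent span.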

@pugnascotia pugnascotia added :Distributed Coordination/Task Management Issues for anything around the Tasks API - both persistent and node level. >refactoring v8.4.0 labels Jun 22, 2022
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. label Jun 22, 2022
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@pugnascotia
Contributor Author

@elasticmachine run elasticsearch-ci/part-1 because the failure doesn't reproduce.

@pugnascotia pugnascotia changed the title Preliminary refactoring for APM support Refactor tasks to improve APM support Jun 23, 2022
Contributor

@henningandersen henningandersen left a comment

This looks mostly good. I hope we can fix the constructor order. Also, I would like to see a bit of shallow testing that we actually utilize the task manager/generate child tasks during cluster state publishing and recovery, just to retain the functionality for the future.

@@ -74,6 +78,8 @@ public class MasterService extends AbstractLifecycleComponent {

static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";

public static final String STATE_UPDATE_ACTION_NAME = "internal:cluster/coordination/update_state";
Contributor

This looks like a transport action name. In other places where we register a task, we pick a simpler name with no "internal:" prefix; for instance, for enrich it is just policy_execution. I'd prefer the same here to avoid confusion, for example publish_cluster_state_update.

Contributor Author

But there are also examples in the same form - for example, JoinHelper has 3 action names that all start with internal:cluster/coordination. The internal: prefix will also make it easier to filter out cluster management tasks when capturing traces, because we can filter in and filter out by span name, which here equates to task name.

Contributor

Those three are different: all three are registered as transport actions and used on the wire. A common prefix for all artificial actions would be fine, though we have not had one so far. We could start one now or leave this without one. I'd prefer not to reuse the namespace we already have. I also think filtering on internal: is not going to be useful, since so many actions fall under it.

Contributor

I agree that filtering on prefixes like internal: is probably not helpful, and indeed end-users probably don't know much about the structure of these action names. It would have been good to have all action names following the same structure as the ones for transport actions, but I think we've already crossed that bridge unfortunately. So yes a bare publish_cluster_state_update would be ok with me.
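
To make the prefix-filtering point concrete, here is a small sketch of filtering out names by prefix. It assumes span names equal task names, as stated earlier in this thread; `SpanNameFilterSketch` and `excludePrefix` are hypothetical helpers for illustration, not part of any APM agent or Elasticsearch API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; not part of any APM agent or Elasticsearch API.
final class SpanNameFilterSketch {

    /** Keeps only the names that do NOT start with the given prefix. */
    static List<String> excludePrefix(List<String> names, String prefix) {
        return names.stream().filter(name -> !name.startsWith(prefix)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "internal:cluster/coordination/update_state", // prefixed form from the diff
            "policy_execution",                           // bare name used by enrich
            "publish_cluster_state_update"                // bare name suggested in review
        );
        // Bare names pass straight through an "internal:" filter.
        System.out.println(excludePrefix(names, "internal:"));
    }
}
```

A prefix filter only helps if every relevant name shares the prefix, and only if the prefix does not also cover many unrelated names.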

Contributor Author

I updated the task name, and had a go at adding a test in MasterServiceTests. Please let me know what you think, I hacked it together from other examples.

I looked into adding something similar for recoveries, but I'm a bit lost. I looked at PeerRecoverySourceServiceTests since it's already being changed to create a task to pass in, but the existing test is very narrow in scope and it's not obvious how to probe the task-executing behaviour without lots of new setup code or by opening up method visibility, neither of which is appealing. I'm hoping someone can point somewhere and say "oh, you just need something a bit like that."

...anyone?

Contributor

I had something like IndexRecoveryIT.testOngoingRecoveryAndMasterFailOver in mind, i.e., ensure a recovery, then block it through transport, then check that the task is registered correctly (not doing the master failover part of the test).

I'd be happy to hack something together.

Did not look at your test yet, might be sufficient.

@pugnascotia
Contributor Author

@henningandersen CI is green, can you take another look?

Contributor

@henningandersen henningandersen left a comment

The Node construction part looks good to me. I left a couple more comments.

> Also, I would like to see a bit of shallow testing that we actually utilize the task manager/generate child tasks during cluster state publishing and recovery, just to retain the functionality for the future.

I did not see this added, will you look into that please?

@@ -204,6 +225,28 @@ public TransportService(
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
ConnectionManager connectionManager
Contributor

Ideally we'd get rid of both old constructors, but one of them is widely used in tests. This one, however, looks like it is only used in a couple of tests, and I'd prefer to just use the one below, creating a task manager in the test.

Can we add a comment to the other constructor that accepts "task headers" that it is used in tests only?
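
The constructor arrangement discussed above might look roughly like the following sketch. Everything here is hypothetical (`ConstructorSketch`, `SketchTaskManager`, `SketchService`) and is not the real `TransportService` signature. It assumes the "task headers" constructor builds its own task manager and delegates to the constructor that accepts one.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical classes for illustration; not the real TransportService API.
final class ConstructorSketch {

    static final class SketchTaskManager {
        private final Set<String> headers;

        SketchTaskManager(Set<String> headers) {
            this.headers = Collections.unmodifiableSet(new HashSet<>(headers));
        }

        Set<String> headers() {
            return headers;
        }
    }

    static final class SketchService {
        private final SketchTaskManager taskManager;

        /** Used in tests only: builds a task manager from the given headers. */
        SketchService(Set<String> taskHeaders) {
            this(new SketchTaskManager(taskHeaders));
        }

        /** Preferred constructor: the caller supplies the task manager. */
        SketchService(SketchTaskManager taskManager) {
            this.taskManager = taskManager;
        }

        SketchTaskManager taskManager() {
            return taskManager;
        }
    }

    public static void main(String[] args) {
        SketchTaskManager shared = new SketchTaskManager(Collections.singleton("hypothetical-header"));
        SketchService service = new SketchService(shared);
        System.out.println(service.taskManager() == shared);
    }
}
```

Passing the task manager in explicitly lets a test keep a reference to it and inspect the tasks it registers.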

@henningandersen
Contributor

This class can demonstrate it works for recovery too:

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.indices.recovery;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TaskRecoveryIT extends ESIntegTestCase {

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopy(super.nodePlugins(), TaskRecoveryIT.EngineTestPlugin.class);
    }

    public void testTaskForOngoingRecovery() throws Exception {
        String indexName = "test";
        internalCluster().startMasterOnlyNode();
        String nodeWithPrimary = internalCluster().startDataOnlyNode();
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate(indexName)
                .setSettings(
                    Settings.builder()
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                        .put("index.routing.allocation.include._name", nodeWithPrimary)
                )
        );
        try {
            String nodeWithReplica = internalCluster().startDataOnlyNode();
            assertAcked(
                client().admin()
                    .indices()
                    .prepareUpdateSettings(indexName)
                    .setSettings(
                        Settings.builder()
                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                            .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
                    )
            );
            assertBusy(() -> {
                List<TaskInfo> primaryTasks = client().admin()
                    .cluster()
                    .prepareListTasks(nodeWithPrimary)
                    .setActions(PeerRecoverySourceService.Actions.START_RECOVERY)
                    .get()
                    .getTasks();
                assertThat(primaryTasks.size(), equalTo(1));
                List<TaskInfo> replicaTasks = client().admin()
                    .cluster()
                    .prepareListTasks(nodeWithReplica)
                    .setActions(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG)
                    .get()
                    .getTasks();
                assertThat(replicaTasks.size(), equalTo(1));
                assertThat(replicaTasks.get(0).parentTaskId(), equalTo(primaryTasks.get(0).taskId()));
            });
        } finally {
            StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
                .flatMap(ps -> ps.filterPlugins(EnginePlugin.class).stream())
                .map(EngineTestPlugin.class::cast)
                .forEach(EngineTestPlugin::release);
        }
        ensureGreen(indexName);
    }

    public static class EngineTestPlugin extends Plugin implements EnginePlugin {
        private final CountDownLatch latch = new CountDownLatch(1);

        public EngineTestPlugin() {}

        public void release() {
            latch.countDown();
        }

        @Override
        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
            return Optional.of(config -> new InternalEngine(config) {

                @Override
                public void skipTranslogRecovery() {
                    // Block here until the test calls release(), keeping the recovery in flight.
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }
                    super.skipTranslogRecovery();
                }
            });
        }
    }
}
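
The test above relies on a latch to hold the recovery in flight while the task list is inspected. The same pattern, reduced to plain `java.util.concurrent`, can be sketched as follows. This is a stand-alone illustration with no Elasticsearch classes; `LatchBlockingSketch` and `observedWhileBlocked` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-alone illustration; no Elasticsearch classes involved.
final class LatchBlockingSketch {

    /** Returns true if the work was observed in flight (started, not finished). */
    static boolean observedWhileBlocked() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                started.countDown();
                try {
                    release.await(); // parked here until the test releases it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.set(true);
            });
            if (!started.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker never started");
            }
            // The worker is now blocked, so this is the window for assertions.
            return !finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Always release, even if a check failed, so nothing hangs.
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("observed in flight: " + observedWhileBlocked());
    }
}
```

Releasing the latch in `finally` mirrors the test class above: if an assertion fails while the work is blocked, the blocked thread is still freed and cleanup can proceed.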

@pugnascotia
Contributor Author

@henningandersen brilliant! Thank you very much for the test 🙏

Can I ask for a final approval now?

Contributor

@henningandersen henningandersen left a comment

LGTM, thanks for the extra iterations.

@pugnascotia pugnascotia merged commit ca7c21f into elastic:master Jul 5, 2022
@pugnascotia pugnascotia deleted the apm-integration-part-1 branch July 5, 2022 10:34