Refactor tasks to improve APM support #87917

Merged: 17 commits, Jul 5, 2022
Changes from all commits
org/elasticsearch/indices/recovery/TaskRecoveryIT.java (new file)
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TaskRecoveryIT extends ESIntegTestCase {

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), TaskRecoveryIT.EngineTestPlugin.class);
}

/**
* Checks that the parent / child task hierarchy is correct for tasks that are initiated by a recovery task.
* We use an engine plugin that stalls translog recovery, which gives us the opportunity to inspect the
* task hierarchy.
*/
public void testTaskForOngoingRecovery() throws Exception {
String indexName = "test";
internalCluster().startMasterOnlyNode();
String nodeWithPrimary = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", nodeWithPrimary)
)
);
try {
String nodeWithReplica = internalCluster().startDataOnlyNode();

// Add a replica so that there is something to recover
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
)
);
// Translog recovery is stalled, so we can inspect the running tasks.
assertBusy(() -> {
List<TaskInfo> primaryTasks = client().admin()
.cluster()
.prepareListTasks(nodeWithPrimary)
.setActions(PeerRecoverySourceService.Actions.START_RECOVERY)
.get()
.getTasks();
assertThat("Expected a single primary task", primaryTasks.size(), equalTo(1));
List<TaskInfo> replicaTasks = client().admin()
.cluster()
.prepareListTasks(nodeWithReplica)
.setActions(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG)
.get()
.getTasks();
assertThat("Expected a single replica task", replicaTasks.size(), equalTo(1));
assertThat(
"Replica task's parent task ID was incorrect",
replicaTasks.get(0).parentTaskId(),
equalTo(primaryTasks.get(0).taskId())
);
});
} finally {
// Release the EngineTestPlugin, which will allow translog recovery to complete
StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
.flatMap(ps -> ps.filterPlugins(EnginePlugin.class).stream())
.map(EngineTestPlugin.class::cast)
.forEach(EngineTestPlugin::release);
}
ensureGreen(indexName);
}

/**
* An engine plugin that stalls translog recovery until {@link #release()} is called.
*/
public static class EngineTestPlugin extends Plugin implements EnginePlugin {
private final CountDownLatch latch = new CountDownLatch(1);

public void release() {
latch.countDown();
}

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> new InternalEngine(config) {

@Override
public void skipTranslogRecovery() {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.skipTranslogRecovery();
}
});
}
}
}
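
The assertions above work because each task started on behalf of the recovery carries its parent's TaskId, which is exactly what APM tracing needs to stitch the operations together. As a minimal sketch, a consumer of the same ListTasks output could rebuild the hierarchy by grouping tasks on their parent id; the TaskTreeUtil class below is illustrative only and not part of this PR.

import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative helper (not in this PR): groups TaskInfo entries by their parent TaskId. */
final class TaskTreeUtil {
    private TaskTreeUtil() {}

    static Map<TaskId, List<TaskInfo>> childrenByParent(List<TaskInfo> tasks) {
        return tasks.stream()
            .filter(t -> t.parentTaskId().isSet())            // ignore root tasks with no parent
            .collect(Collectors.groupingBy(TaskInfo::parentTaskId));
    }
}

For the recovery above, grouping the replica's tasks would yield a single entry keyed by the primary's START_RECOVERY task id, containing the replica's PREPARE_TRANSLOG task.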
org/elasticsearch/cluster/ClusterStatePublicationEvent.java
@@ -9,6 +9,7 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.service.BatchSummary;
import org.elasticsearch.tasks.Task;

/**
* Represents a cluster state update computed by the {@link org.elasticsearch.cluster.service.MasterService} for publication to the cluster.
@@ -24,6 +25,7 @@ public class ClusterStatePublicationEvent {
private final BatchSummary summary;
private final ClusterState oldState;
private final ClusterState newState;
private final Task task;
private final long computationTimeMillis;
private final long publicationStartTimeMillis;
private volatile long publicationContextConstructionElapsedMillis = NOT_SET;
@@ -35,12 +37,14 @@ public ClusterStatePublicationEvent(
BatchSummary summary,
ClusterState oldState,
ClusterState newState,
Task task,
long computationTimeMillis,
long publicationStartTimeMillis
) {
this.summary = summary;
this.oldState = oldState;
this.newState = newState;
this.task = task;
this.computationTimeMillis = computationTimeMillis;
this.publicationStartTimeMillis = publicationStartTimeMillis;
}
@@ -57,6 +61,10 @@ public ClusterState getNewState() {
return newState;
}

public Task getTask() {
return task;
}

public long getComputationTimeMillis() {
return computationTimeMillis;
}
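With the new field, a ClusterStatePublicationEvent now carries the task under which the MasterService computed the cluster state update, and getTask() exposes it to the publication code in the next file. A minimal construction sketch based on the signature shown above; the wrapper method and parameter names are illustrative, and in the real code path the MasterService supplies these values itself.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.service.BatchSummary;
import org.elasticsearch.tasks.Task;

final class PublicationEventSketch {
    private PublicationEventSketch() {}

    /** Illustrative only: mirrors how a publication event would be built with the new Task argument. */
    static ClusterStatePublicationEvent newEvent(
        BatchSummary summary,
        ClusterState previousState,
        ClusterState newState,
        Task updateTask,
        long computationTimeMillis,
        long publicationStartTimeMillis
    ) {
        ClusterStatePublicationEvent event = new ClusterStatePublicationEvent(
            summary,
            previousState,
            newState,
            updateTask,                        // new in this PR: the task behind the state update
            computationTimeMillis,
            publicationStartTimeMillis
        );
        assert event.getTask() == updateTask;  // the task is now available to downstream publication code
        return event;
    }
}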
org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
@@ -36,6 +36,7 @@
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportException;
@@ -284,6 +285,7 @@ public class PublicationContext extends AbstractRefCounted {
private final DiscoveryNodes discoveryNodes;
private final ClusterState newState;
private final ClusterState previousState;
private final Task task;
private final boolean sendFullVersion;

// All the values of these maps have one ref for the context (while it's open) and one for each in-flight message.
@@ -294,6 +296,7 @@ public class PublicationContext extends AbstractRefCounted {
discoveryNodes = clusterStatePublicationEvent.getNewState().nodes();
newState = clusterStatePublicationEvent.getNewState();
previousState = clusterStatePublicationEvent.getOldState();
task = clusterStatePublicationEvent.getTask();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
}

@@ -421,10 +424,11 @@ private void sendClusterState(
return;
}
try {
transportService.sendRequest(
transportService.sendChildRequest(
destination,
PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, destination.getVersion()),
task,
STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(
ActionListener.runAfter(listener, bytes::decRef),
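The switch from sendRequest to sendChildRequest is the core of the change here: passing the publication event's task makes the transport service register each per-node publish request as a child of that task, so the whole fan-out shows up under the originating cluster state update in task listings and APM traces. A hedged sketch of the general pattern follows; apart from sendChildRequest and TransportRequestOptions.EMPTY, the names are illustrative placeholders rather than code from this PR.

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

final class ChildRequestSketch {
    private ChildRequestSketch() {}

    /** Illustrative pattern: send a transport request as a child of the task it is performed on behalf of. */
    static <T extends TransportResponse> void sendOnBehalfOf(
        TransportService transportService,
        DiscoveryNode destination,
        String actionName,
        TransportRequest request,
        Task parentTask,
        TransportResponseHandler<T> handler
    ) {
        transportService.sendChildRequest(
            destination,
            actionName,
            request,
            parentTask,                        // records the parent/child link in the TaskManager
            TransportRequestOptions.EMPTY,
            handler
        );
    }
}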
org/elasticsearch/cluster/service/ClusterService.java
@@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;

public class ClusterService extends AbstractLifecycleComponent {
@@ -55,11 +56,11 @@ public class ClusterService extends AbstractLifecycleComponent {

private RerouteService rerouteService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) {
this(
settings,
clusterSettings,
new MasterService(settings, clusterSettings, threadPool),
new MasterService(settings, clusterSettings, threadPool, taskManager),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
);
}
@@ -262,5 +263,4 @@ public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
) {
masterService.submitStateUpdateTask(source, task, config, executor);
}

}
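
Because ClusterService now passes a TaskManager through to MasterService, any code that constructs a ClusterService directly has to provide one. A minimal wiring sketch, assuming the caller already holds the node's Settings, ThreadPool and TaskManager (as Node startup and most test fixtures do); the wrapper method is illustrative.

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;

final class ClusterServiceWiringSketch {
    private ClusterServiceWiringSketch() {}

    /** Illustrative only: shows the extra TaskManager argument required by the new constructor. */
    static ClusterService create(Settings settings, ThreadPool threadPool, TaskManager taskManager) {
        ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        return new ClusterService(settings, clusterSettings, threadPool, taskManager);
    }
}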