Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for cluster to recover before resolving index template #99797

Merged
merged 8 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/99797.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99797
summary: Wait for cluster to recover before resolving index template
area: CRUD
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.test.ESIntegTestCase;
Expand All @@ -24,6 +31,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -262,4 +270,86 @@ public void testWithDedicatedIngestNode() throws Exception {
assertThat(source.get("y"), equalTo(0));
}

/**
 * Verifies that an index request targeting an index whose template configures a default ingest pipeline
 * waits (up to the request timeout) for the "state not recovered" cluster block to clear, rather than
 * being processed without the pipeline.
 * <p>
 * Flow: register a pipeline and a catch-all template, fully restart the single node with a
 * recover-after-data-nodes value of 2, check that a short-timeout write fails with the block, then issue a
 * long-timeout write, start a second node, and check that the pending write and a later one both ran
 * through the pipeline.
 */
public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
internalCluster().startNode();

// pipeline that sets the field "value" to 42 on every document it processes
final var pipeline = new BytesArray("""
{
"processors" : [
{
"set": {
"field": "value",
"value": 42
}
}
]
}""");
// shared upper bound for the blocking get(...) / actionGet(...) calls in this test
final TimeValue timeout = TimeValue.timeValueSeconds(10);
client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout);
// template matching every index name ("*") that makes test_pipeline the default pipeline
client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
{
"index" : {
"default_pipeline" : "test_pipeline"
}
}
""", XContentType.JSON).get(timeout);

// restart the node with RECOVER_AFTER_DATA_NODES set to "2"
// NOTE(review): presumably this keeps the cluster state unrecovered while only one data node is present,
// which matches the block message asserted below - confirm against GatewayService
internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
return Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "2").build();
}

// the return value is randomized, so both outcomes of this callback get exercised across runs
@Override
public boolean validateClusterForming() {
return randomBoolean();
}
});

// this one should fail
assertThat(
expectThrows(
ClusterBlockException.class,
() -> client().prepareIndex("index")
.setId("fails")
.setSource("x", 1)
.setTimeout(TimeValue.timeValueMillis(100)) // 100ms, to fail quickly
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get(timeout)
).getMessage(),
equalTo("blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];")
);

// but this one should pass since it has a longer timeout
// it is executed asynchronously so the test thread can go on to start the second node while it is pending
final PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
client().prepareIndex("index")
.setId("passes1")
.setSource("x", 2)
.setTimeout(TimeValue.timeValueSeconds(60)) // wait for second node to start in below
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(future);

// so the cluster state can be recovered
internalCluster().startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "1"));
ensureYellow("index");

// the pending request must now complete and report that the document was created
final IndexResponse indexResponse = future.actionGet(timeout);
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

// a second write issued after the new node started, using the request's default timeout
client().prepareIndex("index").setId("passes2").setSource("x", 3).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// successfully indexed documents should have the value field set by the pipeline
Map<String, Object> source = client().prepareGet("index", "passes1").get(timeout).getSource();
assertThat(source.get("x"), equalTo(2));
assertThat(source.get("value"), equalTo(42));

source = client().prepareGet("index", "passes2").get(timeout).getSource();
assertThat(source.get("x"), equalTo(3));
assertThat(source.get("value"), equalTo(42));

// and make sure this failed doc didn't get through
source = client().prepareGet("index", "fails").get(timeout).getSource();
assertNull(source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public static <Response extends ReplicationResponse & WriteResponse> ActionListe
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
/*
* This is called on the Transport tread so we can check the indexing
* This is called on the Transport thread so we can check the indexing
* memory pressure *quickly* but we don't want to keep the transport
* thread busy. Then, as soon as we have the indexing pressure in we fork
* to one of the write thread pools. We do this because juggling the
Expand All @@ -212,6 +212,52 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executorName, releasingListener);
}

/**
 * Checks the current cluster state for a global WRITE-level block before handing the bulk request to
 * {@code forkAndExecute}.
 * <ul>
 *   <li>No block: the request is forked immediately.</li>
 *   <li>Non-retryable block: the listener is failed with the block exception.</li>
 *   <li>Retryable block: waits, for at most the bulk request's timeout, for a cluster state without a
 *       global WRITE-level block, then forks. If the wait times out the listener is failed with the
 *       block exception observed initially; if the cluster service closes it is failed with a
 *       {@link NodeClosedException}.</li>
 * </ul>
 *
 * @param task              the task associated with the bulk request
 * @param bulkRequest       the request to execute; its timeout bounds the wait for the block to clear
 * @param executorName      executor name passed through to {@code forkAndExecute}
 * @param releasingListener listener completed on failure here, or passed on to {@code forkAndExecute}
 */
private void ensureClusterStateThenForkAndExecute(
    Task task,
    BulkRequest bulkRequest,
    String executorName,
    ActionListener<BulkResponse> releasingListener
) {
    final ClusterState observedState = clusterService.state();
    final ClusterBlockException writeBlock = observedState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);

    // Fast path: nothing is blocking writes, proceed straight away.
    if (writeBlock == null) {
        forkAndExecute(task, bulkRequest, executorName, releasingListener);
        return;
    }

    // A block that is not retryable will not be waited on; report it to the caller now.
    if (writeBlock.retryable() == false) {
        releasingListener.onFailure(writeBlock);
        return;
    }

    logger.trace("cluster is blocked, waiting for it to recover", writeBlock);

    final ClusterStateObserver observer = new ClusterStateObserver(
        observedState,
        clusterService,
        bulkRequest.timeout(),
        logger,
        threadPool.getThreadContext()
    );

    final ClusterStateObserver.Listener onUnblocked = new ClusterStateObserver.Listener() {
        @Override
        public void onNewClusterState(ClusterState state) {
            // A state accepted by the predicate below arrived: continue with the request.
            forkAndExecute(task, bulkRequest, executorName, releasingListener);
        }

        @Override
        public void onClusterServiceClose() {
            releasingListener.onFailure(new NodeClosedException(clusterService.localNode()));
        }

        @Override
        public void onTimeout(TimeValue timeout) {
            // Still blocked after the request timeout: surface the block seen at the start.
            releasingListener.onFailure(writeBlock);
        }
    };

    // Only states that carry no global WRITE-level block are accepted.
    observer.waitForNextChange(
        onUnblocked,
        candidate -> candidate.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE) == false
    );
}

private void forkAndExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> releasingListener) {
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -95,6 +96,7 @@ private void indicesThatCannotBeCreatedTestCase(
ClusterState state = mock(ClusterState.class);
when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.blocks()).thenReturn(mock(ClusterBlocks.class));
when(clusterService.state()).thenReturn(state);
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -218,6 +219,7 @@ public void setupAction() {
.build();
when(state.getMetadata()).thenReturn(metadata);
when(state.metadata()).thenReturn(metadata);
when(state.blocks()).thenReturn(mock(ClusterBlocks.class));
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
Expand Down