Skip to content

Commit

Permalink
Fix trappy timeouts in enrich module (elastic#109136)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner authored May 29, 2024
1 parent 0e03920 commit 58cb500
Show file tree
Hide file tree
Showing 34 changed files with 280 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
}
}
]
},
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
"type":"boolean",
"default":true,
"description":"Should the request should block until the execution is complete."
},
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
"methods": [ "GET" ]
}
]
},
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
"body": {
"description": "The enrich policy to register",
"required": true
},
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
"methods": [ "GET" ]
}
]
},
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -29,8 +30,8 @@ public static class Request extends MasterNodeRequest<DeleteEnrichPolicyAction.R

private final String name;

public Request(String name) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout, String name) {
super(masterNodeTimeout);
this.name = Objects.requireNonNull(name, "name cannot be null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ private EnrichStatsAction() {

public static class Request extends MasterNodeRequest<Request> {

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

public Request(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -33,8 +34,8 @@ public static class Request extends MasterNodeRequest<Request> {
private final String name;
private boolean waitForCompletion;

public Request(String name) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout, String name) {
super(masterNodeTimeout);
this.name = Objects.requireNonNull(name, "name cannot be null");
this.waitForCompletion = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -38,19 +37,14 @@ public static class Request extends MasterNodeReadRequest<Request> {

private final List<String> names;

public Request() {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
this.names = new ArrayList<>();
}

public Request(String[] names) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
this.names = Arrays.asList(names);
public Request(TimeValue masterNodeTimeout, String... names) {
super(masterNodeTimeout);
this.names = List.of(names);
}

public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readStringCollectionAsList();
this.names = in.readStringCollectionAsImmutableList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

Expand All @@ -27,17 +28,17 @@ private PutEnrichPolicyAction() {
super(NAME);
}

public static Request fromXContent(XContentParser parser, String name) throws IOException {
return new Request(name, EnrichPolicy.fromXContent(parser));
public static Request fromXContent(TimeValue masterNodeTimeout, XContentParser parser, String name) throws IOException {
return new Request(masterNodeTimeout, name, EnrichPolicy.fromXContent(parser));
}

public static class Request extends MasterNodeRequest<PutEnrichPolicyAction.Request> {

private final EnrichPolicy policy;
private final String name;

public Request(String name, EnrichPolicy policy) {
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT);
public Request(TimeValue masterNodeTimeout, String name, EnrichPolicy policy) {
super(masterNodeTimeout);
this.name = Objects.requireNonNull(name, "name cannot be null");
this.policy = policy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,34 @@ public void testEnrichAPIs() throws ExecutionException, InterruptedException {
MATCH_FIELD,
List.of(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
.actionGet();

EnrichPolicy.NamedPolicy result = client().execute(
GetEnrichPolicyAction.INSTANCE,
new GetEnrichPolicyAction.Request(new String[] { policyName })
new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)
).actionGet().getPolicies().get(0);
assertThat(result, equalTo(new EnrichPolicy.NamedPolicy(policyName, enrichPolicy)));
String enrichIndexPrefix = EnrichPolicy.getBaseName(policyName) + "*";
refresh(enrichIndexPrefix);
assertHitCount(client().search(new SearchRequest(enrichIndexPrefix)), numDocsInSourceIndex);
}

GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request())
.actionGet();
GetEnrichPolicyAction.Response response = client().execute(
GetEnrichPolicyAction.INSTANCE,
new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT)
).actionGet();
assertThat(response.getPolicies().size(), equalTo(numPolicies));

for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(policyName)).actionGet();
client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName))
.actionGet();
}

response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request()).actionGet();
response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT)).actionGet();
assertThat(response.getPolicies().size(), equalTo(0));
}

Expand Down Expand Up @@ -188,9 +192,9 @@ public void testExecutePolicyWithDedicatedMasterNodes() throws Exception {
MATCH_FIELD,
List.of(DECORATE_FIELDS)
);
var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME, enrichPolicy);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME);
executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined
var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
assertThat(executePolicyResponse.getStatus(), nullValue());
Expand All @@ -215,9 +219,9 @@ public void testExecutePolicyNeverOnElectedMaster() throws Exception {
MATCH_FIELD,
List.of(DECORATE_FIELDS)
);
var putPolicyRequest = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME, enrichPolicy);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(POLICY_NAME);
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, POLICY_NAME);
executePolicyRequest.setWaitForCompletion(false); // From tne returned taks id the node that executes the policy can be determined
var executePolicyResponse = client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
assertThat(executePolicyResponse.getStatus(), nullValue());
Expand Down Expand Up @@ -264,8 +268,10 @@ private static void enrich(Map<String, List<String>> keys, String coordinatingNo
}
}

EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
.actionGet();
EnrichStatsAction.Response statsResponse = client().execute(
EnrichStatsAction.INSTANCE,
new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT)
).actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = getNodeId(coordinatingNode);
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.nodeId().equals(nodeId)).findAny().get();
Expand Down Expand Up @@ -321,11 +327,11 @@ private static void createAndExecutePolicy(String policyName, String indexName)
MATCH_FIELD,
List.of(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
final ActionFuture<ExecuteEnrichPolicyAction.Response> policyExecuteFuture = client().execute(
ExecuteEnrichPolicyAction.INSTANCE,
new ExecuteEnrichPolicyAction.Request(policyName)
new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)
);
// Make sure we can deserialize enrich policy execution task status
final List<TaskInfo> tasks = clusterAdmin().prepareListTasks().setActions(EnrichPolicyExecutor.TASK_ACTION).get().getTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ protected Settings nodeSettings() {

public void testEnrichCacheValuesCannotBeCorrupted() {
// Ensure enrich cache is empty
var statsRequest = new EnrichStatsAction.Request();
var statsRequest = new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT);
var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L));
Expand Down Expand Up @@ -85,9 +85,9 @@ public void testEnrichCacheValuesCannotBeCorrupted() {
client().index(indexRequest).actionGet();

// Store policy and execute it:
var putPolicyRequest = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(policyName);
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();

var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testRestart() throws Exception {
createSourceIndices(client(), enrichPolicy);
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
}

Expand All @@ -71,8 +71,10 @@ public void testRestart() throws Exception {
}

private static void verifyPolicies(int numPolicies, EnrichPolicy enrichPolicy) {
GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request())
.actionGet();
GetEnrichPolicyAction.Response response = client().execute(
GetEnrichPolicyAction.INSTANCE,
new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT)
).actionGet();
assertThat(response.getPolicies(), hasSize(numPolicies));
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void coordinatePolicyExecution(
String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp);
Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName);
try {
Request internalRequest = new Request(request.getName(), enrichIndexName);
Request internalRequest = new Request(request.masterNodeTimeout(), request.getName(), enrichIndexName);
internalRequest.setWaitForCompletion(request.isWaitForCompletion());
internalRequest.setParentTask(request.getParentTask());
client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -68,8 +69,8 @@ public static class Request extends ExecuteEnrichPolicyAction.Request {

private final String enrichIndexName;

public Request(String name, String enrichIndexName) {
super(name);
public Request(TimeValue masterNodeTimeout, String name, String enrichIndexName) {
super(masterNodeTimeout, name);
this.enrichIndexName = enrichIndexName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.DELETE;
Expand All @@ -33,8 +33,8 @@ public String getName() {
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final DeleteEnrichPolicyAction.Request request = new DeleteEnrichPolicyAction.Request(restRequest.param("name"));
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final var request = new DeleteEnrichPolicyAction.Request(RestUtils.getMasterNodeTimeout(restRequest), restRequest.param("name"));
return channel -> client.execute(DeleteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
Expand All @@ -33,8 +33,8 @@ public String getName() {
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final EnrichStatsAction.Request request = new EnrichStatsAction.Request();
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest));
return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

Expand Down
Loading

0 comments on commit 58cb500

Please sign in to comment.