From 249b7ffe080d5e523c2e928be59fccb012b23f74 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 28 May 2024 21:47:48 +0100 Subject: [PATCH 1/2] Fix trappy timeouts in enrich module Relates #107984 --- .../api/enrich.delete_policy.json | 6 + .../api/enrich.execute_policy.json | 4 + .../rest-api-spec/api/enrich.get_policy.json | 6 + .../rest-api-spec/api/enrich.put_policy.json | 6 + .../rest-api-spec/api/enrich.stats.json | 6 + .../action/DeleteEnrichPolicyAction.java | 5 +- .../core/enrich/action/EnrichStatsAction.java | 4 +- .../action/ExecuteEnrichPolicyAction.java | 5 +- .../enrich/action/GetEnrichPolicyAction.java | 16 +- .../enrich/action/PutEnrichPolicyAction.java | 9 +- .../xpack/enrich/EnrichMultiNodeIT.java | 36 +++-- .../xpack/enrich/EnrichProcessorIT.java | 6 +- .../xpack/enrich/EnrichRestartIT.java | 8 +- .../xpack/enrich/EnrichPolicyExecutor.java | 2 +- .../action/InternalExecutePolicyAction.java | 5 +- .../rest/RestDeleteEnrichPolicyAction.java | 6 +- .../enrich/rest/RestEnrichStatsAction.java | 8 +- .../rest/RestExecuteEnrichPolicyAction.java | 6 +- .../rest/RestGetEnrichPolicyAction.java | 10 +- .../rest/RestPutEnrichPolicyAction.java | 3 +- .../xpack/enrich/BasicEnrichTests.java | 41 +++-- .../enrich/EnrichPolicyExecutorTests.java | 19 ++- .../xpack/enrich/EnrichPolicyUpdateTests.java | 9 +- .../xpack/enrich/EnrichResiliencyTests.java | 6 +- .../DeleteEnrichPolicyActionRequestTests.java | 2 +- ...ExecuteEnrichPolicyActionRequestTests.java | 2 +- .../GetEnrichPolicyActionRequestTests.java | 2 +- ...ternalExecutePolicyActionRequestTests.java | 4 +- .../PutEnrichPolicyActionRequestTests.java | 2 +- ...ransportDeleteEnrichPolicyActionTests.java | 144 +++++++++++------- .../TransportGetEnrichPolicyActionTests.java | 55 ++++--- .../esql/action/CrossClustersEnrichIT.java | 17 ++- .../xpack/esql/action/EnrichIT.java | 8 +- .../enrich/EnrichStatsCollector.java | 2 +- 34 files changed, 282 insertions(+), 188 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.delete_policy.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.delete_policy.json index 3137f6b555361..5c6b05a548987 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.delete_policy.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.delete_policy.json @@ -22,6 +22,12 @@ } } ] + }, + "params": { + "master_timeout":{ + "type":"time", + "description":"Timeout for processing on master node" + } } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.execute_policy.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.execute_policy.json index 5e4c8a2251d1d..2add255148508 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.execute_policy.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.execute_policy.json @@ -28,6 +28,10 @@ "type":"boolean", "default":true, "description":"Should the request should block until the execution is complete." + }, + "master_timeout":{ + "type":"time", + "description":"Timeout for processing on master node" } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json index a3eb51942c4fa..aed7397877393 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json @@ -26,6 +26,12 @@ "methods": [ "GET" ] } ] + }, + "params": { + "master_timeout":{ + "type":"time", + "description":"Timeout for processing on master node" + } } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.put_policy.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.put_policy.json index 0d1cefd3e40aa..287c7d96dca9d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.put_policy.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.put_policy.json @@ -27,6 +27,12 @@ "body": { "description": "The enrich policy to register", "required": true + }, + "params": { + "master_timeout":{ + "type":"time", + "description":"Timeout for processing on master node" + } } } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json index b4218acf30eac..afd314a0dc804 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json @@ -16,6 +16,12 @@ "methods": [ "GET" ] } ] + }, + "params": { + "master_timeout":{ + "type":"time", + "description":"Timeout for processing on master node" + } } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java index 82f98176838ee..64dcd5afcb544 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/DeleteEnrichPolicyAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import java.io.IOException; import java.util.Objects; @@ -29,8 +30,8 @@ public static class Request extends MasterNodeRequest { - public Request() { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); } public Request(StreamInput in) throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java index 5d629365a8096..65bae5e94852c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/ExecuteEnrichPolicyAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -33,8 +34,8 @@ public static class Request extends MasterNodeRequest { private final String name; private boolean waitForCompletion; - public Request(String name) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); + public Request(TimeValue masterNodeTimeout, String name) { + super(masterNodeTimeout); this.name = Objects.requireNonNull(name, "name cannot be null"); this.waitForCompletion = true; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java index 37851a3641ebd..7f138dec7ee23 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java @@ -12,13 +12,12 @@ import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -38,19 +37,14 @@ public static class Request extends MasterNodeReadRequest { private final List names; - public Request() { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); - this.names = new ArrayList<>(); - } - - public Request(String[] names) { - super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT); - this.names = Arrays.asList(names); + public Request(TimeValue masterNodeTimeout, String... names) { + super(masterNodeTimeout); + this.names = List.of(names); } public Request(StreamInput in) throws IOException { super(in); - this.names = in.readStringCollectionAsList(); + this.names = in.readStringCollectionAsImmutableList(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java index d1031828e0522..6a6b6ff34d60d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/PutEnrichPolicyAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -27,8 +28,8 @@ private PutEnrichPolicyAction() { super(NAME); } - public static Request fromXContent(XContentParser parser, String name) throws IOException { - return new Request(name, EnrichPolicy.fromXContent(parser)); + public static Request fromXContent(TimeValue masterNodeTimeout, XContentParser parser, String name) throws IOException { + return new Request(masterNodeTimeout, name, EnrichPolicy.fromXContent(parser)); } public static class Request extends MasterNodeRequest { @@ -36,8 +37,8 @@ public static class Request extends MasterNodeRequest> keys, String coordinatingNo } } - EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()) - .actionGet(); + EnrichStatsAction.Response statsResponse = client().execute( + EnrichStatsAction.INSTANCE, + new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT) + ).actionGet(); assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size())); String nodeId = getNodeId(coordinatingNode); CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.nodeId().equals(nodeId)).findAny().get(); @@ -321,11 +327,11 @@ private static void createAndExecutePolicy(String policyName, String indexName) MATCH_FIELD, List.of(DECORATE_FIELDS) ); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); final ActionFuture policyExecuteFuture = client().execute( ExecuteEnrichPolicyAction.INSTANCE, - new ExecuteEnrichPolicyAction.Request(policyName) + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName) ); // Make sure we can deserialize enrich policy execution task status final List tasks = clusterAdmin().prepareListTasks().setActions(EnrichPolicyExecutor.TASK_ACTION).get().getTasks(); diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java index f3d2403ce5d96..d646aed11d7d9 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorIT.java @@ -49,7 +49,7 @@ protected Settings nodeSettings() { public void testEnrichCacheValuesCannotBeCorrupted() { // Ensure enrich cache is empty - var statsRequest = new EnrichStatsAction.Request(); + var statsRequest = new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT); var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet(); assertThat(statsResponse.getCacheStats().size(), equalTo(1)); assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L)); @@ -85,9 +85,9 @@ public void testEnrichCacheValuesCannotBeCorrupted() { client().index(indexRequest).actionGet(); // Store policy and execute it: - var putPolicyRequest = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet(); - var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(policyName); + var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName); client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet(); var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray(""" diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java index 86d18bcbbbbc4..9a77bea4ab78a 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java @@ -60,7 +60,7 @@ public void testRestart() throws Exception { createSourceIndices(client(), enrichPolicy); for (int i = 0; i < numPolicies; i++) { String policyName = POLICY_NAME + i; - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); } @@ -71,8 +71,10 @@ public void testRestart() throws Exception { } private static void verifyPolicies(int numPolicies, EnrichPolicy enrichPolicy) { - GetEnrichPolicyAction.Response response = client().execute(GetEnrichPolicyAction.INSTANCE, new GetEnrichPolicyAction.Request()) - .actionGet(); + GetEnrichPolicyAction.Response response = client().execute( + GetEnrichPolicyAction.INSTANCE, + new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT) + ).actionGet(); assertThat(response.getPolicies(), hasSize(numPolicies)); for (int i = 0; i < numPolicies; i++) { String policyName = POLICY_NAME + i; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index ecb03615307f9..2ebe268cc788d 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -85,7 +85,7 @@ public void coordinatePolicyExecution( String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp); Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName); try { - Request internalRequest = new Request(request.getName(), enrichIndexName); + Request internalRequest = new Request(request.masterNodeTimeout(), request.getName(), enrichIndexName); internalRequest.setWaitForCompletion(request.isWaitForCompletion()); internalRequest.setParentTask(request.getParentTask()); client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java index 769a86c5ec5b1..ed28599da9fbb 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskCancelledException; @@ -68,8 +69,8 @@ public static class Request extends ExecuteEnrichPolicyAction.Request { private final String enrichIndexName; - public Request(String name, String enrichIndexName) { - super(name); + public Request(TimeValue masterNodeTimeout, String name, String enrichIndexName) { + super(masterNodeTimeout, name); this.enrichIndexName = enrichIndexName; } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestDeleteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestDeleteEnrichPolicyAction.java index 26597f86b833c..810ec48edee8e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestDeleteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestDeleteEnrichPolicyAction.java @@ -9,12 +9,12 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; -import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.DELETE; @@ -33,8 +33,8 @@ public String getName() { } @Override - protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - final DeleteEnrichPolicyAction.Request request = new DeleteEnrichPolicyAction.Request(restRequest.param("name")); + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final var request = new DeleteEnrichPolicyAction.Request(RestUtils.getMasterNodeTimeout(restRequest), restRequest.param("name")); return channel -> client.execute(DeleteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java index e666319b563ea..8f61e4d398064 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java @@ -9,12 +9,12 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; -import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -33,8 +33,10 @@ public String getName() { } @Override - protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - final EnrichStatsAction.Request request = new EnrichStatsAction.Request(); + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest) + // NEW + ); return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java index 15f5bdb736621..523e0bf25a71f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestExecuteEnrichPolicyAction.java @@ -9,12 +9,12 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; -import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -34,8 +34,8 @@ public String getName() { } @Override - protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - final ExecuteEnrichPolicyAction.Request request = new ExecuteEnrichPolicyAction.Request(restRequest.param("name")); + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final var request = new ExecuteEnrichPolicyAction.Request(RestUtils.getMasterNodeTimeout(restRequest), restRequest.param("name")); request.setWaitForCompletion(restRequest.paramAsBoolean("wait_for_completion", true)); return channel -> client.execute(ExecuteEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java index 79dcd9315652f..2fb9f63c1eb4a 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java @@ -10,12 +10,12 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; -import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -34,9 +34,11 @@ public String getName() { } @Override - protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - String[] names = Strings.splitStringByCommaToArray(restRequest.param("name")); - final GetEnrichPolicyAction.Request request = new GetEnrichPolicyAction.Request(names); + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final var request = new GetEnrichPolicyAction.Request( + RestUtils.getMasterNodeTimeout(restRequest), + Strings.splitStringByCommaToArray(restRequest.param("name")) + ); return channel -> client.execute(GetEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestPutEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestPutEnrichPolicyAction.java index fb1522441fe43..f172d2e0cf411 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestPutEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestPutEnrichPolicyAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; @@ -41,7 +42,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina static PutEnrichPolicyAction.Request createRequest(RestRequest restRequest) throws IOException { try (XContentParser parser = restRequest.contentOrSourceParamParser()) { - return PutEnrichPolicyAction.fromXContent(parser, restRequest.param("name")); + return PutEnrichPolicyAction.fromXContent(RestUtils.getMasterNodeTimeout(restRequest), parser, restRequest.param("name")); } } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index e3822b366e122..d17728fdd8037 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -93,9 +93,10 @@ public void testIngestDataWithMatchProcessor() { MATCH_FIELD, List.of(DECORATE_FIELDS) ); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)) + .actionGet(); String pipelineName = "my-pipeline"; String pipelineBody = Strings.format(""" @@ -146,8 +147,10 @@ public void testIngestDataWithMatchProcessor() { } } - EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()) - .actionGet(); + EnrichStatsAction.Response statsResponse = client().execute( + EnrichStatsAction.INSTANCE, + new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT) + ).actionGet(); assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1)); String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId(); assertThat(statsResponse.getCoordinatorStats().get(0).nodeId(), equalTo(localNodeId)); @@ -186,9 +189,10 @@ public void testIngestDataWithGeoMatchProcessor() { matchField, List.of(enrichField) ); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)) + .actionGet(); String pipelineName = "my-pipeline"; String pipelineBody = Strings.format(""" @@ -226,8 +230,10 @@ public void testIngestDataWithGeoMatchProcessor() { assertThat(entries.containsKey(matchField), is(true)); assertThat(entries.get(enrichField), equalTo("94040")); - EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()) - .actionGet(); + EnrichStatsAction.Response statsResponse = client().execute( + EnrichStatsAction.INSTANCE, + new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT) + ).actionGet(); assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1)); String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId(); assertThat(statsResponse.getCoordinatorStats().get(0).nodeId(), equalTo(localNodeId)); @@ -246,9 +252,10 @@ public void testMultiplePolicies() { client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet(); EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-" + i), "key", List.of("value")); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)) + .actionGet(); String pipelineName = "pipeline" + i; String pipelineBody = Strings.format(""" @@ -290,11 +297,11 @@ public void testAsyncTaskExecute() throws Exception { } EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value")); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); ExecuteEnrichPolicyAction.Response executeResponse = client().execute( ExecuteEnrichPolicyAction.INSTANCE, - new ExecuteEnrichPolicyAction.Request(policyName).setWaitForCompletion(false) + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName).setWaitForCompletion(false) ).actionGet(); assertThat(executeResponse.getStatus(), is(nullValue())); @@ -346,9 +353,10 @@ public void testTemplating() throws Exception { MATCH_FIELD, List.of(DECORATE_FIELDS) ); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)) + .actionGet(); String pipelineName = "my-pipeline"; String pipelineBody = Strings.format( @@ -384,9 +392,10 @@ public void testFailureAfterEnrich() throws Exception { MATCH_FIELD, Arrays.asList(DECORATE_FIELDS) ); - PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName)) + .actionGet(); // A pipeline with a foreach that uses a non existing field that is specified after enrich has run: String pipelineName = "my-pipeline"; diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index 9f0b18679666b..06f9eb21fe2dc 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -88,7 +88,7 @@ public void testNonConcurrentPolicyCoordination() throws InterruptedException { // Launch a fake policy run that will block until firstTaskBlock is counted down. final CountDownLatch firstTaskComplete = new CountDownLatch(1); testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyName), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName), new LatchedActionListener<>(ActionListener.noop(), firstTaskComplete) ); @@ -97,7 +97,10 @@ public void testNonConcurrentPolicyCoordination() throws InterruptedException { EsRejectedExecutionException.class, "Expected exception but nothing was thrown", () -> { - testExecutor.coordinatePolicyExecution(new ExecuteEnrichPolicyAction.Request(testPolicyName), ActionListener.noop()); + testExecutor.coordinatePolicyExecution( + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName), + ActionListener.noop() + ); // Should throw exception on the previous statement, but if it doesn't, be a // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions latch.countDown(); @@ -118,7 +121,7 @@ public void testNonConcurrentPolicyCoordination() throws InterruptedException { // Ensure that the lock from the previous run has been cleared CountDownLatch secondTaskComplete = new CountDownLatch(1); testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyName), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName), new LatchedActionListener<>(ActionListener.noop(), secondTaskComplete) ); secondTaskComplete.await(); @@ -144,13 +147,13 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent final CountDownLatch firstTaskComplete = new CountDownLatch(1); testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "1"), new LatchedActionListener<>(ActionListener.noop(), firstTaskComplete) ); final CountDownLatch secondTaskComplete = new CountDownLatch(1); testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "2"), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "2"), new LatchedActionListener<>(ActionListener.noop(), secondTaskComplete) ); @@ -160,7 +163,7 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { "Expected exception but nothing was thrown", () -> { testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "3"), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "3"), ActionListener.noop() ); // Should throw exception on the previous statement, but if it doesn't, be a @@ -188,7 +191,7 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException { assertThat(locks.lockedPolices(), is(empty())); CountDownLatch finalTaskComplete = new CountDownLatch(1); testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyBaseName + "1"), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyBaseName + "1"), new LatchedActionListener<>(ActionListener.noop(), finalTaskComplete) ); finalTaskComplete.await(); @@ -279,7 +282,7 @@ protected void // Launch a fake policy run that will block until firstTaskBlock is counted down. PlainActionFuture firstTaskResult = new PlainActionFuture<>(); testExecutor.coordinatePolicyExecution( - new ExecuteEnrichPolicyAction.Request(testPolicyName).setWaitForCompletion(false), + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, testPolicyName).setWaitForCompletion(false), firstTaskResult ); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 1e4426661e06c..b015e97909179 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -52,11 +52,11 @@ public void testUpdatePolicyOnly() { EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index"), "key1", List.of("field1")); createSourceIndices(client(), instance1); - PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1); + PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "my_policy", instance1); assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); assertThat( "Execute failed", - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")) + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "my_policy")) .actionGet() .getStatus() .isCompleted(), @@ -74,7 +74,10 @@ public void testUpdatePolicyOnly() { createSourceIndices(client(), instance2); ResourceAlreadyExistsException exc = expectThrows( ResourceAlreadyExistsException.class, - client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2)) + client().execute( + PutEnrichPolicyAction.INSTANCE, + new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "my_policy", instance2) + ) ); assertTrue(exc.getMessage().contains("policy [my_policy] already exists")); } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java index 6c62d7a315872..3a2bfd87cff14 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java @@ -80,6 +80,7 @@ public void testWriteThreadLivenessBackToBack() throws Exception { client().execute( PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request( + TEST_REQUEST_TIMEOUT, enrichPolicyName, new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value")) ) @@ -87,7 +88,7 @@ public void testWriteThreadLivenessBackToBack() throws Exception { client().execute( ExecuteEnrichPolicyAction.INSTANCE, - new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true) + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, enrichPolicyName).setWaitForCompletion(true) ).actionGet(); XContentBuilder pipe1 = JsonXContent.contentBuilder(); @@ -179,6 +180,7 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { client().execute( PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request( + TEST_REQUEST_TIMEOUT, enrichPolicyName, new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value")) ) @@ -186,7 +188,7 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { client().execute( ExecuteEnrichPolicyAction.INSTANCE, - new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true) + new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, enrichPolicyName).setWaitForCompletion(true) ).actionGet(); XContentBuilder pipe1 = JsonXContent.contentBuilder(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/DeleteEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/DeleteEnrichPolicyActionRequestTests.java index 2e778d6b62215..e9cd348bf595e 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/DeleteEnrichPolicyActionRequestTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/DeleteEnrichPolicyActionRequestTests.java @@ -13,7 +13,7 @@ public class DeleteEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase { @Override protected DeleteEnrichPolicyAction.Request createTestInstance() { - return new DeleteEnrichPolicyAction.Request(randomAlphaOfLength(4)); + return new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(4)); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java index a945f49ac97d2..08d156e1012cf 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/ExecuteEnrichPolicyActionRequestTests.java @@ -14,7 +14,7 @@ public class ExecuteEnrichPolicyActionRequestTests extends AbstractWireSerializi @Override protected ExecuteEnrichPolicyAction.Request createTestInstance() { - return new ExecuteEnrichPolicyAction.Request(randomAlphaOfLength(3)); + return new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(3)); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java index f84b72727bca3..051eadac48467 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java @@ -14,7 +14,7 @@ public class GetEnrichPolicyActionRequestTests extends AbstractWireSerializingTe @Override protected GetEnrichPolicyAction.Request createTestInstance() { - return new GetEnrichPolicyAction.Request(generateRandomStringArray(0, 4, false)); + return new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, generateRandomStringArray(0, 4, false)); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionRequestTests.java index 1a7bf20466ca1..68d0517a28404 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionRequestTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyActionRequestTests.java @@ -19,7 +19,7 @@ protected Writeable.Reader instanceReader() { @Override protected Request createTestInstance() { - Request request = new Request(randomAlphaOfLength(3), randomAlphaOfLength(5)); + Request request = new Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(3), randomAlphaOfLength(5)); if (randomBoolean()) { request.setWaitForCompletion(true); } @@ -39,7 +39,7 @@ protected Request mutateInstance(Request instance) { default -> throw new AssertionError("Illegal randomisation branch"); } - Request request = new Request(policyName, enrichIndexName); + Request request = new Request(TEST_REQUEST_TIMEOUT, policyName, enrichIndexName); request.setWaitForCompletion(waitForCompletion); return request; } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/PutEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/PutEnrichPolicyActionRequestTests.java index 7675524435d26..c2f698c323004 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/PutEnrichPolicyActionRequestTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/PutEnrichPolicyActionRequestTests.java @@ -19,7 +19,7 @@ public class PutEnrichPolicyActionRequestTests extends AbstractWireSerializingTe @Override protected PutEnrichPolicyAction.Request createTestInstance() { final EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON); - return new PutEnrichPolicyAction.Request(randomAlphaOfLength(3), policy); + return new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, randomAlphaOfLength(3), policy); } @Override diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java index 84700308662b9..32f39b0de1ef4 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java @@ -60,17 +60,22 @@ public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedExcept final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(fakeId), new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - fail(); - } + ActionTestUtils.execute( + transportAction, + null, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, fakeId), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + fail(); + } - public void onFailure(final Exception e) { - reference.set(e); - latch.countDown(); + public void onFailure(final Exception e) { + reference.set(e); + latch.countDown(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); assertThat(reference.get(), instanceOf(ResourceNotFoundException.class)); @@ -92,17 +97,22 @@ public void testDeleteWithoutIndex() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - reference.set(acknowledgedResponse); - latch.countDown(); - } + ActionTestUtils.execute( + transportAction, + null, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + reference.set(acknowledgedResponse); + latch.countDown(); + } - public void onFailure(final Exception e) { - fail(); + public void onFailure(final Exception e) { + fail(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); assertTrue(reference.get().isAcknowledged()); @@ -137,17 +147,22 @@ public void testDeleteIsNotLocked() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - reference.set(acknowledgedResponse); - latch.countDown(); - } + ActionTestUtils.execute( + transportAction, + null, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + reference.set(acknowledgedResponse); + latch.countDown(); + } - public void onFailure(final Exception e) { - fail(); + public void onFailure(final Exception e) { + fail(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); assertTrue(reference.get().isAcknowledged()); @@ -188,17 +203,22 @@ public void testDeleteLocked() throws InterruptedException { { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); - ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - fail(); - } - - public void onFailure(final Exception e) { - reference.set(e); - latch.countDown(); + ActionTestUtils.execute( + transportAction, + null, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + fail(); + } + + public void onFailure(final Exception e) { + reference.set(e); + latch.countDown(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); assertThat(reference.get(), instanceOf(EsRejectedExecutionException.class)); @@ -214,17 +234,22 @@ public void onFailure(final Exception e) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); - ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - reference.set(acknowledgedResponse); - latch.countDown(); - } - - public void onFailure(final Exception e) { - fail(); + ActionTestUtils.execute( + transportAction, + null, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + reference.set(acknowledgedResponse); + latch.countDown(); + } + + public void onFailure(final Exception e) { + fail(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); assertTrue(reference.get().isAcknowledged()); @@ -256,17 +281,22 @@ public void testDeletePolicyPrefixes() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); - ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - reference.set(acknowledgedResponse); - latch.countDown(); - } - - public void onFailure(final Exception e) { - fail(); + ActionTestUtils.execute( + transportAction, + null, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + reference.set(acknowledgedResponse); + latch.countDown(); + } + + public void onFailure(final Exception e) { + fail(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); assertTrue(reference.get().isAcknowledged()); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java index 1a95627e9438d..6a3c1eb2555b1 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java @@ -34,7 +34,7 @@ public void cleanupPolicies() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(), new ActionListener<>() { + ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { reference.set(response); @@ -74,24 +74,18 @@ public void testListPolicies() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute( - transportAction, - null, - // empty or null should return the same - randomBoolean() ? new GetEnrichPolicyAction.Request() : new GetEnrichPolicyAction.Request(new String[] {}), - new ActionListener<>() { - @Override - public void onResponse(GetEnrichPolicyAction.Response response) { - reference.set(response); - latch.countDown(); + ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { + @Override + public void onResponse(GetEnrichPolicyAction.Response response) { + reference.set(response); + latch.countDown(); - } + } - public void onFailure(final Exception e) { - fail(); - } + public void onFailure(final Exception e) { + fail(); } - ); + }); latch.await(); assertNotNull(reference.get()); GetEnrichPolicyAction.Response response = reference.get(); @@ -107,7 +101,7 @@ public void testListEmptyPolicies() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(), new ActionListener<>() { + ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { reference.set(response); @@ -141,17 +135,22 @@ public void testGetPolicy() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(new String[] { name }), new ActionListener<>() { - @Override - public void onResponse(GetEnrichPolicyAction.Response response) { - reference.set(response); - latch.countDown(); - } + ActionTestUtils.execute( + transportAction, + null, + new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), + new ActionListener<>() { + @Override + public void onResponse(GetEnrichPolicyAction.Response response) { + reference.set(response); + latch.countDown(); + } - public void onFailure(final Exception e) { - fail(); + public void onFailure(final Exception e) { + fail(); + } } - }); + ); latch.await(); assertNotNull(reference.get()); GetEnrichPolicyAction.Response response = reference.get(); @@ -186,7 +185,7 @@ public void testGetMultiplePolicies() throws InterruptedException { ActionTestUtils.execute( transportAction, null, - new GetEnrichPolicyAction.Request(new String[] { name, anotherName }), + new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name, anotherName), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { @@ -220,7 +219,7 @@ public void testGetPolicyThrowsError() throws InterruptedException { ActionTestUtils.execute( transportAction, null, - new GetEnrichPolicyAction.Request(new String[] { "non-exists" }), + new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "non-exists"), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java index 12708fb626c36..147e62f7ee3bc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java @@ -118,8 +118,10 @@ public void setupHostsEnrich() { client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); } client.admin().indices().prepareRefresh("hosts").get(); - client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("hosts", hostPolicy)).actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("hosts")).actionGet(); + client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy)) + .actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) + .actionGet(); assertAcked(client.admin().indices().prepareDelete("hosts")); } } @@ -137,8 +139,10 @@ public void setupVendorPolicy() { client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get(); } client.admin().indices().prepareRefresh("vendors").get(); - client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("vendors", vendorPolicy)).actionGet(); - client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("vendors")).actionGet(); + client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy)) + .actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors")) + .actionGet(); assertAcked(client.admin().indices().prepareDelete("vendors")); } } @@ -195,7 +199,10 @@ public void wipeEnrichPolicies() { for (String cluster : allClusters()) { cluster(cluster).wipe(Set.of()); for (String policy : List.of("hosts", "vendors")) { - client(cluster).execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(policy)); + client(cluster).execute( + DeleteEnrichPolicyAction.INSTANCE, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy) + ); } } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index 1298e3374665b..5806cb8ef0982 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -166,15 +166,17 @@ record Song(String id, String title, String artist, double length) { client().prepareIndex("songs").setSource("song_id", s.id, "title", s.title, "artist", s.artist, "length", s.length).get(); } client().admin().indices().prepareRefresh("songs").get(); - client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("songs", policy)).actionGet(); - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("songs")).actionGet(); + client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "songs", policy)) + .actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "songs")) + .actionGet(); assertAcked(client().admin().indices().prepareDelete("songs")); } @After public void cleanEnrichPolicies() { cluster().wipe(Set.of()); - client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request("songs")); + client().execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "songs")); } @Before diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java index 234bc8f72a52b..b9743f022da84 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollector.java @@ -52,7 +52,7 @@ protected Collection doCollect(MonitoringDoc.Node node, long inte final long timestamp = timestamp(); final String clusterUuid = clusterUuid(clusterState); - final EnrichStatsAction.Request request = new EnrichStatsAction.Request(); + final EnrichStatsAction.Request request = new EnrichStatsAction.Request(getCollectionTimeout()); final EnrichStatsAction.Response response = client.execute(EnrichStatsAction.INSTANCE, request) .actionGet(getCollectionTimeout()); From 4f1f69b44fa94c919544c4a6f6e615f51fb6045c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 28 May 2024 22:25:28 +0100 Subject: [PATCH 2/2] Remove note-to-self --- .../xpack/enrich/rest/RestEnrichStatsAction.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java index 8f61e4d398064..3d64e7c1380fe 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java @@ -34,9 +34,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { - final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest) - // NEW - ); + final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest)); return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); }