From 0a7a199a0cd99324f86f135816bf705eb877c0b8 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 24 Nov 2021 10:09:30 +0100 Subject: [PATCH 01/28] rewrites checkpointing as internal actions, reducing several sub-calls to only 1 per data node that has at least 1 primary shard of the indexes of interest. --- .../transform/action/GetCheckpointAction.java | 113 +++++++++++++ .../action/GetCheckpointNodeAction.java | 127 +++++++++++++++ .../GetCheckpointActionRequestTests.java | 67 ++++++++ .../GetCheckpointActionResponseTests.java | 44 ++++++ .../GetCheckpointNodeActionRequestTests.java | 55 +++++++ .../GetCheckpointNodeActionResponseTests.java | 44 ++++++ .../xpack/security/operator/Constants.java | 2 + .../xpack/transform/Transform.java | 10 +- .../action/TransportGetCheckpointAction.java | 149 ++++++++++++++++++ .../TransportGetCheckpointNodeAction.java | 60 +++++++ .../checkpoint/DefaultCheckpointProvider.java | 56 ++++++- 11 files changed, 725 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionResponseTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionResponseTests.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java new file mode 100644 index 000000000000..79fa53ac6a49 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +public class GetCheckpointAction extends ActionType { + + public static final GetCheckpointAction INSTANCE = new GetCheckpointAction(); + public static final String NAME = "cluster:internal/transform/checkpoint"; + + private GetCheckpointAction() { + super(NAME, GetCheckpointAction.Response::new); + } + + public static class Request extends ActionRequest implements IndicesRequest { + + private final String[] indices; + private final IndicesOptions indicesOptions; + + public Request(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } + + public Request(String[] indices, IndicesOptions indicesOptions) { + this.indices = indices; + this.indicesOptions = indicesOptions; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + } + + public static class Response extends ActionResponse { + + private final Map checkpoints; + + public Response(Map checkpoints) { + this.checkpoints = checkpoints; + } + + public Response(StreamInput in) throws IOException { + this.checkpoints = in.readOrderedMap(StreamInput::readString, StreamInput::readLongArray); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(getCheckpoints(), StreamOutput::writeString, StreamOutput::writeLongArray); + } + + public Map getCheckpoints() { + return Collections.unmodifiableMap(checkpoints); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Response that = (Response) obj; + + return this.checkpoints.size() == that.checkpoints.size() + && this.checkpoints.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), that.checkpoints.get(e.getKey()))); + } + + @Override + public int hashCode() { + int hash = 1; + + for (Entry e : checkpoints.entrySet()) { + hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue())); + } + + return hash; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java new file mode 100644 index 000000000000..302ee9d59135 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +public class GetCheckpointNodeAction extends ActionType { + + public static final GetCheckpointNodeAction INSTANCE = new GetCheckpointNodeAction(); + public static final String NAME = "cluster:internal/transform/checkpoint[s]"; + + private GetCheckpointNodeAction() { + super(NAME, GetCheckpointNodeAction.Response::new); + } + + public static class Response extends ActionResponse { + private final Map checkpoints; + + public Response(Map checkpoints) { + this.checkpoints = checkpoints; + } + + public Response(StreamInput in) throws IOException { + this.checkpoints = in.readOrderedMap(StreamInput::readString, StreamInput::readLongArray); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(getCheckpoints(), StreamOutput::writeString, StreamOutput::writeLongArray); + } + + public Map getCheckpoints() { + return checkpoints; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Response that = (Response) obj; + + return this.checkpoints.size() == that.checkpoints.size() + && this.checkpoints.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), that.checkpoints.get(e.getKey()))); + } + + @Override + public int hashCode() { + int hash = 1; + + for (Entry e : checkpoints.entrySet()) { + hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue())); + } + + return hash; + } + } + + public static class Request extends ActionRequest { + + private final Set shards; + + public Request(Set shards) { + this.shards = shards; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.shards = Collections.unmodifiableSet(in.readSet(ShardId::new)); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeCollection(shards); + } + + public Set getShards() { + return shards; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Request that = (Request) obj; + + return Objects.equals(shards, that.shards); + } + + @Override + public int hashCode() { + return Objects.hash(shards); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java new file mode 100644 index 000000000000..51927906e1cc --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +public class GetCheckpointActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Request createTestInstance() { + return new Request( + generateRandomStringArray(10, 10, false, false), + IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + Boolean.toString(randomBoolean()), + SearchRequest.DEFAULT_INDICES_OPTIONS + ) + ); + } + + @Override + protected Reader instanceReader() { + return Request::new; + } + + @Override + protected Request mutateInstance(Request instance) throws IOException { + List indices = new ArrayList<>(Arrays.asList(instance.indices())); + IndicesOptions indicesOptions = instance.indicesOptions(); + + switch (between(0, 1)) { + case 0: + indices.add(randomAlphaOfLengthBetween(1, 20)); + break; + case 1: + indicesOptions = IndicesOptions.fromParameters( + randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), + Boolean.toString(instance.indicesOptions().ignoreUnavailable() == false), + Boolean.toString(instance.indicesOptions().allowNoIndices() == false), + Boolean.toString(instance.indicesOptions().ignoreThrottled() == false), + SearchRequest.DEFAULT_INDICES_OPTIONS + ); + break; + default: + throw new AssertionError("Illegal randomization branch"); + } + + return new Request(indices.toArray(new String[0]), indicesOptions); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionResponseTests.java new file mode 100644 index 000000000000..fdb7a59a4792 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionResponseTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Response; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class GetCheckpointActionResponseTests extends AbstractWireSerializingTestCase { + + public static Response randomCheckpointResponse() { + Map checkpointsByIndex = new TreeMap<>(); + int indices = randomIntBetween(1, 10); + for (int i = 0; i < indices; ++i) { + List checkpoints = new ArrayList<>(); + int shards = randomIntBetween(1, 20); + for (int j = 0; j < shards; ++j) { + checkpoints.add(randomLongBetween(0, 1_000_000)); + } + checkpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), checkpoints.stream().mapToLong(l -> l).toArray()); + } + return new Response(checkpointsByIndex); + } + + @Override + protected Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + return randomCheckpointResponse(); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java new file mode 100644 index 000000000000..7e3f91942ebc --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction.Request; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +public class GetCheckpointNodeActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + Set shards = new HashSet<>(); + int numberOfRandomShardIds = randomInt(10); + + for (int i = 0; i < numberOfRandomShardIds; ++i) { + shards.add(new ShardId(randomAlphaOfLength(4) + i, randomAlphaOfLength(4), randomInt(5))); + } + + return new Request(shards); + } + + @Override + protected Request mutateInstance(Request instance) throws IOException { + Set shards = new HashSet<>(instance.getShards()); + + if (randomBoolean() && shards.size() > 0) { + ShardId firstShard = shards.iterator().next(); + shards.remove(firstShard); + if (randomBoolean()) { + shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5))); + } + } else { + shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5))); + } + + return new Request(shards); + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionResponseTests.java new file mode 100644 index 000000000000..f189a4f0faae --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionResponseTests.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction.Response; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GetCheckpointNodeActionResponseTests extends AbstractWireSerializingTestCase { + + public static Response randomNodeCheckpointResponse() { + Map checkpointsByIndex = new HashMap<>(); + int indices = randomIntBetween(1, 10); + for (int i = 0; i < indices; ++i) { + List checkpoints = new ArrayList<>(); + int shards = randomIntBetween(1, 20); + for (int j = 0; j < shards; ++j) { + checkpoints.add(randomLongBetween(0, 1_000_000)); + } + checkpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), checkpoints.stream().mapToLong(l -> l).toArray()); + } + return new Response(checkpointsByIndex); + } + + @Override + protected Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + return randomNodeCheckpointResponse(); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 1cff2e41c26f..d03156c89527 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -95,6 +95,8 @@ public class Constants { "cluster:admin/transform/update", "cluster:admin/transform/upgrade", "cluster:admin/transform/validate", + "cluster:internal/transform/checkpoint[s]", + "cluster:internal/transform/checkpoint", // "cluster:admin/voting_config/add_exclusions", // "cluster:admin/voting_config/clear_exclusions", "cluster:admin/xpack/ccr/auto_follow_pattern/activate", diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 1cde900c3b54..3c9b3f359602 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -60,6 +60,8 @@ import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; import org.elasticsearch.xpack.core.transform.action.GetTransformAction; import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; @@ -72,6 +74,8 @@ import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; import org.elasticsearch.xpack.transform.action.TransportDeleteTransformAction; +import org.elasticsearch.xpack.transform.action.TransportGetCheckpointAction; +import org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction; import org.elasticsearch.xpack.transform.action.TransportGetTransformAction; import org.elasticsearch.xpack.transform.action.TransportGetTransformStatsAction; import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction; @@ -188,10 +192,14 @@ public List getRestHandlers( new ActionHandler<>(PreviewTransformAction.INSTANCE, TransportPreviewTransformAction.class), new ActionHandler<>(UpdateTransformAction.INSTANCE, TransportUpdateTransformAction.class), new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class), - new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class), new ActionHandler<>(UpgradeTransformsAction.INSTANCE, TransportUpgradeTransformsAction.class), new ActionHandler<>(ResetTransformAction.INSTANCE, TransportResetTransformAction.class), + // internal, no rest endpoint + new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class), + new ActionHandler<>(GetCheckpointAction.INSTANCE, TransportGetCheckpointAction.class), + new ActionHandler<>(GetCheckpointNodeAction.INSTANCE, TransportGetCheckpointNodeAction.class), + // usage and info new ActionHandler<>(XPackUsageFeatureAction.TRANSFORM, TransformUsageTransportAction.class), new ActionHandler<>(XPackInfoFeatureAction.TRANSFORM, TransformInfoTransportAction.class) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java new file mode 100644 index 000000000000..815498136ed5 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Response; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +public class TransportGetCheckpointAction extends HandledTransportAction { + + private final Client client; + private final ClusterService clusterService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + + @Inject + public TransportGetCheckpointAction( + final TransportService transportService, + final ActionFilters actionFilters, + final Client client, + final ClusterService clusterService, + final IndexNameExpressionResolver indexNameExpressionResolver + ) { + super(GetCheckpointAction.NAME, transportService, actionFilters, Request::new); + this.client = client; + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + final ClusterState state = clusterService.state(); + + // TODO: does this action respect user headers? + String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames( + state, + request.indicesOptions(), + true, // includeDataStreams + request.indices() + ); + + new AsyncGetCheckpointsFromNodesAction(task, resolveIndicesToPrimaryShards(state, concreteIndices), listener).start(); + } + + private Map> resolveIndicesToPrimaryShards(ClusterState state, String[] concreteIndices) { + final DiscoveryNodes nodes = state.nodes(); + Map> nodesAndShards = new HashMap<>(); + + ShardsIterator shardsIt = state.routingTable().allShards(concreteIndices); + for (ShardRouting shard : shardsIt) { + // only take primary shards, which should be exactly 1, this isn't strictly necessary + // and we should consider taking any shard copy, but than we need another way to de-dup + if (shard.primary() && shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) { + String nodeId = shard.currentNodeId(); + nodesAndShards.computeIfAbsent(nodeId, k -> new HashSet<>()).add(shard.shardId()); + } + } + return nodesAndShards; + } + + protected class AsyncGetCheckpointsFromNodesAction { + private final Task task; + private final ActionListener listener; + private final Map> nodesAndShards; + + protected AsyncGetCheckpointsFromNodesAction( + Task task, + Map> nodesAndShards, + ActionListener listener + ) { + this.task = task; + this.listener = listener; + this.nodesAndShards = nodesAndShards; + } + + public void start() { + + GroupedActionListener groupedListener = new GroupedActionListener<>( + ActionListener.wrap(responses -> { + // the final list should be ordered by key + Map checkpointsByIndexReduced = new TreeMap<>(); + + // merge the node responses + for (GetCheckpointNodeAction.Response response : responses) { + response.getCheckpoints().forEach((index, checkpoint) -> { + if (checkpointsByIndexReduced.containsKey(index)) { + long[] shardCheckpoints = checkpointsByIndexReduced.get(index); + for (int i = 0; i < checkpoint.length; ++i) { + shardCheckpoints[i] = Math.max(shardCheckpoints[i], checkpoint[i]); + } + } else { + checkpointsByIndexReduced.put(index, checkpoint); + } + + }); + } + + listener.onResponse(new Response(checkpointsByIndexReduced)); + }, listener::onFailure), + nodesAndShards.size() + ); + + for (Entry> oneNodeAndItsShards : nodesAndShards.entrySet()) { + GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( + oneNodeAndItsShards.getValue() + ); + nodeCheckpointsRequest.setParentTask(clusterService.localNode().getId(), task.getId()); + + // TODO: shortcut if shard is on the local node + + // execute it with transform origin + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.TRANSFORM_ORIGIN, + GetCheckpointNodeAction.INSTANCE, + nodeCheckpointsRequest, + groupedListener + ); + } + } + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java new file mode 100644 index 000000000000..5e640788ad38 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction.Request; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction.Response; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class TransportGetCheckpointNodeAction extends HandledTransportAction { + + private final IndicesService indicesService; + + @Inject + public TransportGetCheckpointNodeAction( + final TransportService transportService, + final ActionFilters actionFilters, + IndicesService indicesService + ) { + super(GetCheckpointNodeAction.NAME, transportService, actionFilters, Request::new); + this.indicesService = indicesService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + getGlobalCheckpoints(request.getShards(), listener); + } + + private void getGlobalCheckpoints(Set shards, ActionListener listener) { + Map checkpointsByIndexOfThisNode = new HashMap<>(); + for (ShardId shardId : shards) { + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + + checkpointsByIndexOfThisNode.computeIfAbsent( + shardId.getIndexName(), + k -> new long[indexService.getIndexSettings().getNumberOfShards()] + ); + checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats().getGlobalCheckpoint(); + } + listener.onResponse(new Response(checkpointsByIndexOfThisNode)); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 801fb1f7064d..b2870199f833 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; @@ -98,9 +99,21 @@ public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final } protected void getIndexCheckpoints(ActionListener> listener) { + logger.info("getting index checkpoints"); + try { ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex()); - ActionListener> groupedListener = listener; + ActionListener> groupedListener = ActionListener.wrap(r -> { + getCheckpointsFromOneClusterV2( + client, + transformConfig.getHeaders(), + resolvedIndexes.getLocalIndices().toArray(new String[0]), + RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, + r, + listener + + ); + }, listener::onFailure); if (resolvedIndexes.numClusters() > 1) { ActionListener>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> { @@ -139,6 +152,47 @@ protected void getIndexCheckpoints(ActionListener> listener) } } + private static void getCheckpointsFromOneClusterV2( + Client client, + Map headers, + String[] indices, + String prefix, + Map toCompare, + ActionListener> listener + ) { + GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(indices, IndicesOptions.LENIENT_EXPAND_OPEN); + logger.info("get checkpoints"); + + ClientHelper.executeWithHeadersAsync( + headers, + ClientHelper.TRANSFORM_ORIGIN, + client, + GetCheckpointAction.INSTANCE, + getCheckpointRequest, + ActionListener.wrap(checkpointResponse -> { + + logger.info("got response for checkpoint indexes {}", checkpointResponse.getCheckpoints().size()); + + if (toCompare.size() != checkpointResponse.getCheckpoints().size()) { + listener.onFailure(new RuntimeException("checkpoints differ!")); + return; + } + + if (checkpointResponse.getCheckpoints() + .entrySet() + .stream() + .allMatch(e -> Arrays.equals(e.getValue(), toCompare.get(e.getKey()))) == false) { + listener.onFailure(new RuntimeException("checkpoints differ!")); + return; + } + + logger.info("both checkpoints are equal"); + listener.onResponse(toCompare); + }, listener::onFailure) + ); + + } + private static void getCheckpointsFromOneCluster( Client client, Map headers, From d57bd40728aa1cd045a27e198b9a2d1d9bfd4446 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 30 Nov 2021 09:46:51 +0100 Subject: [PATCH 02/28] implement switch for default and fallback --- .../transform/action/GetCheckpointAction.java | 25 ++++++++ .../checkpoint/DefaultCheckpointProvider.java | 64 +++++++++---------- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index 79fa53ac6a49..518d5b9c6399 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -62,6 +62,31 @@ public String[] indices() { public IndicesOptions indicesOptions() { return indicesOptions; } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Request that = (Request) obj; + + return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(indices), indicesOptions); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArrayNullable(indices); + indicesOptions.writeIndicesOptions(out); + } } public static class Response extends ActionResponse { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index b2870199f833..05cfd44eaaeb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; @@ -60,6 +61,9 @@ class DefaultCheckpointProvider implements CheckpointProvider { protected final TransformAuditor transformAuditor; protected final TransformConfig transformConfig; + // set of clusters that do not support 8.1+ checkpoint actions + private final Set fallbackToBWC = new HashSet<>(); + DefaultCheckpointProvider( final Clock clock, final Client client, @@ -103,17 +107,7 @@ protected void getIndexCheckpoints(ActionListener> listener) try { ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex()); - ActionListener> groupedListener = ActionListener.wrap(r -> { - getCheckpointsFromOneClusterV2( - client, - transformConfig.getHeaders(), - resolvedIndexes.getLocalIndices().toArray(new String[0]), - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, - r, - listener - - ); - }, listener::onFailure); + ActionListener> groupedListener = listener; if (resolvedIndexes.numClusters() > 1) { ActionListener>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> { @@ -152,12 +146,31 @@ protected void getIndexCheckpoints(ActionListener> listener) } } + private void getCheckpointsFromOneCluster( + Client client, + Map headers, + String[] indices, + String prefix, + ActionListener> listener + ) { + if (fallbackToBWC.contains(prefix)) { + getCheckpointsFromOneClusterBWC(client, headers, indices, prefix, listener); + } + getCheckpointsFromOneClusterV2(client, headers, indices, prefix, ActionListener.wrap(listener::onResponse, e -> { + if (e instanceof ActionNotFoundTransportException) { + fallbackToBWC.add(prefix); + getCheckpointsFromOneClusterBWC(client, headers, indices, prefix, listener); + } else { + listener.onFailure(e); + } + })); + } + private static void getCheckpointsFromOneClusterV2( Client client, Map headers, String[] indices, String prefix, - Map toCompare, ActionListener> listener ) { GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(indices, IndicesOptions.LENIENT_EXPAND_OPEN); @@ -169,31 +182,14 @@ private static void getCheckpointsFromOneClusterV2( client, GetCheckpointAction.INSTANCE, getCheckpointRequest, - ActionListener.wrap(checkpointResponse -> { - - logger.info("got response for checkpoint indexes {}", checkpointResponse.getCheckpoints().size()); - - if (toCompare.size() != checkpointResponse.getCheckpoints().size()) { - listener.onFailure(new RuntimeException("checkpoints differ!")); - return; - } - - if (checkpointResponse.getCheckpoints() - .entrySet() - .stream() - .allMatch(e -> Arrays.equals(e.getValue(), toCompare.get(e.getKey()))) == false) { - listener.onFailure(new RuntimeException("checkpoints differ!")); - return; - } - - logger.info("both checkpoints are equal"); - listener.onResponse(toCompare); - }, listener::onFailure) + ActionListener.wrap(checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()), listener::onFailure) ); - } - private static void getCheckpointsFromOneCluster( + /** + * BWC version for <8.1 + */ + private static void getCheckpointsFromOneClusterBWC( Client client, Map headers, String[] indices, From a571ee838c0881be8e25cff40c998834a8e972ce Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 30 Nov 2021 09:59:57 +0100 Subject: [PATCH 03/28] fix doc string --- .../xpack/transform/checkpoint/DefaultCheckpointProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 05cfd44eaaeb..af0592856417 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -187,7 +187,7 @@ private static void getCheckpointsFromOneClusterV2( } /** - * BWC version for <8.1 + * BWC fallback for nodes/cluster older than 8.1 */ private static void getCheckpointsFromOneClusterBWC( Client client, From e49244a9f8931757477ee4da2da24a0ce9cabb5c Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 30 Nov 2021 17:08:36 +0100 Subject: [PATCH 04/28] use transport to target node directly instead of using a client --- .../action/TransportGetCheckpointAction.java | 61 ++++++++++++++----- .../TransportGetCheckpointNodeAction.java | 6 +- .../checkpoint/DefaultCheckpointProvider.java | 38 +++++++----- 3 files changed, 73 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index 815498136ed5..7c17d4db39ef 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -6,27 +6,33 @@ */ package org.elasticsearch.xpack.transform.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Response; import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -36,20 +42,23 @@ public class TransportGetCheckpointAction extends HandledTransportAction { - private final Client client; + private static final Logger logger = LogManager.getLogger(TransportGetCheckpointAction.class); private final ClusterService clusterService; + private final IndicesService indicesService; + private final TransportService transportService; private final IndexNameExpressionResolver indexNameExpressionResolver; @Inject public TransportGetCheckpointAction( final TransportService transportService, final ActionFilters actionFilters, - final Client client, + final IndicesService indicesService, final ClusterService clusterService, final IndexNameExpressionResolver indexNameExpressionResolver ) { super(GetCheckpointAction.NAME, transportService, actionFilters, Request::new); - this.client = client; + this.transportService = transportService; + this.indicesService = indicesService; this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; } @@ -58,7 +67,6 @@ public TransportGetCheckpointAction( protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); - // TODO: does this action respect user headers? String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames( state, request.indicesOptions(), @@ -89,6 +97,8 @@ protected class AsyncGetCheckpointsFromNodesAction { private final Task task; private final ActionListener listener; private final Map> nodesAndShards; + private final DiscoveryNodes nodes; + private final String localNodeId; protected AsyncGetCheckpointsFromNodesAction( Task task, @@ -98,6 +108,9 @@ protected AsyncGetCheckpointsFromNodesAction( this.task = task; this.listener = listener; this.nodesAndShards = nodesAndShards; + ClusterState clusterState = clusterService.state(); + nodes = clusterState.nodes(); + localNodeId = clusterService.localNode().getId(); } public void start() { @@ -118,7 +131,6 @@ public void start() { } else { checkpointsByIndexReduced.put(index, checkpoint); } - }); } @@ -128,20 +140,39 @@ public void start() { ); for (Entry> oneNodeAndItsShards : nodesAndShards.entrySet()) { + if (localNodeId.equals(oneNodeAndItsShards.getKey())) { + TransportGetCheckpointNodeAction.getGlobalCheckpoints(indicesService, oneNodeAndItsShards.getValue(), groupedListener); + continue; + } + GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( oneNodeAndItsShards.getValue() ); nodeCheckpointsRequest.setParentTask(clusterService.localNode().getId(), task.getId()); + DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); + logger.trace("get checkpoints from node {}", node); + transportService.sendRequest( + node, + GetCheckpointNodeAction.NAME, + nodeCheckpointsRequest, + new TransportResponseHandler() { - // TODO: shortcut if shard is on the local node + @Override + public GetCheckpointNodeAction.Response read(StreamInput in) throws IOException { + return new GetCheckpointNodeAction.Response(in); + } - // execute it with transform origin - ClientHelper.executeAsyncWithOrigin( - client, - ClientHelper.TRANSFORM_ORIGIN, - GetCheckpointNodeAction.INSTANCE, - nodeCheckpointsRequest, - groupedListener + @Override + public void handleResponse(GetCheckpointNodeAction.Response response) { + groupedListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + groupedListener.onFailure(exp); + } + + } ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java index 5e640788ad38..9b5965b823ee 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java @@ -32,7 +32,7 @@ public class TransportGetCheckpointNodeAction extends HandledTransportAction listener) { - getGlobalCheckpoints(request.getShards(), listener); + getGlobalCheckpoints(indicesService, request.getShards(), listener); } - private void getGlobalCheckpoints(Set shards, ActionListener listener) { + static void getGlobalCheckpoints(IndicesService indicesService, Set shards, ActionListener listener) { Map checkpointsByIndexOfThisNode = new HashMap<>(); for (ShardId shardId : shards) { final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index af0592856417..e80ff5a81dd2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -137,7 +138,7 @@ protected void getIndexCheckpoints(ActionListener> listener) remoteClient, transformConfig.getHeaders(), remoteIndex.getValue().toArray(new String[0]), - remoteIndex.getKey() + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR, + remoteIndex.getKey(), groupedListener ); } @@ -150,16 +151,24 @@ private void getCheckpointsFromOneCluster( Client client, Map headers, String[] indices, - String prefix, + String cluster, ActionListener> listener ) { - if (fallbackToBWC.contains(prefix)) { - getCheckpointsFromOneClusterBWC(client, headers, indices, prefix, listener); + if (fallbackToBWC.contains(cluster)) { + getCheckpointsFromOneClusterBWC(client, headers, indices, cluster, listener); } - getCheckpointsFromOneClusterV2(client, headers, indices, prefix, ActionListener.wrap(listener::onResponse, e -> { - if (e instanceof ActionNotFoundTransportException) { - fallbackToBWC.add(prefix); - getCheckpointsFromOneClusterBWC(client, headers, indices, prefix, listener); + getCheckpointsFromOneClusterV2(client, headers, indices, cluster, ActionListener.wrap(listener::onResponse, e -> { + Throwable unwrappedException = ExceptionsHelper.unwrapCause(e); + if (unwrappedException instanceof ActionNotFoundTransportException) { + // this is an implementation detail, so not necessary to audit or warn, but only report as debug + logger.debug( + "[{}] Cluster [{}] does not support transform checkpoint API, falling back to ordinary checkpointing (more resource intensive)", + transformConfig.getId(), + cluster + ); + + fallbackToBWC.add(cluster); + getCheckpointsFromOneClusterBWC(client, headers, indices, cluster, listener); } else { listener.onFailure(e); } @@ -170,11 +179,10 @@ private static void getCheckpointsFromOneClusterV2( Client client, Map headers, String[] indices, - String prefix, + String cluster, ActionListener> listener ) { GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(indices, IndicesOptions.LENIENT_EXPAND_OPEN); - logger.info("get checkpoints"); ClientHelper.executeWithHeadersAsync( headers, @@ -193,7 +201,7 @@ private static void getCheckpointsFromOneClusterBWC( Client client, Map headers, String[] indices, - String prefix, + String cluster, ActionListener> listener ) { // 1st get index to see the indexes the user has access to @@ -239,14 +247,14 @@ private static void getCheckpointsFromOneClusterBWC( ); return; } - listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix)); + listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, cluster)); }, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))) ); }, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))) ); } - static Map extractIndexCheckPoints(ShardStats[] shards, Set userIndices, String prefix) { + static Map extractIndexCheckPoints(ShardStats[] shards, Set userIndices, String cluster) { Map> checkpointsByIndex = new TreeMap<>(); for (ShardStats shard : shards) { @@ -255,7 +263,9 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(fullIndexName); From 5cd7a2e49193c5b58b45d1f106d22ff05ea12986 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 30 Nov 2021 17:25:00 +0100 Subject: [PATCH 05/28] fix tests --- .../checkpoint/DefaultCheckpointProvider.java | 2 +- .../TimeBasedCheckpointProviderTests.java | 25 ++++--------------- 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index e80ff5a81dd2..4682a6e9b26a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -162,7 +162,7 @@ private void getCheckpointsFromOneCluster( if (unwrappedException instanceof ActionNotFoundTransportException) { // this is an implementation detail, so not necessary to audit or warn, but only report as debug logger.debug( - "[{}] Cluster [{}] does not support transform checkpoint API, falling back to ordinary checkpointing (more resource intensive)", + "[{}] Cluster [{}] does not support transform checkpoint API, falling back to legacy checkpointing", transformConfig.getId(), cluster ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java index 4e30dd38141f..902a69b88507 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -12,18 +12,12 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; -import org.elasticsearch.action.admin.indices.get.GetIndexAction; -import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -36,6 +30,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -53,6 +48,7 @@ import org.mockito.stubbing.Answer; import java.time.Clock; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -62,8 +58,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -255,19 +251,8 @@ private void testCreateNextCheckpoint( TransformCheckpoint lastCheckpoint, TransformCheckpoint expectedNextCheckpoint ) throws InterruptedException { - GetIndexResponse getIndexResponse = new GetIndexResponse( - new String[] { "some-index" }, - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of(), - ImmutableOpenMap.of() - ); - doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); - IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); - when(indicesStatsResponse.getShards()).thenReturn(new ShardStats[0]); - when(indicesStatsResponse.getFailedShards()).thenReturn(0); - doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); + GetCheckpointAction.Response checkpointResponse = new GetCheckpointAction.Response(Collections.emptyMap()); + doAnswer(withResponse(checkpointResponse)).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); TransformConfig transformConfig = newTransformConfigWithDateHistogram( transformId, From ca65ec7648bd48d288adadec112d6e365614fbdc Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 1 Dec 2021 08:59:19 +0100 Subject: [PATCH 06/28] rename the integration test for legacy checkpoints --- ...formLegacyCheckpointServiceNodeTests.java} | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) rename x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/{TransformCheckpointServiceNodeTests.java => TransformLegacyCheckpointServiceNodeTests.java} (95%) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java similarity index 95% rename from x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java rename to x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java index 7c4746989c88..95b3fbd9cfe7 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java @@ -44,6 +44,8 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; @@ -73,7 +75,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTestCase { +/** + * Test suite for legacy checkpointing using 2 calls: getindex, getindexstats + */ +public class TransformLegacyCheckpointServiceNodeTests extends TransformSingleNodeTestCase { // re-use the mock client for the whole test suite as the underlying thread pool and the // corresponding context if recreated cause unreliable test execution @@ -111,6 +116,14 @@ protected void ActionListener listener ) { + // fallback to legacy checkpointing + if (request instanceof GetCheckpointAction.Request) { + listener.onFailure( + new ActionNotFoundTransportException(GetCheckpointAction.NAME) + ); + return; + } + if (request instanceof GetIndexRequest) { // for this test we only need the indices assert (indices != null); @@ -118,7 +131,9 @@ protected void listener.onResponse((Response) indexResponse); return; - } else if (request instanceof IndicesStatsRequest) { + } + + if (request instanceof IndicesStatsRequest) { // IndicesStatsResponse is package private, therefore using a mock final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); From 037d466d87816db38cd8bd65b03518024d42f858 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 1 Dec 2021 09:04:30 +0100 Subject: [PATCH 07/28] spotlessApply --- .../checkpoint/TransformLegacyCheckpointServiceNodeTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java index 95b3fbd9cfe7..76acfd8c3dc0 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java @@ -118,9 +118,7 @@ protected void // fallback to legacy checkpointing if (request instanceof GetCheckpointAction.Request) { - listener.onFailure( - new ActionNotFoundTransportException(GetCheckpointAction.NAME) - ); + listener.onFailure(new ActionNotFoundTransportException(GetCheckpointAction.NAME)); return; } From da52e2c9938dc7b321ea10ed433951ed03fe4642 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 1 Dec 2021 16:31:37 +0100 Subject: [PATCH 08/28] swith the checkpoint action to admin, as it has to be called with user credentials, the node/shard action remains internal --- .../xpack/core/transform/action/GetCheckpointAction.java | 4 +++- .../xpack/core/transform/action/GetCheckpointNodeAction.java | 2 ++ .../org/elasticsearch/xpack/security/operator/Constants.java | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index 518d5b9c6399..b0d06505a1f7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -26,7 +26,9 @@ public class GetCheckpointAction extends ActionType { public static final GetCheckpointAction INSTANCE = new GetCheckpointAction(); - public static final String NAME = "cluster:internal/transform/checkpoint"; + + // note: this is an admin action, it must be called with user headers + public static final String NAME = "cluster:admin/transform/checkpoint"; private GetCheckpointAction() { super(NAME, GetCheckpointAction.Response::new); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java index 302ee9d59135..d9de14681de9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java @@ -26,6 +26,8 @@ public class GetCheckpointNodeAction extends ActionType { public static final GetCheckpointNodeAction INSTANCE = new GetCheckpointNodeAction(); + + // note: this is an internal action public static final String NAME = "cluster:internal/transform/checkpoint[s]"; private GetCheckpointNodeAction() { diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index d03156c89527..cf14187ae45d 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -95,8 +95,8 @@ public class Constants { "cluster:admin/transform/update", "cluster:admin/transform/upgrade", "cluster:admin/transform/validate", + "cluster:admin/transform/checkpoint", "cluster:internal/transform/checkpoint[s]", - "cluster:internal/transform/checkpoint", // "cluster:admin/voting_config/add_exclusions", // "cluster:admin/voting_config/clear_exclusions", "cluster:admin/xpack/ccr/auto_follow_pattern/activate", From 2805381609f88ec6efb5be537c58b0b103dee3bc Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 1 Dec 2021 17:47:43 +0100 Subject: [PATCH 09/28] change the shard action, too --- .../xpack/core/transform/action/GetCheckpointNodeAction.java | 2 +- .../org/elasticsearch/xpack/security/operator/Constants.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java index d9de14681de9..24f0a2367380 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java @@ -28,7 +28,7 @@ public class GetCheckpointNodeAction extends ActionType Date: Wed, 1 Dec 2021 18:42:27 +0100 Subject: [PATCH 10/28] fix branch logic --- .../checkpoint/DefaultCheckpointProvider.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 4682a6e9b26a..6baa91b63760 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -156,23 +156,24 @@ private void getCheckpointsFromOneCluster( ) { if (fallbackToBWC.contains(cluster)) { getCheckpointsFromOneClusterBWC(client, headers, indices, cluster, listener); - } - getCheckpointsFromOneClusterV2(client, headers, indices, cluster, ActionListener.wrap(listener::onResponse, e -> { - Throwable unwrappedException = ExceptionsHelper.unwrapCause(e); - if (unwrappedException instanceof ActionNotFoundTransportException) { - // this is an implementation detail, so not necessary to audit or warn, but only report as debug - logger.debug( - "[{}] Cluster [{}] does not support transform checkpoint API, falling back to legacy checkpointing", - transformConfig.getId(), - cluster - ); + } else { + getCheckpointsFromOneClusterV2(client, headers, indices, cluster, ActionListener.wrap(listener::onResponse, e -> { + Throwable unwrappedException = ExceptionsHelper.unwrapCause(e); + if (unwrappedException instanceof ActionNotFoundTransportException) { + // this is an implementation detail, so not necessary to audit or warn, but only report as debug + logger.debug( + "[{}] Cluster [{}] does not support transform checkpoint API, falling back to legacy checkpointing", + transformConfig.getId(), + cluster + ); - fallbackToBWC.add(cluster); - getCheckpointsFromOneClusterBWC(client, headers, indices, cluster, listener); - } else { - listener.onFailure(e); - } - })); + fallbackToBWC.add(cluster); + getCheckpointsFromOneClusterBWC(client, headers, indices, cluster, listener); + } else { + listener.onFailure(e); + } + })); + } } private static void getCheckpointsFromOneClusterV2( From a41b5eb4e19c6806837bd388155ce85b4cd8a1d5 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 2 Dec 2021 14:10:12 +0100 Subject: [PATCH 11/28] initialize sequence number array with unassigned --- .../action/TransportGetCheckpointNodeAction.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java index 9b5965b823ee..b8c6e1a92dcc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -20,6 +21,7 @@ import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction.Request; import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction.Response; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -49,10 +51,11 @@ static void getGlobalCheckpoints(IndicesService indicesService, Set sha final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.id()); - checkpointsByIndexOfThisNode.computeIfAbsent( - shardId.getIndexName(), - k -> new long[indexService.getIndexSettings().getNumberOfShards()] - ); + checkpointsByIndexOfThisNode.computeIfAbsent(shardId.getIndexName(), k -> { + long[] seqNumbers = new long[indexService.getIndexSettings().getNumberOfShards()]; + Arrays.fill(seqNumbers, SequenceNumbers.UNASSIGNED_SEQ_NO); + return seqNumbers; + }); checkpointsByIndexOfThisNode.get(shardId.getIndexName())[shardId.getId()] = indexShard.seqNoStats().getGlobalCheckpoint(); } listener.onResponse(new Response(checkpointsByIndexOfThisNode)); From 58b586076a45b82e22da17cfd2f0f64711321d2a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 2 Dec 2021 17:14:22 +0100 Subject: [PATCH 12/28] remove debug log --- .../xpack/transform/checkpoint/DefaultCheckpointProvider.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 6baa91b63760..0eb03d55b585 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -104,8 +104,6 @@ public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final } protected void getIndexCheckpoints(ActionListener> listener) { - logger.info("getting index checkpoints"); - try { ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex()); ActionListener> groupedListener = listener; From cd3991b31df011174fd859ec68778834af617da4 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 14 Dec 2021 09:36:10 +0100 Subject: [PATCH 13/28] add tests --- ... TransformCheckpointServiceNodeTests.java} | 42 ++++---- .../TransformGetCheckpointTests.java | 97 +++++++++++++++++++ 2 files changed, 118 insertions(+), 21 deletions(-) rename x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/{TransformLegacyCheckpointServiceNodeTests.java => TransformCheckpointServiceNodeTests.java} (92%) create mode 100644 x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java similarity index 92% rename from x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java rename to x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 76acfd8c3dc0..4ae9b8310cc1 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformLegacyCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -66,11 +66,9 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -78,7 +76,7 @@ /** * Test suite for legacy checkpointing using 2 calls: getindex, getindexstats */ -public class TransformLegacyCheckpointServiceNodeTests extends TransformSingleNodeTestCase { +public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTestCase { // re-use the mock client for the whole test suite as the underlying thread pool and the // corresponding context if recreated cause unreliable test execution @@ -90,22 +88,18 @@ public class TransformLegacyCheckpointServiceNodeTests extends TransformSingleNo private class MockClientForCheckpointing extends NoOpClient { - private volatile ShardStats[] shardStats; + private final boolean oldCheckpointAPI; + private volatile Map checkpoints; private volatile String[] indices; - MockClientForCheckpointing(String testName) { + MockClientForCheckpointing(String testName, boolean oldCheckpointAPI) { super(testName); + this.oldCheckpointAPI = oldCheckpointAPI; } - void setShardStats(ShardStats[] shardStats) { - this.shardStats = shardStats; - - Set indicesSet = new HashSet<>(); - for (ShardStats s : shardStats) { - indicesSet.add(s.getShardRouting().getIndexName()); - } - - this.indices = indicesSet.toArray(new String[0]); + void setCheckpoints(Map checkpoints) { + this.checkpoints = checkpoints; + this.indices = checkpoints.keySet().toArray(new String[0]); } @SuppressWarnings("unchecked") @@ -116,9 +110,15 @@ protected void ActionListener listener ) { - // fallback to legacy checkpointing if (request instanceof GetCheckpointAction.Request) { - listener.onFailure(new ActionNotFoundTransportException(GetCheckpointAction.NAME)); + // fallback to legacy checkpointing if requested + if (oldCheckpointAPI) { + listener.onFailure(new ActionNotFoundTransportException(GetCheckpointAction.NAME)); + return; + } + + final GetCheckpointAction.Response getCheckpointResponse = new GetCheckpointAction.Response(checkpoints); + listener.onResponse((Response) getCheckpointResponse); return; } @@ -135,7 +135,7 @@ protected void // IndicesStatsResponse is package private, therefore using a mock final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); - when(indicesStatsResponse.getShards()).thenReturn(shardStats); + when(indicesStatsResponse.getShards()).thenReturn(createShardStats(checkpoints)); when(indicesStatsResponse.getFailedShards()).thenReturn(0); listener.onResponse((Response) indicesStatsResponse); @@ -150,7 +150,7 @@ protected void public void createComponents() { // it's not possible to run it as @BeforeClass as clients aren't initialized if (mockClientForCheckpointing == null) { - mockClientForCheckpointing = new MockClientForCheckpointing("TransformCheckpointServiceNodeTests"); + mockClientForCheckpointing = new MockClientForCheckpointing("TransformCheckpointServiceNodeTests", randomBoolean()); } ClusterService clusterService = mock(ClusterService.class); transformsConfigManager = new IndexBasedTransformConfigManager( @@ -283,7 +283,7 @@ public void testGetCheckpointStats() throws InterruptedException { assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint2, listener), true, null, null); - mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 20, 20, 20))); + mockClientForCheckpointing.setCheckpoints(createCheckPointMap(transformId, 20, 20, 20)); TransformCheckpointingInfo checkpointInfo = new TransformCheckpointingInfo( new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), @@ -299,7 +299,7 @@ public void testGetCheckpointStats() throws InterruptedException { null ); - mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 50, 33))); + mockClientForCheckpointing.setCheckpoints(createCheckPointMap(transformId, 10, 50, 33)); checkpointInfo = new TransformCheckpointingInfo( new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), @@ -315,7 +315,7 @@ public void testGetCheckpointStats() throws InterruptedException { ); // same as current - mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 10, 10))); + mockClientForCheckpointing.setCheckpoints(createCheckPointMap(transformId, 10, 10, 10)); checkpointInfo = new TransformCheckpointingInfo( new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java new file mode 100644 index 000000000000..2d1a9d984365 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.apache.commons.lang3.ArrayUtils; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.stream.Collectors; + +/** + * Test suite for checkpointing using transform getcheckpoint API + */ +public class TransformGetCheckpointTests extends TransformSingleNodeTestCase { + + public void testGetCheckpoint() throws Exception { + final String indexNamePrefix = "test_index-"; + final int shards = randomIntBetween(1, 5); + final int indices = randomIntBetween(1, 5); + + for (int i = 0; i < indices; ++i) { + client().admin() + .indices() + .prepareCreate(indexNamePrefix + i) + .setSettings(Settings.builder().put("index.number_of_shards", shards).put("index.number_of_replicas", 1)) + .get(); + } + + final GetCheckpointAction.Request request = new GetCheckpointAction.Request( + new String[] { indexNamePrefix + "*" }, + IndicesOptions.LENIENT_EXPAND_OPEN + ); + + final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get(); + assertEquals(indices, response.getCheckpoints().size()); + + // empty indices should report -1 as sequence id + assertFalse( + response.getCheckpoints().entrySet().stream().anyMatch(entry -> Arrays.stream(entry.getValue()).anyMatch(l -> l != -1L)) + ); + + final int docsToCreatePerShard = randomIntBetween(0, 10); + for (int d = 0; d < docsToCreatePerShard; ++d) { + for (int i = 0; i < indices; ++i) { + for (int j = 0; j < shards; ++j) { + client().prepareIndex(indexNamePrefix + i).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get(); + } + } + } + + client().admin().indices().refresh(new RefreshRequest(indexNamePrefix + "*")); + + final GetCheckpointAction.Response response2 = client().execute(GetCheckpointAction.INSTANCE, request).get(); + assertEquals(indices, response2.getCheckpoints().size()); + + // check the sum, counting starts with 0, so we have to take docsToCreatePerShard - 1 + long checkpointSum = response2.getCheckpoints().values().stream().map(l -> Arrays.stream(l).sum()).mapToLong(Long::valueOf).sum(); + assertEquals( + "Expected " + + (docsToCreatePerShard - 1) * shards * indices + + " as sum of " + + response2.getCheckpoints() + .entrySet() + .stream() + .map(e -> e.getKey() + ": {" + Strings.arrayToCommaDelimitedString(ArrayUtils.toObject(e.getValue())) + "}") + .collect(Collectors.joining(",")), + (docsToCreatePerShard - 1) * shards * indices, + checkpointSum + ); + + final IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexNamePrefix + "*").get(); + + assertEquals( + "Checkpoint API and indices stats don't match", + Arrays.stream(statsResponse.getShards()) + .filter(i -> i.getShardRouting().primary()) + .sorted(Comparator.comparingInt(value -> value.getShardRouting().id())) + .mapToLong(s -> s.getSeqNoStats().getGlobalCheckpoint()) + .sum(), + checkpointSum + ); + } + +} From 4677f6f6204721d6cc59ffda7253107bd13b86fb Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 15 Dec 2021 12:32:50 +0100 Subject: [PATCH 14/28] improve naming --- .../TransformCheckpointServiceNodeTests.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 4ae9b8310cc1..1e3d89b565e0 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -73,9 +73,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -/** - * Test suite for legacy checkpointing using 2 calls: getindex, getindexstats - */ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTestCase { // re-use the mock client for the whole test suite as the underlying thread pool and the @@ -88,13 +85,19 @@ public class TransformCheckpointServiceNodeTests extends TransformSingleNodeTest private class MockClientForCheckpointing extends NoOpClient { - private final boolean oldCheckpointAPI; + private final boolean supportTransformCheckpointApi; private volatile Map checkpoints; private volatile String[] indices; - MockClientForCheckpointing(String testName, boolean oldCheckpointAPI) { + /** + * Mock client for checkpointing + * + * @param testName name of the test, used for naming the threadpool + * @param supportTransformCheckpointApi whether to mock the checkpoint API, if false throws action not found + */ + MockClientForCheckpointing(String testName, boolean supportTransformCheckpointApi) { super(testName); - this.oldCheckpointAPI = oldCheckpointAPI; + this.supportTransformCheckpointApi = supportTransformCheckpointApi; } void setCheckpoints(Map checkpoints) { @@ -111,8 +114,8 @@ protected void ) { if (request instanceof GetCheckpointAction.Request) { - // fallback to legacy checkpointing if requested - if (oldCheckpointAPI) { + // throw action not found if checkpoint API is not supported, transform should fallback to legacy checkpointing + if (supportTransformCheckpointApi == false) { listener.onFailure(new ActionNotFoundTransportException(GetCheckpointAction.NAME)); return; } From 77b08e0802632c07a09b0495d3f73effdd7c52b6 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 20 Dec 2021 17:18:49 +0100 Subject: [PATCH 15/28] add test for merging node results --- .../checkpoint/TransformGetCheckpointIT.java | 97 +++++++ .../TransformGetCheckpointTests.java | 261 +++++++++++++----- .../action/TransportGetCheckpointAction.java | 42 ++- .../TransportGetCheckpointNodeAction.java | 2 +- 4 files changed, 328 insertions(+), 74 deletions(-) create mode 100644 x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java new file mode 100644 index 000000000000..2130a15760ac --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointIT.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.apache.commons.lang3.ArrayUtils; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.stream.Collectors; + +/** + * Test suite for checkpointing using transform getcheckpoint API + */ +public class TransformGetCheckpointIT extends TransformSingleNodeTestCase { + + public void testGetCheckpoint() throws Exception { + final String indexNamePrefix = "test_index-"; + final int shards = randomIntBetween(1, 5); + final int indices = randomIntBetween(1, 5); + + for (int i = 0; i < indices; ++i) { + client().admin() + .indices() + .prepareCreate(indexNamePrefix + i) + .setSettings(Settings.builder().put("index.number_of_shards", shards).put("index.number_of_replicas", 1)) + .get(); + } + + final GetCheckpointAction.Request request = new GetCheckpointAction.Request( + new String[] { indexNamePrefix + "*" }, + IndicesOptions.LENIENT_EXPAND_OPEN + ); + + final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get(); + assertEquals(indices, response.getCheckpoints().size()); + + // empty indices should report -1 as sequence id + assertFalse( + response.getCheckpoints().entrySet().stream().anyMatch(entry -> Arrays.stream(entry.getValue()).anyMatch(l -> l != -1L)) + ); + + final int docsToCreatePerShard = randomIntBetween(0, 10); + for (int d = 0; d < docsToCreatePerShard; ++d) { + for (int i = 0; i < indices; ++i) { + for (int j = 0; j < shards; ++j) { + client().prepareIndex(indexNamePrefix + i).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get(); + } + } + } + + client().admin().indices().refresh(new RefreshRequest(indexNamePrefix + "*")); + + final GetCheckpointAction.Response response2 = client().execute(GetCheckpointAction.INSTANCE, request).get(); + assertEquals(indices, response2.getCheckpoints().size()); + + // check the sum, counting starts with 0, so we have to take docsToCreatePerShard - 1 + long checkpointSum = response2.getCheckpoints().values().stream().map(l -> Arrays.stream(l).sum()).mapToLong(Long::valueOf).sum(); + assertEquals( + "Expected " + + (docsToCreatePerShard - 1) * shards * indices + + " as sum of " + + response2.getCheckpoints() + .entrySet() + .stream() + .map(e -> e.getKey() + ": {" + Strings.arrayToCommaDelimitedString(ArrayUtils.toObject(e.getValue())) + "}") + .collect(Collectors.joining(",")), + (docsToCreatePerShard - 1) * shards * indices, + checkpointSum + ); + + final IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexNamePrefix + "*").get(); + + assertEquals( + "Checkpoint API and indices stats don't match", + Arrays.stream(statsResponse.getShards()) + .filter(i -> i.getShardRouting().primary()) + .sorted(Comparator.comparingInt(value -> value.getShardRouting().id())) + .mapToLong(s -> s.getSeqNoStats().getGlobalCheckpoint()) + .sum(), + checkpointSum + ); + } + +} diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java index 2d1a9d984365..cb4d4f9cd23d 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -7,91 +7,214 @@ package org.elasticsearch.xpack.transform.checkpoint; -import org.apache.commons.lang3.ArrayUtils; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.Strings; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.EmptySystemIndices; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; -import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Response; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; +import org.elasticsearch.xpack.transform.action.TransportGetCheckpointAction; +import org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransformGetCheckpointTests extends ESSingleNodeTestCase { + + private static final int NUMBER_OF_SHARDS = 5; + private TransportService transportService; + private ClusterService clusterService; + private IndicesService indicesService; + private ThreadPool threadPool; + private IndexNameExpressionResolver indexNameExpressionResolver; + private MockTransport mockTransport; + private final String indexName = "test_index"; + + private TestTransportGetCheckpointAction getCheckpointAction; + private TestTransportGetCheckpointNodeAction getCheckpointNodeAction; + private ClusterState clusterStateWithIndex; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("GetCheckpointActionTests"); + indexNameExpressionResolver = new MockResolver(); + clusterService = getInstanceFromNode(ClusterService.class); + indicesService = getInstanceFromNode(IndicesService.class); + mockTransport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(GetCheckpointNodeAction.NAME)) { + + getCheckpointNodeAction.execute(null, (GetCheckpointNodeAction.Request) request, ActionListener.wrap(r -> { + this.handleResponse(requestId, r); + + }, e -> { + this.handleError(requestId, new TransportException(e.getMessage(), e)); + + }) + + ); + } + } + }; + + transportService = mockTransport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + emptySet() + ); + transportService.start(); + transportService.acceptIncomingRequests(); + getCheckpointAction = new TestTransportGetCheckpointAction(); + getCheckpointNodeAction = new TestTransportGetCheckpointNodeAction(); + clusterStateWithIndex = ClusterStateCreationUtils.state(indexName, 3, NUMBER_OF_SHARDS); + } -import java.util.Arrays; -import java.util.Comparator; -import java.util.stream.Collectors; + @After + public void tearDown() throws Exception { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + super.tearDown(); + } -/** - * Test suite for checkpointing using transform getcheckpoint API - */ -public class TransformGetCheckpointTests extends TransformSingleNodeTestCase { - - public void testGetCheckpoint() throws Exception { - final String indexNamePrefix = "test_index-"; - final int shards = randomIntBetween(1, 5); - final int indices = randomIntBetween(1, 5); - - for (int i = 0; i < indices; ++i) { - client().admin() - .indices() - .prepareCreate(indexNamePrefix + i) - .setSettings(Settings.builder().put("index.number_of_shards", shards).put("index.number_of_replicas", 1)) - .get(); - } + public void testEmptyCheckpoint() { + + } - final GetCheckpointAction.Request request = new GetCheckpointAction.Request( - new String[] { indexNamePrefix + "*" }, + public void testSingleNodeRequests() { + GetCheckpointAction.Request request = new GetCheckpointAction.Request( + new String[] { indexName }, IndicesOptions.LENIENT_EXPAND_OPEN ); + Task transformTask = new Task( + 1L, + "persistent", + "action", + TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + "the_id", + TaskId.EMPTY_TASK_ID, + Collections.emptyMap() + ); - final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get(); - assertEquals(indices, response.getCheckpoints().size()); + ActionTestUtils.execute(getCheckpointAction, transformTask, request, ActionListener.wrap(response -> { + assertNotNull(response.getCheckpoints()); + Map checkpoints = response.getCheckpoints(); + assertEquals(1, checkpoints.size()); + assertTrue(checkpoints.containsKey(indexName)); + for (int i = 0; i < NUMBER_OF_SHARDS; ++i) { + assertEquals(42 + i, checkpoints.get(indexName)[i]); + } + }, e -> { fail("got unexpected exception: " + e.getMessage()); })); - // empty indices should report -1 as sequence id - assertFalse( - response.getCheckpoints().entrySet().stream().anyMatch(entry -> Arrays.stream(entry.getValue()).anyMatch(l -> l != -1L)) - ); + } - final int docsToCreatePerShard = randomIntBetween(0, 10); - for (int d = 0; d < docsToCreatePerShard; ++d) { - for (int i = 0; i < indices; ++i) { - for (int j = 0; j < shards; ++j) { - client().prepareIndex(indexNamePrefix + i).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get(); - } + class TestTransportGetCheckpointAction extends TransportGetCheckpointAction { + + public TestTransportGetCheckpointAction() { + super(transportService, new ActionFilters(emptySet()), indicesService, clusterService, indexNameExpressionResolver); + } + + protected void doExecute(Task task, Request request, ActionListener listener) { + resolveIndicesAndGetCheckpoint(task, request, listener, clusterStateWithIndex); + } + + } + + class TestTransportGetCheckpointNodeAction extends TransportGetCheckpointNodeAction { + + public TestTransportGetCheckpointNodeAction() { + super(transportService, new ActionFilters(emptySet()), indicesService); + } + + protected void doExecute( + Task task, + GetCheckpointNodeAction.Request request, + ActionListener listener + ) { + IndicesService mockIndicesService = mock(IndicesService.class); + IndexService mockIndexService = mock(IndexService.class); + when(mockIndicesService.indexServiceSafe(any())).thenReturn(mockIndexService); + IndexSettings mockIndexSettings = new IndexSettings( + clusterStateWithIndex.metadata().index(indexName), + clusterService.getSettings() + ); + when(mockIndexService.getIndexSettings()).thenReturn(mockIndexSettings); + for (int i = 0; i < NUMBER_OF_SHARDS; ++i) { + IndexShard mockIndexShard = mock(IndexShard.class); + when(mockIndexService.getShard(i)).thenReturn(mockIndexShard); + SeqNoStats seqNoStats = new SeqNoStats(42 + i, 42 + i, 42 + i); + when(mockIndexShard.seqNoStats()).thenReturn(seqNoStats); } + + getGlobalCheckpoints(mockIndicesService, request.getShards(), listener); } + } - client().admin().indices().refresh(new RefreshRequest(indexNamePrefix + "*")); - - final GetCheckpointAction.Response response2 = client().execute(GetCheckpointAction.INSTANCE, request).get(); - assertEquals(indices, response2.getCheckpoints().size()); - - // check the sum, counting starts with 0, so we have to take docsToCreatePerShard - 1 - long checkpointSum = response2.getCheckpoints().values().stream().map(l -> Arrays.stream(l).sum()).mapToLong(Long::valueOf).sum(); - assertEquals( - "Expected " - + (docsToCreatePerShard - 1) * shards * indices - + " as sum of " - + response2.getCheckpoints() - .entrySet() - .stream() - .map(e -> e.getKey() + ": {" + Strings.arrayToCommaDelimitedString(ArrayUtils.toObject(e.getValue())) + "}") - .collect(Collectors.joining(",")), - (docsToCreatePerShard - 1) * shards * indices, - checkpointSum - ); + static class MockResolver extends IndexNameExpressionResolver { + MockResolver() { + super(new ThreadContext(Settings.EMPTY), EmptySystemIndices.INSTANCE); + } - final IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexNamePrefix + "*").get(); + @Override + public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { + return request.indices(); + } - assertEquals( - "Checkpoint API and indices stats don't match", - Arrays.stream(statsResponse.getShards()) - .filter(i -> i.getShardRouting().primary()) - .sorted(Comparator.comparingInt(value -> value.getShardRouting().id())) - .mapToLong(s -> s.getSeqNoStats().getGlobalCheckpoint()) - .sum(), - checkpointSum - ); + public String[] concreteIndexNames( + ClusterState state, + IndicesOptions options, + boolean includeDataStreams, + String... indexExpressions + ) { + return indexExpressions; + } + + @Override + public Index[] concreteIndices(ClusterState state, IndicesRequest request) { + Index[] out = new Index[request.indices().length]; + for (int x = 0; x < out.length; x++) { + out[x] = new Index(request.indices()[x], "_na_"); + } + return out; + } } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index 7c17d4db39ef..bd607bfce037 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.HandledTransportAction; @@ -33,6 +35,7 @@ import org.elasticsearch.xpack.core.transform.action.GetCheckpointNodeAction; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -66,7 +69,10 @@ public TransportGetCheckpointAction( @Override protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); + resolveIndicesAndGetCheckpoint(task, request, listener, state); + } + protected void resolveIndicesAndGetCheckpoint(Task task, Request request, ActionListener listener, final ClusterState state) { String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames( state, request.indicesOptions(), @@ -74,10 +80,21 @@ protected void doExecute(Task task, Request request, ActionListener li request.indices() ); - new AsyncGetCheckpointsFromNodesAction(task, resolveIndicesToPrimaryShards(state, concreteIndices), listener).start(); + Map> nodesAndShards = resolveIndicesToPrimaryShards(state, concreteIndices); + + if (nodesAndShards.size() == 0) { + listener.onResponse(new Response(Collections.emptyMap())); + return; + } + + new AsyncGetCheckpointsFromNodesAction(state, task, nodesAndShards, listener).start(); } private Map> resolveIndicesToPrimaryShards(ClusterState state, String[] concreteIndices) { + if (concreteIndices.length == 0) { + return Collections.emptyMap(); + } + final DiscoveryNodes nodes = state.nodes(); Map> nodesAndShards = new HashMap<>(); @@ -85,9 +102,14 @@ private Map> resolveIndicesToPrimaryShards(ClusterState sta for (ShardRouting shard : shardsIt) { // only take primary shards, which should be exactly 1, this isn't strictly necessary // and we should consider taking any shard copy, but than we need another way to de-dup - if (shard.primary() && shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) { + if (shard.primary() == false) { + continue; + } + if (shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) { String nodeId = shard.currentNodeId(); nodesAndShards.computeIfAbsent(nodeId, k -> new HashSet<>()).add(shard.shardId()); + } else { + throw new NoShardAvailableActionException(shard.shardId(), " no primary shards available for shard [" + shard + "]"); } } return nodesAndShards; @@ -101,6 +123,7 @@ protected class AsyncGetCheckpointsFromNodesAction { private final String localNodeId; protected AsyncGetCheckpointsFromNodesAction( + ClusterState clusterState, Task task, Map> nodesAndShards, ActionListener listener @@ -108,13 +131,11 @@ protected AsyncGetCheckpointsFromNodesAction( this.task = task; this.listener = listener; this.nodesAndShards = nodesAndShards; - ClusterState clusterState = clusterService.state(); nodes = clusterState.nodes(); localNodeId = clusterService.localNode().getId(); } public void start() { - GroupedActionListener groupedListener = new GroupedActionListener<>( ActionListener.wrap(responses -> { // the final list should be ordered by key @@ -150,6 +171,19 @@ public void start() { ); nodeCheckpointsRequest.setParentTask(clusterService.localNode().getId(), task.getId()); DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); + + // paranoia: this should not be possible using the same cluster state + if (node == null) { + listener.onFailure( + new UnavailableShardsException( + oneNodeAndItsShards.getValue().iterator().next(), + "Node not found for [{}] shards", + oneNodeAndItsShards.getValue().size() + ) + ); + return; + } + logger.trace("get checkpoints from node {}", node); transportService.sendRequest( node, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java index b8c6e1a92dcc..b257ed80acf1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointNodeAction.java @@ -45,7 +45,7 @@ protected void doExecute(Task task, Request request, ActionListener li getGlobalCheckpoints(indicesService, request.getShards(), listener); } - static void getGlobalCheckpoints(IndicesService indicesService, Set shards, ActionListener listener) { + protected static void getGlobalCheckpoints(IndicesService indicesService, Set shards, ActionListener listener) { Map checkpointsByIndexOfThisNode = new HashMap<>(); for (ShardId shardId : shards) { final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); From 58e2afa3c80752598c1e2233ef2b438c4c130453 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 21 Dec 2021 09:26:47 +0100 Subject: [PATCH 16/28] checkstyle --- .../checkpoint/TransformGetCheckpointTests.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java index cb4d4f9cd23d..f264d33af52b 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -69,6 +69,7 @@ public class TransformGetCheckpointTests extends ESSingleNodeTestCase { private TestTransportGetCheckpointNodeAction getCheckpointNodeAction; private ClusterState clusterStateWithIndex; + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -109,6 +110,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req clusterStateWithIndex = ClusterStateCreationUtils.state(indexName, 3, NUMBER_OF_SHARDS); } + @Override @After public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); @@ -148,10 +150,11 @@ public void testSingleNodeRequests() { class TestTransportGetCheckpointAction extends TransportGetCheckpointAction { - public TestTransportGetCheckpointAction() { + TestTransportGetCheckpointAction() { super(transportService, new ActionFilters(emptySet()), indicesService, clusterService, indexNameExpressionResolver); } + @Override protected void doExecute(Task task, Request request, ActionListener listener) { resolveIndicesAndGetCheckpoint(task, request, listener, clusterStateWithIndex); } @@ -160,10 +163,11 @@ protected void doExecute(Task task, Request request, ActionListener li class TestTransportGetCheckpointNodeAction extends TransportGetCheckpointNodeAction { - public TestTransportGetCheckpointNodeAction() { + TestTransportGetCheckpointNodeAction() { super(transportService, new ActionFilters(emptySet()), indicesService); } + @Override protected void doExecute( Task task, GetCheckpointNodeAction.Request request, @@ -198,6 +202,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { return request.indices(); } + @Override public String[] concreteIndexNames( ClusterState state, IndicesOptions options, From bc224ffa4f11644e31e61aa347fe5af35ab70958 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 21 Dec 2021 12:25:16 +0100 Subject: [PATCH 17/28] add tests for multinode checkpointing --- .../TransformGetCheckpointTests.java | 152 +++++++++++++----- 1 file changed, 109 insertions(+), 43 deletions(-) diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java index f264d33af52b..2b3d403e74c1 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformGetCheckpointTests.java @@ -9,14 +9,17 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; @@ -45,25 +48,33 @@ import org.junit.After; import org.junit.Before; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static java.util.Collections.emptySet; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TransformGetCheckpointTests extends ESSingleNodeTestCase { - private static final int NUMBER_OF_SHARDS = 5; private TransportService transportService; private ClusterService clusterService; private IndicesService indicesService; private ThreadPool threadPool; private IndexNameExpressionResolver indexNameExpressionResolver; private MockTransport mockTransport; - private final String indexName = "test_index"; + private Task transformTask; + private final String indexNamePattern = "test_index-"; + private String[] testIndices; + private int numberOfNodes; + private int numberOfIndices; + private int numberOfShards; private TestTransportGetCheckpointAction getCheckpointAction; private TestTransportGetCheckpointNodeAction getCheckpointNodeAction; @@ -73,6 +84,10 @@ public class TransformGetCheckpointTests extends ESSingleNodeTestCase { @Before public void setUp() throws Exception { super.setUp(); + numberOfNodes = randomIntBetween(1, 10); + numberOfIndices = randomIntBetween(1, 10); + // create at least as many shards as nodes, so every node has at least 1 shard + numberOfShards = randomIntBetween(numberOfNodes, numberOfNodes * 3); threadPool = new TestThreadPool("GetCheckpointActionTests"); indexNameExpressionResolver = new MockResolver(); clusterService = getInstanceFromNode(ClusterService.class); @@ -81,15 +96,13 @@ public void setUp() throws Exception { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { if (action.equals(GetCheckpointNodeAction.NAME)) { + getCheckpointNodeAction.execute( + null, + (GetCheckpointNodeAction.Request) request, + ActionListener.wrap(r -> { this.handleResponse(requestId, r); }, e -> { + this.handleError(requestId, new TransportException(e.getMessage(), e)); - getCheckpointNodeAction.execute(null, (GetCheckpointNodeAction.Request) request, ActionListener.wrap(r -> { - this.handleResponse(requestId, r); - - }, e -> { - this.handleError(requestId, new TransportException(e.getMessage(), e)); - - }) - + }) ); } } @@ -105,9 +118,24 @@ protected void onSendRequest(long requestId, String action, TransportRequest req ); transportService.start(); transportService.acceptIncomingRequests(); + + List testIndicesList = new ArrayList<>(); + for (int i = 0; i < numberOfIndices; ++i) { + testIndicesList.add(indexNamePattern + i); + } + testIndices = testIndicesList.toArray(new String[0]); + clusterStateWithIndex = ClusterStateCreationUtils.state(numberOfNodes, testIndices, numberOfShards); + + transformTask = new Task( + 1L, + "persistent", + "action", + TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + "the_id", + TaskId.EMPTY_TASK_ID, + Collections.emptyMap() + ); getCheckpointAction = new TestTransportGetCheckpointAction(); getCheckpointNodeAction = new TestTransportGetCheckpointNodeAction(); - clusterStateWithIndex = ClusterStateCreationUtils.state(indexName, 3, NUMBER_OF_SHARDS); } @Override @@ -118,34 +146,49 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testEmptyCheckpoint() { + public void testEmptyCheckpoint() throws InterruptedException { + GetCheckpointAction.Request request = new GetCheckpointAction.Request(Strings.EMPTY_ARRAY, IndicesOptions.LENIENT_EXPAND_OPEN); + assertCheckpointAction(request, response -> { + assertNotNull(response.getCheckpoints()); + Map checkpoints = response.getCheckpoints(); + assertTrue(checkpoints.isEmpty()); + }); } - public void testSingleNodeRequests() { + public void testSingleIndexRequest() throws InterruptedException { GetCheckpointAction.Request request = new GetCheckpointAction.Request( - new String[] { indexName }, + new String[] { indexNamePattern + "0" }, IndicesOptions.LENIENT_EXPAND_OPEN ); - Task transformTask = new Task( - 1L, - "persistent", - "action", - TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + "the_id", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap() - ); - ActionTestUtils.execute(getCheckpointAction, transformTask, request, ActionListener.wrap(response -> { + assertCheckpointAction(request, response -> { assertNotNull(response.getCheckpoints()); Map checkpoints = response.getCheckpoints(); assertEquals(1, checkpoints.size()); - assertTrue(checkpoints.containsKey(indexName)); - for (int i = 0; i < NUMBER_OF_SHARDS; ++i) { - assertEquals(42 + i, checkpoints.get(indexName)[i]); + assertTrue(checkpoints.containsKey(indexNamePattern + "0")); + for (int i = 0; i < numberOfShards; ++i) { + assertEquals(42 + i, checkpoints.get(indexNamePattern + "0")[i]); } - }, e -> { fail("got unexpected exception: " + e.getMessage()); })); + assertEquals(numberOfNodes, getCheckpointNodeAction.getCalls()); + }); + } + + public void testMultiIndexRequest() throws InterruptedException { + GetCheckpointAction.Request request = new GetCheckpointAction.Request(testIndices, IndicesOptions.LENIENT_EXPAND_OPEN); + assertCheckpointAction(request, response -> { + assertNotNull(response.getCheckpoints()); + Map checkpoints = response.getCheckpoints(); + assertEquals(testIndices.length, checkpoints.size()); + for (int i = 0; i < this.numberOfIndices; ++i) { + assertTrue(checkpoints.containsKey(indexNamePattern + i)); + for (int j = 0; j < numberOfShards; ++j) { + assertEquals(42 + i + j, checkpoints.get(indexNamePattern + i)[j]); + } + } + assertEquals(numberOfNodes, getCheckpointNodeAction.getCalls()); + }); } class TestTransportGetCheckpointAction extends TransportGetCheckpointAction { @@ -163,8 +206,28 @@ protected void doExecute(Task task, Request request, ActionListener li class TestTransportGetCheckpointNodeAction extends TransportGetCheckpointNodeAction { + private final IndicesService mockIndicesService; + private int calls; + TestTransportGetCheckpointNodeAction() { super(transportService, new ActionFilters(emptySet()), indicesService); + calls = 0; + mockIndicesService = mock(IndicesService.class); + for (int i = 0; i < numberOfIndices; ++i) { + IndexService mockIndexService = mock(IndexService.class); + IndexMetadata indexMeta = clusterStateWithIndex.metadata().index(indexNamePattern + i); + + IndexSettings mockIndexSettings = new IndexSettings(indexMeta, clusterService.getSettings()); + when(mockIndexService.getIndexSettings()).thenReturn(mockIndexSettings); + for (int j = 0; j < numberOfShards; ++j) { + IndexShard mockIndexShard = mock(IndexShard.class); + when(mockIndexService.getShard(j)).thenReturn(mockIndexShard); + SeqNoStats seqNoStats = new SeqNoStats(42 + i + j, 42 + i + j, 42 + i + j); + when(mockIndexShard.seqNoStats()).thenReturn(seqNoStats); + } + + when(mockIndicesService.indexServiceSafe(indexMeta.getIndex())).thenReturn(mockIndexService); + } } @Override @@ -173,23 +236,13 @@ protected void doExecute( GetCheckpointNodeAction.Request request, ActionListener listener ) { - IndicesService mockIndicesService = mock(IndicesService.class); - IndexService mockIndexService = mock(IndexService.class); - when(mockIndicesService.indexServiceSafe(any())).thenReturn(mockIndexService); - IndexSettings mockIndexSettings = new IndexSettings( - clusterStateWithIndex.metadata().index(indexName), - clusterService.getSettings() - ); - when(mockIndexService.getIndexSettings()).thenReturn(mockIndexSettings); - for (int i = 0; i < NUMBER_OF_SHARDS; ++i) { - IndexShard mockIndexShard = mock(IndexShard.class); - when(mockIndexService.getShard(i)).thenReturn(mockIndexShard); - SeqNoStats seqNoStats = new SeqNoStats(42 + i, 42 + i, 42 + i); - when(mockIndexShard.seqNoStats()).thenReturn(seqNoStats); - } - + ++calls; getGlobalCheckpoints(mockIndicesService, request.getShards(), listener); } + + public int getCalls() { + return calls; + } } static class MockResolver extends IndexNameExpressionResolver { @@ -222,4 +275,17 @@ public Index[] concreteIndices(ClusterState state, IndicesRequest request) { } } + private void assertCheckpointAction(GetCheckpointAction.Request request, Consumer furtherTests) + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + ActionTestUtils.execute(getCheckpointAction, transformTask, request, listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } } From c3550e05274f0846f9cb67675394b5414890a6bf Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 21 Dec 2021 17:43:21 +0100 Subject: [PATCH 18/28] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Przemysław Witek --- .../xpack/core/transform/action/GetCheckpointAction.java | 5 +++-- .../transform/action/GetCheckpointActionRequestTests.java | 4 ++-- .../transform/action/TransportGetCheckpointAction.java | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index b0d06505a1f7..81a0191887b6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -46,7 +47,7 @@ public Request(StreamInput in) throws IOException { } public Request(String[] indices, IndicesOptions indicesOptions) { - this.indices = indices; + this.indices = indices != null ? indices : Strings.EMPTY_ARRAY; this.indicesOptions = indicesOptions; } @@ -86,7 +87,7 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeStringArrayNullable(indices); + out.writeStringArray(indices); indicesOptions.writeIndicesOptions(out); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java index 51927906e1cc..48704068e3d8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointActionRequestTests.java @@ -24,7 +24,7 @@ public class GetCheckpointActionRequestTests extends AbstractWireSerializingTest @Override protected Request createTestInstance() { return new Request( - generateRandomStringArray(10, 10, false, false), + randomBoolean() ? null : generateRandomStringArray(10, 10, false, false), IndicesOptions.fromParameters( randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT), Boolean.toString(randomBoolean()), @@ -42,7 +42,7 @@ protected Reader instanceReader() { @Override protected Request mutateInstance(Request instance) throws IOException { - List indices = new ArrayList<>(Arrays.asList(instance.indices())); + List indices = instance.indices() != null ? new ArrayList<>(Arrays.asList(instance.indices())) : new ArrayList<>(); IndicesOptions indicesOptions = instance.indicesOptions(); switch (between(0, 1)) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index bd607bfce037..a3fbc4e9e008 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -101,7 +101,7 @@ private Map> resolveIndicesToPrimaryShards(ClusterState sta ShardsIterator shardsIt = state.routingTable().allShards(concreteIndices); for (ShardRouting shard : shardsIt) { // only take primary shards, which should be exactly 1, this isn't strictly necessary - // and we should consider taking any shard copy, but than we need another way to de-dup + // and we should consider taking any shard copy, but then we need another way to de-dup if (shard.primary() == false) { continue; } @@ -131,8 +131,8 @@ protected AsyncGetCheckpointsFromNodesAction( this.task = task; this.listener = listener; this.nodesAndShards = nodesAndShards; - nodes = clusterState.nodes(); - localNodeId = clusterService.localNode().getId(); + this.nodes = clusterState.nodes(); + this.localNodeId = clusterService.localNode().getId(); } public void start() { From fdb02ad508f2419ac36dd5ffef6673a90da3ff8d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 7 Feb 2022 12:10:26 +0100 Subject: [PATCH 19/28] apply review comment --- .../xpack/transform/action/TransportGetCheckpointAction.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index a3fbc4e9e008..c79a3bd5571e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; @@ -185,10 +186,12 @@ public void start() { } logger.trace("get checkpoints from node {}", node); - transportService.sendRequest( + transportService.sendChildRequest( node, GetCheckpointNodeAction.NAME, nodeCheckpointsRequest, + task, + TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override From 9b5f57724518c706eb50c323942f8b9b3a32d532 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 7 Feb 2022 16:08:45 +0100 Subject: [PATCH 20/28] make checkpoint node operation internal --- .../xpack/core/transform/action/GetCheckpointNodeAction.java | 2 +- .../org/elasticsearch/xpack/security/operator/Constants.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java index 24f0a2367380..d9de14681de9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java @@ -28,7 +28,7 @@ public class GetCheckpointNodeAction extends ActionType Date: Mon, 7 Feb 2022 16:40:42 +0100 Subject: [PATCH 21/28] update doc comments --- .../xpack/transform/checkpoint/DefaultCheckpointProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 0eb03d55b585..45d6a1f711e6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -62,7 +62,7 @@ class DefaultCheckpointProvider implements CheckpointProvider { protected final TransformAuditor transformAuditor; protected final TransformConfig transformConfig; - // set of clusters that do not support 8.1+ checkpoint actions + // set of clusters that do not support 8.2+ checkpoint actions private final Set fallbackToBWC = new HashSet<>(); DefaultCheckpointProvider( @@ -194,7 +194,7 @@ private static void getCheckpointsFromOneClusterV2( } /** - * BWC fallback for nodes/cluster older than 8.1 + * BWC fallback for nodes/cluster older than 8.2 */ private static void getCheckpointsFromOneClusterBWC( Client client, From 2a35374fa64dab14129bec132d8ecf8abde7cdf3 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 8 Feb 2022 20:01:14 +0100 Subject: [PATCH 22/28] overhaul privilige checking --- .../privilege/ClusterPrivilegeResolver.java | 4 +- .../authz/privilege/IndexPrivilege.java | 4 +- .../transform/action/GetCheckpointAction.java | 12 +++- .../action/GetCheckpointNodeAction.java | 2 +- .../xpack/security/operator/Constants.java | 4 +- .../test/multi_cluster/80_transform.yml | 4 +- .../test/remote_cluster/80_transform.yml | 59 ++++++++++++++++++- .../action/TransportGetCheckpointAction.java | 1 - 8 files changed, 78 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index ff614fc09c4b..3144f434c4c3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -90,7 +90,9 @@ public class ClusterPrivilegeResolver { "cluster:admin/data_frame/*", "cluster:monitor/data_frame/*", "cluster:monitor/transform/*", - "cluster:admin/transform/*" + "cluster:admin/transform/*", + "cluster:internal/transform/*", + "indices:internal/transform/*" ); private static final Set MANAGE_WATCHER_PATTERN = Set.of("cluster:admin/xpack/watcher/*", "cluster:monitor/xpack/watcher/*"); private static final Set TRANSPORT_CLIENT_PATTERN = Set.of("cluster:monitor/nodes/liveness", "cluster:monitor/state"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java index 0f79f7b9310a..4c94e647c44f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction; import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.core.security.support.Automatons; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import java.util.Arrays; import java.util.Collection; @@ -99,7 +100,8 @@ public final class IndexPrivilege extends Privilege { GetDataStreamAction.NAME, ResolveIndexAction.NAME, FieldCapabilitiesAction.NAME + "*", - GetRollupIndexCapsAction.NAME + "*" + GetRollupIndexCapsAction.NAME + "*", + GetCheckpointAction.NAME + "*" ); private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns( PutFollowAction.NAME, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index 81a0191887b6..38248e5eee4c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -29,15 +29,15 @@ public class GetCheckpointAction extends ActionType { - "source": { "index": ["my_remote_cluster:remote_test_index", "my_remote_cluster:remote_test_index_2"] } + "source": { "index": ["my_remote_cluster:remote_test_index*"] } } - do: headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } @@ -155,7 +155,7 @@ teardown: transform_id: "simple-remote-transform" - do: - catch: /Cannot preview transform \[simple-remote-transform\] because user bob lacks all the required permissions for indices. \[my_remote_cluster:remote_test_index, my_remote_cluster:remote_test_index_2, simple-remote-transform\]/ + catch: /Cannot preview transform \[simple-remote-transform\] because user bob lacks all the required permissions for indices. \[my_remote_cluster:remote_test_index\*, simple-remote-transform\]/ headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" } # This is bob transform.preview_transform: transform_id: "simple-remote-transform" diff --git a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml index c550f148f956..83f94a213303 100644 --- a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml +++ b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml @@ -22,7 +22,7 @@ setup: "cluster": [], "indices": [ { - "names": ["remote_test_index*"], + "names": ["remote_test_index", "remote_test_index_2"], "privileges": ["read", "view_index_metadata"] } ] @@ -139,3 +139,60 @@ teardown: - length: { aggregations.user.buckets: 2 } - match: { aggregations.user.buckets.0.key: "d" } - match: { aggregations.user.buckets.0.doc_count: 1 } + + # create a 3rd index, but for this index joe has no privileges + - do: + indices.create: + index: remote_test_index_3 + body: + settings: + index: + number_of_shards: 3 + number_of_replicas: 0 + aliases: + test_alias: {} + mappings: + properties: + time: + type: date + user: + type: keyword + stars: + type: integer + coolness: + type: integer + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "remote_test_index_3"}}' + - '{"user": "z", "stars": 2, "date" : "2018-11-29T12:12:12.123456789Z"}' + - '{"index": {"_index": "remote_test_index_3"}}' + - '{"user": "x", "stars": 1, "date" : "2018-11-29T12:14:12.123456789Z"}' + - do: + search: + rest_total_hits_as_int: true + index: remote_test_index_3 + body: + aggs: + user: + terms: + field: user + + - match: { _shards.total: 3 } + - match: { hits.total: 2 } + - length: { aggregations.user.buckets: 2 } + + # search should fail for joe + - do: + catch: /action \[indices:data/read/search\] is unauthorized for user \[joe\] .*/ + headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } + search: + rest_total_hits_as_int: true + index: remote_test_index_3 + body: + aggs: + user: + terms: + field: user diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index c79a3bd5571e..bdfa6dc8696c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -170,7 +170,6 @@ public void start() { GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( oneNodeAndItsShards.getValue() ); - nodeCheckpointsRequest.setParentTask(clusterService.localNode().getId(), task.getId()); DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); // paranoia: this should not be possible using the same cluster state From ff8ad3216ef50b1d264cb12cc9cbaab9c76ac058 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 14 Feb 2022 14:01:31 +0100 Subject: [PATCH 23/28] make GetCheckpointNodeAction an IndicesRequest --- .../privilege/ClusterPrivilegeResolver.java | 4 +- .../action/GetCheckpointNodeAction.java | 32 ++++++++++++--- .../GetCheckpointNodeActionRequestTests.java | 40 ++++++++++++++----- .../xpack/security/operator/Constants.java | 2 +- .../action/TransportGetCheckpointAction.java | 9 ++++- 5 files changed, 66 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index 3144f434c4c3..ff614fc09c4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -90,9 +90,7 @@ public class ClusterPrivilegeResolver { "cluster:admin/data_frame/*", "cluster:monitor/data_frame/*", "cluster:monitor/transform/*", - "cluster:admin/transform/*", - "cluster:internal/transform/*", - "indices:internal/transform/*" + "cluster:admin/transform/*" ); private static final Set MANAGE_WATCHER_PATTERN = Set.of("cluster:admin/xpack/watcher/*", "cluster:monitor/xpack/watcher/*"); private static final Set TRANSPORT_CLIENT_PATTERN = Set.of("cluster:monitor/nodes/liveness", "cluster:monitor/state"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java index 05ebfd2328e7..e59ded094845 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java @@ -11,6 +11,9 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -28,7 +31,7 @@ public class GetCheckpointNodeAction extends ActionType shards; + private final OriginalIndices originalIndices; - public Request(Set shards) { + public Request(Set shards, OriginalIndices originalIndices) { this.shards = shards; + this.originalIndices = originalIndices; } public Request(StreamInput in) throws IOException { super(in); this.shards = Collections.unmodifiableSet(in.readSet(ShardId::new)); + this.originalIndices = OriginalIndices.readOriginalIndices(in); } @Override @@ -102,12 +108,17 @@ public ActionRequestValidationException validate() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeCollection(shards); + OriginalIndices.writeOriginalIndices(originalIndices, out); } public Set getShards() { return shards; } + public OriginalIndices getOriginalIndices() { + return originalIndices; + } + @Override public boolean equals(Object obj) { if (obj == this) { @@ -118,12 +129,23 @@ public boolean equals(Object obj) { } Request that = (Request) obj; - return Objects.equals(shards, that.shards); + return Objects.equals(shards, that.shards) && Objects.equals(originalIndices, that.originalIndices); } @Override public int hashCode() { - return Objects.hash(shards); + return Objects.hash(shards, originalIndices); + } + + @Override + public String[] indices() { + return originalIndices.indices(); } + + @Override + public IndicesOptions indicesOptions() { + return originalIndices.indicesOptions(); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java index 7e3f91942ebc..fd3573f1acae 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeActionRequestTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractWireSerializingTestCase; @@ -26,30 +28,48 @@ protected Reader instanceReader() { @Override protected Request createTestInstance() { Set shards = new HashSet<>(); + OriginalIndices originalIndices = randomOriginalIndices(randomIntBetween(0, 20)); int numberOfRandomShardIds = randomInt(10); for (int i = 0; i < numberOfRandomShardIds; ++i) { shards.add(new ShardId(randomAlphaOfLength(4) + i, randomAlphaOfLength(4), randomInt(5))); } - return new Request(shards); + return new Request(shards, originalIndices); } @Override protected Request mutateInstance(Request instance) throws IOException { - Set shards = new HashSet<>(instance.getShards()); - if (randomBoolean() && shards.size() > 0) { - ShardId firstShard = shards.iterator().next(); - shards.remove(firstShard); - if (randomBoolean()) { - shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5))); + switch (random().nextInt(1)) { + case 0 -> { + Set shards = new HashSet<>(instance.getShards()); + if (randomBoolean() && shards.size() > 0) { + ShardId firstShard = shards.iterator().next(); + shards.remove(firstShard); + if (randomBoolean()) { + shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5))); + } + } else { + shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5))); + } + return new Request(shards, instance.getOriginalIndices()); } - } else { - shards.add(new ShardId(randomAlphaOfLength(8), randomAlphaOfLength(4), randomInt(5))); + case 1 -> { + OriginalIndices originalIndices = randomOriginalIndices(instance.indices().length + 1); + return new Request(instance.getShards(), originalIndices); + } + default -> throw new IllegalStateException("The test should only allow 1 parameters mutated"); } + } - return new Request(shards); + private OriginalIndices randomOriginalIndices(int numIndices) { + String[] randomIndices = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + randomIndices[i] = randomAlphaOfLengthBetween(5, 10); + } + IndicesOptions indicesOptions = randomBoolean() ? IndicesOptions.strictExpand() : IndicesOptions.lenientExpandOpen(); + return new OriginalIndices(randomIndices, indicesOptions); } } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index e5ea33dc0a57..9d68f61c56f4 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -96,7 +96,7 @@ public class Constants { "cluster:admin/transform/upgrade", "cluster:admin/transform/validate", "indices:internal/transform/checkpoint", - "cluster:internal/transform/checkpoint[n]", + "indices:internal/transform/checkpoint[n]", // "cluster:admin/voting_config/add_exclusions", // "cluster:admin/voting_config/clear_exclusions", "cluster:admin/xpack/ccr/auto_follow_pattern/activate", diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index bdfa6dc8696c..7ee0315c964b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; @@ -88,7 +89,7 @@ protected void resolveIndicesAndGetCheckpoint(Task task, Request request, Action return; } - new AsyncGetCheckpointsFromNodesAction(state, task, nodesAndShards, listener).start(); + new AsyncGetCheckpointsFromNodesAction(state, task, nodesAndShards, new OriginalIndices(request), listener).start(); } private Map> resolveIndicesToPrimaryShards(ClusterState state, String[] concreteIndices) { @@ -120,6 +121,7 @@ protected class AsyncGetCheckpointsFromNodesAction { private final Task task; private final ActionListener listener; private final Map> nodesAndShards; + private final OriginalIndices originalIndices; private final DiscoveryNodes nodes; private final String localNodeId; @@ -127,11 +129,13 @@ protected AsyncGetCheckpointsFromNodesAction( ClusterState clusterState, Task task, Map> nodesAndShards, + OriginalIndices originalIndices, ActionListener listener ) { this.task = task; this.listener = listener; this.nodesAndShards = nodesAndShards; + this.originalIndices = originalIndices; this.nodes = clusterState.nodes(); this.localNodeId = clusterService.localNode().getId(); } @@ -168,7 +172,8 @@ public void start() { } GetCheckpointNodeAction.Request nodeCheckpointsRequest = new GetCheckpointNodeAction.Request( - oneNodeAndItsShards.getValue() + oneNodeAndItsShards.getValue(), + originalIndices ); DiscoveryNode node = nodes.get(oneNodeAndItsShards.getKey()); From 4b3d8c9ecfbabd76dc651faaa4afd944b1f8c201 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 14 Feb 2022 14:32:58 +0100 Subject: [PATCH 24/28] add logging --- .../transform/checkpoint/DefaultCheckpointProvider.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 45d6a1f711e6..0ed005f4f92e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -155,7 +155,14 @@ private void getCheckpointsFromOneCluster( if (fallbackToBWC.contains(cluster)) { getCheckpointsFromOneClusterBWC(client, headers, indices, cluster, listener); } else { - getCheckpointsFromOneClusterV2(client, headers, indices, cluster, ActionListener.wrap(listener::onResponse, e -> { + getCheckpointsFromOneClusterV2(client, headers, indices, cluster, ActionListener.wrap(response -> { + logger.debug( + "[{}] Successfully retrieved checkpoints from cluster [{}] using transform checkpoint API", + transformConfig.getId(), + cluster + ); + listener.onResponse(response); + }, e -> { Throwable unwrappedException = ExceptionsHelper.unwrapCause(e); if (unwrappedException instanceof ActionNotFoundTransportException) { // this is an implementation detail, so not necessary to audit or warn, but only report as debug From 836d3f15a71d0b702eebbbf98b347e41602dc312 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 14 Feb 2022 17:24:18 +0100 Subject: [PATCH 25/28] add code comments --- .../xpack/core/security/authz/privilege/IndexPrivilege.java | 2 +- .../xpack/core/transform/action/GetCheckpointAction.java | 5 ++++- .../xpack/core/transform/action/GetCheckpointNodeAction.java | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java index 4c94e647c44f..24589c3525f0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java @@ -101,7 +101,7 @@ public final class IndexPrivilege extends Privilege { ResolveIndexAction.NAME, FieldCapabilitiesAction.NAME + "*", GetRollupIndexCapsAction.NAME + "*", - GetCheckpointAction.NAME + "*" + GetCheckpointAction.NAME + "*" // transform internal action ); private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns( PutFollowAction.NAME, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index 38248e5eee4c..a1577646b5d4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -24,11 +24,14 @@ import java.util.Map.Entry; import java.util.Objects; +/** + * Transform internal API (no REST layer) to retrieve index checkpoints. + */ public class GetCheckpointAction extends ActionType { public static final GetCheckpointAction INSTANCE = new GetCheckpointAction(); - // note: this is an admin action, it must be called with user headers + // note: this is an index action and requires `view_index_metadata` public static final String NAME = "indices:internal/transform/checkpoint"; private GetCheckpointAction() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java index e59ded094845..341cc0a9cec0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointNodeAction.java @@ -30,7 +30,7 @@ public class GetCheckpointNodeAction extends ActionType Date: Tue, 15 Feb 2022 11:56:47 +0100 Subject: [PATCH 26/28] fix mixed version issue --- .../transform/action/TransportGetCheckpointAction.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index 7ee0315c964b..89670f1ff2e8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; @@ -27,6 +28,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; @@ -75,6 +77,8 @@ protected void doExecute(Task task, Request request, ActionListener li } protected void resolveIndicesAndGetCheckpoint(Task task, Request request, ActionListener listener, final ClusterState state) { + // note: when security is turned on, the indices are already resolved + // TODO: do a quick check and only resolve if necessary?? String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames( state, request.indicesOptions(), @@ -108,6 +112,11 @@ private Map> resolveIndicesToPrimaryShards(ClusterState sta continue; } if (shard.assignedToNode() && nodes.get(shard.currentNodeId()) != null) { + // special case: a node that holds the shard is on an old version + if (nodes.get(shard.currentNodeId()).getVersion().before(Version.V_8_2_0)) { + throw new ActionNotFoundTransportException(GetCheckpointNodeAction.NAME); + } + String nodeId = shard.currentNodeId(); nodesAndShards.computeIfAbsent(nodeId, k -> new HashSet<>()).add(shard.shardId()); } else { From 0426dbe8309afce0bcc315c5c68c7b506904eefd Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 15 Feb 2022 18:00:42 +0100 Subject: [PATCH 27/28] disable remote indizes explicitly and explain why, use shorter call for resolving indices. --- .../xpack/core/transform/action/GetCheckpointAction.java | 6 ++++++ .../transform/action/TransportGetCheckpointAction.java | 7 +------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java index a1577646b5d4..168853fa9bf7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetCheckpointAction.java @@ -99,6 +99,12 @@ public IndicesRequest indices(String... indices) { this.indices = indices; return this; } + + // this action does not allow remote indices, but they have to be resolved upfront, see {@link DefaultCheckpointProvider} + @Override + public boolean allowsRemoteIndices() { + return false; + } } public static class Response extends ActionResponse { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java index 89670f1ff2e8..0397c38ec90e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetCheckpointAction.java @@ -79,12 +79,7 @@ protected void doExecute(Task task, Request request, ActionListener li protected void resolveIndicesAndGetCheckpoint(Task task, Request request, ActionListener listener, final ClusterState state) { // note: when security is turned on, the indices are already resolved // TODO: do a quick check and only resolve if necessary?? - String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames( - state, - request.indicesOptions(), - true, // includeDataStreams - request.indices() - ); + String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames(state, request); Map> nodesAndShards = resolveIndicesToPrimaryShards(state, concreteIndices); From 1fa096fda844fbfb9e86add4899f507f61f44a63 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 15 Feb 2022 21:18:05 +0100 Subject: [PATCH 28/28] document the access control check --- .../rest-api-spec/test/multi_cluster/80_transform.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml index f6bca3baa39f..4ded3005fb4e 100644 --- a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml +++ b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml @@ -146,7 +146,8 @@ teardown: - match: { count: 1 } - match: { transforms.0.id: "simple-remote-transform" } - match: { transforms.0.state: "stopped" } - # we added test_index_2, which has 2 more docs: + # the source now includes test_index_2, which has 2 more docs + # note that test_index_3 fits the wildcard pattern, but is not authorized, this test should not return a count of 4 as this would mean broken access control - match: { transforms.0.checkpointing.operations_behind: 2 } - do: