From 3faee332db8fe75f61c398b5b07b559cc67724b0 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 18 Nov 2022 15:31:48 +0100 Subject: [PATCH] [Transform] Fix failure when resolving indices from CCS (#91622) properly prefix remote indices in checkpoints, fixes a failure when more than 1 cluster is used and index names clash relates #80984 fixes #91550 --- docs/changelog/91622.yaml | 6 + .../test/multi_cluster/80_transform.yml | 102 +++++++++++++++- .../test/remote_cluster/80_transform.yml | 33 ++++- .../checkpoint/DefaultCheckpointProvider.java | 26 +++- .../DefaultCheckpointProviderTests.java | 114 ++++++++++++++++++ 5 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 docs/changelog/91622.yaml diff --git a/docs/changelog/91622.yaml b/docs/changelog/91622.yaml new file mode 100644 index 0000000000000..f105f27d95c6a --- /dev/null +++ b/docs/changelog/91622.yaml @@ -0,0 +1,6 @@ +pr: 91622 +summary: Fix failure when resolving indices from CCS +area: Transform +type: bug +issues: + - 91550 diff --git a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml index 744294dc3a437..fe0b930cc1936 100644 --- a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml +++ b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml @@ -32,15 +32,15 @@ setup: "cluster": [], "indices": [ { - "names": ["test_index"], + "names": ["test_index", "same_index_local_and_remote"], "privileges": ["read", "view_index_metadata"] }, { - "names": ["simple-remote-transform*", "simple-local-remote-transform"], + "names": ["simple-remote-transform*", "simple-local-remote-transform", "same-index-local-and-remote-transform"], "privileges": ["create_index", "index", "read"] }, { - "names": ["my_remote_cluster:remote_test_i*", "my_remote_cluster:aliased_test_index"], + "names": ["my_remote_cluster:remote_test_i*", "my_remote_cluster:aliased_test_index", "my_remote_cluster:same_index_local_and_remote"], "privileges": ["read", "view_index_metadata"] } ] @@ -54,7 +54,7 @@ setup: "cluster": [], "indices": [ { - "names": ["simple-remote-transform*", "simple-local-remote-transform"], + "names": ["simple-remote-transform*", "simple-local-remote-transform", "same-index-local-and-remote-transform"], "privileges": ["create_index", "index", "read"] } ] @@ -379,3 +379,97 @@ teardown: "aggs": { "avg_stars": {"avg": {"field": "stars"}}} } } + +--- +"Batch transform local and remote index sharing the same name": + # create index with the same name on local and remote gh#91550 + - do: + indices.create: + index: same_index_local_and_remote + body: + settings: + index: + number_of_shards: 3 + number_of_replicas: 0 + aliases: + test_alias: {} + mappings: + properties: + time: + type: date + user: + type: keyword + stars: + type: integer + coolness: + type: integer + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "same_index_local_and_remote"}}' + - '{"user": "z", "stars": 1, "date" : "2018-11-29T12:12:12.123456789Z"}' + - '{"index": {"_index": "same_index_local_and_remote"}}' + - '{"user": "a", "stars": 2, "date" : "2018-11-29T12:14:12.123456789Z"}' + + - do: + headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } + transform.put_transform: + transform_id: "same-index-local-and-remote-transform" + body: > + { + "source": { "index": ["same_index_local_and_remote", "my_remote_cluster:same_index_local_and_remote"] }, + "dest": { "index": "same-index-local-and-remote-transform" }, + "pivot": { + "group_by": { "user": {"terms": {"field": "user"}}}, + "aggs": { + "avg_stars": {"avg": {"field": "stars"}}, + "count": {"value_count": {"field": "user"}} + } + } + } + - match: { acknowledged: true } + + - do: + headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } + transform.start_transform: + transform_id: "same-index-local-and-remote-transform" + - match: { acknowledged: true } + + - do: + transform.get_transform_stats: + transform_id: "same-index-local-and-remote-transform" + - match: { count: 1 } + - match: { transforms.0.id: "same-index-local-and-remote-transform" } + - match: { transforms.0.state: "/started|indexing|stopping|stopped/" } + + - do: + headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } + transform.stop_transform: + transform_id: "same-index-local-and-remote-transform" + wait_for_completion: true + wait_for_checkpoint: true + - match: { acknowledged: true } + + - do: + headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } + transform.get_transform_stats: + transform_id: "same-index-local-and-remote-transform" + - match: { count: 1 } + - match: { transforms.0.id: "same-index-local-and-remote-transform" } + - match: { transforms.0.state: "stopped" } + - match: { transforms.0.checkpointing.last.checkpoint: 1 } + + - do: + headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" } + search: + rest_total_hits_as_int: true + index: same-index-local-and-remote-transform + sort: user + + - match: { hits.total: 3 } + - match: { hits.hits.0._index: same-index-local-and-remote-transform } + - match: { hits.hits.0._source.avg_stars: 3 } + - match: { hits.hits.0._source.count: 2 } + - match: { hits.hits.0._source.user: a } diff --git a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml index 83f94a213303a..34008740036c3 100644 --- a/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml +++ b/x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml @@ -22,7 +22,7 @@ setup: "cluster": [], "indices": [ { - "names": ["remote_test_index", "remote_test_index_2"], + "names": ["remote_test_index", "remote_test_index_2", "same_index_local_and_remote"], "privileges": ["read", "view_index_metadata"] } ] @@ -196,3 +196,34 @@ teardown: user: terms: field: user + + # create index with the same name on local and remote gh#91550 + - do: + indices.create: + index: same_index_local_and_remote + body: + settings: + index: + number_of_shards: 3 + number_of_replicas: 0 + aliases: + test_alias: {} + mappings: + properties: + time: + type: date + user: + type: keyword + stars: + type: integer + coolness: + type: integer + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "same_index_local_and_remote"}}' + - '{"user": "x", "stars": 3, "date" : "2018-10-29T12:12:12.123456789Z"}' + - '{"index": {"_index": "same_index_local_and_remote"}}' + - '{"user": "a", "stars": 4, "date" : "2018-10-29T12:14:12.123456789Z"}' diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 9c7a66e1959a5..10321efbce32e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -191,14 +191,38 @@ private static void getCheckpointsFromOneClusterV2( ) { GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(indices, IndicesOptions.LENIENT_EXPAND_OPEN); + ActionListener checkpointListener; + if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) { + checkpointListener = ActionListener.wrap( + checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()), + listener::onFailure + ); + } else { + checkpointListener = ActionListener.wrap( + checkpointResponse -> listener.onResponse( + checkpointResponse.getCheckpoints() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR + entry.getKey(), + entry -> entry.getValue() + ) + ) + ), + listener::onFailure + ); + } + ClientHelper.executeWithHeadersAsync( headers, ClientHelper.TRANSFORM_ORIGIN, client, GetCheckpointAction.INSTANCE, getCheckpointRequest, - ActionListener.wrap(checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()), listener::onFailure) + checkpointListener ); + } /** diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 323bcb9b5abae..d852b2ceb740b 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.MockLogAppender.LoggingExpectation; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -43,9 +44,13 @@ import java.time.ZoneId; import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -63,6 +68,9 @@ public class DefaultCheckpointProviderTests extends ESTestCase { private Clock clock; private Client client; + private Client remoteClient1; + private Client remoteClient2; + private Client remoteClient3; private IndexBasedTransformConfigManager transformConfigManager; private MockTransformAuditor transformAuditor; @@ -73,6 +81,15 @@ public void setUpMocks() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); + remoteClient1 = mock(Client.class); + when(remoteClient1.threadPool()).thenReturn(threadPool); + remoteClient2 = mock(Client.class); + when(remoteClient2.threadPool()).thenReturn(threadPool); + remoteClient3 = mock(Client.class); + when(remoteClient3.threadPool()).thenReturn(threadPool); + when(client.getRemoteClusterClient("remote-1")).thenReturn(remoteClient1); + when(client.getRemoteClusterClient("remote-2")).thenReturn(remoteClient2); + when(client.getRemoteClusterClient("remote-3")).thenReturn(remoteClient3); transformConfigManager = mock(IndexBasedTransformConfigManager.class); transformAuditor = MockTransformAuditor.createMockAuditor(); } @@ -268,6 +285,103 @@ public void testSourceHasChanged() throws InterruptedException { assertThat(exceptionHolder.get(), is(nullValue())); } + // regression test for gh#91550, testing a local and a remote the same index name + public void testCreateNextCheckpointWithRemoteClient() throws InterruptedException { + String transformId = getTestName(); + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + + GetCheckpointAction.Response checkpointResponse = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 1L, 2L, 3L })); + doAnswer(withResponse(checkpointResponse)).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); + + GetCheckpointAction.Response remoteCheckpointResponse = new GetCheckpointAction.Response( + Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L }) + ); + doAnswer(withResponse(remoteCheckpointResponse)).when(remoteClient1).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); + + RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); + + // local and remote share the same index name + when(remoteClusterResolver.resolve(any())).thenReturn( + new RemoteClusterResolver.ResolvedIndices(Map.of("remote-1", List.of("index-1")), List.of("index-1")) + ); + + DefaultCheckpointProvider provider = new DefaultCheckpointProvider( + clock, + client, + remoteClusterResolver, + transformConfigManager, + transformAuditor, + transformConfig + ); + + SetOnce checkpointHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + provider.createNextCheckpoint( + new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L), + new LatchedActionListener<>(ActionListener.wrap(checkpointHolder::set, exceptionHolder::set), latch) + ); + assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); + assertThat(exceptionHolder.get(), is(nullValue())); + assertNotNull(checkpointHolder.get()); + assertThat(checkpointHolder.get().getCheckpoint(), is(equalTo(8L))); + assertThat(checkpointHolder.get().getIndicesCheckpoints().keySet(), containsInAnyOrder("index-1", "remote-1:index-1")); + } + + // regression test for gh#91550, testing 3 remotes with same index name + public void testCreateNextCheckpointWithRemoteClients() throws InterruptedException { + String transformId = getTestName(); + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + + GetCheckpointAction.Response remoteCheckpointResponse1 = new GetCheckpointAction.Response( + Map.of("index-1", new long[] { 1L, 2L, 3L }) + ); + doAnswer(withResponse(remoteCheckpointResponse1)).when(remoteClient1).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); + + GetCheckpointAction.Response remoteCheckpointResponse2 = new GetCheckpointAction.Response( + Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L }) + ); + doAnswer(withResponse(remoteCheckpointResponse2)).when(remoteClient2).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); + + GetCheckpointAction.Response remoteCheckpointResponse3 = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 9L })); + doAnswer(withResponse(remoteCheckpointResponse3)).when(remoteClient3).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); + + RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); + + // local and remote share the same index name + when(remoteClusterResolver.resolve(any())).thenReturn( + new RemoteClusterResolver.ResolvedIndices( + Map.of("remote-1", List.of("index-1"), "remote-2", List.of("index-1"), "remote-3", List.of("index-1")), + Collections.emptyList() + ) + ); + + DefaultCheckpointProvider provider = new DefaultCheckpointProvider( + clock, + client, + remoteClusterResolver, + transformConfigManager, + transformAuditor, + transformConfig + ); + + SetOnce checkpointHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + provider.createNextCheckpoint( + new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L), + new LatchedActionListener<>(ActionListener.wrap(checkpointHolder::set, exceptionHolder::set), latch) + ); + assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); + assertThat(exceptionHolder.get(), is(nullValue())); + assertNotNull(checkpointHolder.get()); + assertThat(checkpointHolder.get().getCheckpoint(), is(equalTo(8L))); + assertThat( + checkpointHolder.get().getIndicesCheckpoints().keySet(), + containsInAnyOrder("remote-1:index-1", "remote-2:index-1", "remote-3:index-1") + ); + } + private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) { return new DefaultCheckpointProvider( clock,