Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Fix failure when resolving indices from CCS #91622

Merged
merged 5 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/91622.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91622
summary: Fix failure when resolving indices from CCS
area: Transform
type: bug
issues:
- 91550
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ setup:
"cluster": [],
"indices": [
{
"names": ["test_index"],
"names": ["test_index", "same_index_local_and_remote"],
"privileges": ["read", "view_index_metadata"]
},
{
"names": ["simple-remote-transform*", "simple-local-remote-transform"],
"names": ["simple-remote-transform*", "simple-local-remote-transform", "same-index-local-and-remote-transform"],
"privileges": ["create_index", "index", "read"]
},
{
"names": ["my_remote_cluster:remote_test_i*", "my_remote_cluster:aliased_test_index"],
"names": ["my_remote_cluster:remote_test_i*", "my_remote_cluster:aliased_test_index", "my_remote_cluster:same_index_local_and_remote"],
"privileges": ["read", "view_index_metadata"]
}
]
Expand All @@ -54,7 +54,7 @@ setup:
"cluster": [],
"indices": [
{
"names": ["simple-remote-transform*", "simple-local-remote-transform"],
"names": ["simple-remote-transform*", "simple-local-remote-transform", "same-index-local-and-remote-transform"],
"privileges": ["create_index", "index", "read"]
}
]
Expand Down Expand Up @@ -379,3 +379,97 @@ teardown:
"aggs": { "avg_stars": {"avg": {"field": "stars"}}}
}
}
---
"Batch transform local and remote index sharing the same name":
# create index with the same name on local and remote gh#91550
- do:
indices.create:
index: same_index_local_and_remote
body:
settings:
index:
number_of_shards: 3
number_of_replicas: 0
aliases:
test_alias: {}
mappings:
properties:
time:
type: date
user:
type: keyword
stars:
type: integer
coolness:
type: integer

- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "z", "stars": 1, "date" : "2018-11-29T12:12:12.123456789Z"}'
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "a", "stars": 2, "date" : "2018-11-29T12:14:12.123456789Z"}'

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.put_transform:
transform_id: "same-index-local-and-remote-transform"
body: >
{
"source": { "index": ["same_index_local_and_remote", "my_remote_cluster:same_index_local_and_remote"] },
"dest": { "index": "same-index-local-and-remote-transform" },
"pivot": {
"group_by": { "user": {"terms": {"field": "user"}}},
"aggs": {
"avg_stars": {"avg": {"field": "stars"}},
"count": {"value_count": {"field": "user"}}
}
}
}
- match: { acknowledged: true }

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.start_transform:
transform_id: "same-index-local-and-remote-transform"
- match: { acknowledged: true }

- do:
transform.get_transform_stats:
transform_id: "same-index-local-and-remote-transform"
- match: { count: 1 }
- match: { transforms.0.id: "same-index-local-and-remote-transform" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.stop_transform:
transform_id: "same-index-local-and-remote-transform"
wait_for_completion: true
wait_for_checkpoint: true
- match: { acknowledged: true }

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.get_transform_stats:
transform_id: "same-index-local-and-remote-transform"
- match: { count: 1 }
- match: { transforms.0.id: "same-index-local-and-remote-transform" }
- match: { transforms.0.state: "stopped" }
- match: { transforms.0.checkpointing.last.checkpoint: 1 }

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
search:
rest_total_hits_as_int: true
index: same-index-local-and-remote-transform
sort: user

- match: { hits.total: 3 }
- match: { hits.hits.0._index: same-index-local-and-remote-transform }
- match: { hits.hits.0._source.avg_stars: 3 }
- match: { hits.hits.0._source.count: 2 }
- match: { hits.hits.0._source.user: a }
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ setup:
"cluster": [],
"indices": [
{
"names": ["remote_test_index", "remote_test_index_2"],
"names": ["remote_test_index", "remote_test_index_2", "same_index_local_and_remote"],
"privileges": ["read", "view_index_metadata"]
}
]
Expand Down Expand Up @@ -196,3 +196,34 @@ teardown:
user:
terms:
field: user

# create index with the same name on local and remote gh#91550
- do:
indices.create:
index: same_index_local_and_remote
body:
settings:
index:
number_of_shards: 3
number_of_replicas: 0
aliases:
test_alias: {}
mappings:
properties:
time:
type: date
user:
type: keyword
stars:
type: integer
coolness:
type: integer

- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "x", "stars": 3, "date" : "2018-10-29T12:12:12.123456789Z"}'
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "a", "stars": 4, "date" : "2018-10-29T12:14:12.123456789Z"}'
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,38 @@ private static void getCheckpointsFromOneClusterV2(
) {
GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(indices, IndicesOptions.LENIENT_EXPAND_OPEN);

ActionListener<GetCheckpointAction.Response> checkpointListener;
if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()),
listener::onFailure
);
} else {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(
checkpointResponse.getCheckpoints()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR + entry.getKey(),
entry -> entry.getValue()
)
)
),
listener::onFailure
);
}

ClientHelper.executeWithHeadersAsync(
headers,
ClientHelper.TRANSFORM_ORIGIN,
client,
GetCheckpointAction.INSTANCE,
getCheckpointRequest,
ActionListener.wrap(checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()), listener::onFailure)
checkpointListener
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
Expand All @@ -43,9 +44,13 @@
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -63,6 +68,9 @@ public class DefaultCheckpointProviderTests extends ESTestCase {

private Clock clock;
private Client client;
private Client remoteClient1;
private Client remoteClient2;
private Client remoteClient3;
private IndexBasedTransformConfigManager transformConfigManager;
private MockTransformAuditor transformAuditor;

Expand All @@ -73,6 +81,15 @@ public void setUpMocks() {
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
remoteClient1 = mock(Client.class);
when(remoteClient1.threadPool()).thenReturn(threadPool);
remoteClient2 = mock(Client.class);
when(remoteClient2.threadPool()).thenReturn(threadPool);
remoteClient3 = mock(Client.class);
when(remoteClient3.threadPool()).thenReturn(threadPool);
when(client.getRemoteClusterClient("remote-1")).thenReturn(remoteClient1);
when(client.getRemoteClusterClient("remote-2")).thenReturn(remoteClient2);
when(client.getRemoteClusterClient("remote-3")).thenReturn(remoteClient3);
transformConfigManager = mock(IndexBasedTransformConfigManager.class);
transformAuditor = MockTransformAuditor.createMockAuditor();
}
Expand Down Expand Up @@ -268,6 +285,103 @@ public void testSourceHasChanged() throws InterruptedException {
assertThat(exceptionHolder.get(), is(nullValue()));
}

// regression test for gh#91550, testing a local and a remote the same index name
public void testCreateNextCheckpointWithRemoteClient() throws InterruptedException {
String transformId = getTestName();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);

GetCheckpointAction.Response checkpointResponse = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 1L, 2L, 3L }));
doAnswer(withResponse(checkpointResponse)).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

GetCheckpointAction.Response remoteCheckpointResponse = new GetCheckpointAction.Response(
Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L })
);
doAnswer(withResponse(remoteCheckpointResponse)).when(remoteClient1).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);

// local and remote share the same index name
when(remoteClusterResolver.resolve(any())).thenReturn(
new RemoteClusterResolver.ResolvedIndices(Map.of("remote-1", List.of("index-1")), List.of("index-1"))
);

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
clock,
client,
remoteClusterResolver,
transformConfigManager,
transformAuditor,
transformConfig
);

SetOnce<TransformCheckpoint> checkpointHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
provider.createNextCheckpoint(
new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L),
new LatchedActionListener<>(ActionListener.wrap(checkpointHolder::set, exceptionHolder::set), latch)
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
assertThat(exceptionHolder.get(), is(nullValue()));
assertNotNull(checkpointHolder.get());
assertThat(checkpointHolder.get().getCheckpoint(), is(equalTo(8L)));
assertThat(checkpointHolder.get().getIndicesCheckpoints().keySet(), containsInAnyOrder("index-1", "remote-1:index-1"));
}

// regression test for gh#91550, testing 3 remotes with same index name
public void testCreateNextCheckpointWithRemoteClients() throws InterruptedException {
String transformId = getTestName();
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);

GetCheckpointAction.Response remoteCheckpointResponse1 = new GetCheckpointAction.Response(
Map.of("index-1", new long[] { 1L, 2L, 3L })
);
doAnswer(withResponse(remoteCheckpointResponse1)).when(remoteClient1).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

GetCheckpointAction.Response remoteCheckpointResponse2 = new GetCheckpointAction.Response(
Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L })
);
doAnswer(withResponse(remoteCheckpointResponse2)).when(remoteClient2).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

GetCheckpointAction.Response remoteCheckpointResponse3 = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 9L }));
doAnswer(withResponse(remoteCheckpointResponse3)).when(remoteClient3).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);

// local and remote share the same index name
when(remoteClusterResolver.resolve(any())).thenReturn(
new RemoteClusterResolver.ResolvedIndices(
Map.of("remote-1", List.of("index-1"), "remote-2", List.of("index-1"), "remote-3", List.of("index-1")),
Collections.emptyList()
)
);

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
clock,
client,
remoteClusterResolver,
transformConfigManager,
transformAuditor,
transformConfig
);

SetOnce<TransformCheckpoint> checkpointHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
provider.createNextCheckpoint(
new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L),
new LatchedActionListener<>(ActionListener.wrap(checkpointHolder::set, exceptionHolder::set), latch)
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
assertThat(exceptionHolder.get(), is(nullValue()));
assertNotNull(checkpointHolder.get());
assertThat(checkpointHolder.get().getCheckpoint(), is(equalTo(8L)));
assertThat(
checkpointHolder.get().getIndicesCheckpoints().keySet(),
containsInAnyOrder("remote-1:index-1", "remote-2:index-1", "remote-3:index-1")
);
}

private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) {
return new DefaultCheckpointProvider(
clock,
Expand Down