Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cross cluster support to _terms_enum #73975

Merged
merged 2 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.termsenum;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class CCSTermsEnumIT extends AbstractMultiClustersTestCase {

@Override
protected Collection<String> remoteClusterAlias() {
return Arrays.asList("remote_cluster");
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(LocalStateCompositeXPackPlugin.class);
return plugins;
}

public void testBasic() {
Settings indexSettings = Settings.builder().put("index.number_of_replicas", 0).build();
final Client localClient = client(LOCAL_CLUSTER);
final Client remoteClient = client("remote_cluster");
String localIndex = "local_test";
assertAcked(localClient.admin().indices().prepareCreate(localIndex).setSettings(indexSettings));
localClient.prepareIndex(localIndex, "_doc").setSource("foo", "foo").get();
localClient.prepareIndex(localIndex, "_doc").setSource("foo", "foobar").get();
localClient.admin().indices().prepareRefresh(localIndex).get();

String remoteIndex = "remote_test";
assertAcked(remoteClient.admin().indices().prepareCreate(remoteIndex).setSettings(indexSettings));
remoteClient.prepareIndex(remoteIndex, "_doc").setSource("foo", "bar").get();
remoteClient.prepareIndex(remoteIndex, "_doc").setSource("foo", "foobar").get();
remoteClient.prepareIndex(remoteIndex, "_doc").setSource("foo", "zar").get();
remoteClient.admin().indices().prepareRefresh(remoteIndex).get();

// _terms_enum on a remote cluster
TermsEnumRequest req = new TermsEnumRequest("remote_cluster:remote_test")
.field("foo.keyword");
TermsEnumResponse response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(3));
assertThat(response.getTerms().get(0), equalTo("bar"));
assertThat(response.getTerms().get(1), equalTo("foobar"));
assertThat(response.getTerms().get(2), equalTo("zar"));

// _terms_enum on mixed clusters (local + remote)
req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
.field("foo.keyword");
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(4));
assertThat(response.getTerms().get(0), equalTo("bar"));
assertThat(response.getTerms().get(1), equalTo("foo"));
assertThat(response.getTerms().get(2), equalTo("foobar"));
assertThat(response.getTerms().get(3), equalTo("zar"));

req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
.field("foo.keyword")
.searchAfter("foobar");
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(1));
assertThat(response.getTerms().get(0), equalTo("zar"));

req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
.field("foo.keyword")
.searchAfter("bar");
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(3));
assertThat(response.getTerms().get(0), equalTo("foo"));
assertThat(response.getTerms().get(1), equalTo("foobar"));
assertThat(response.getTerms().get(2), equalTo("zar"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
Expand All @@ -20,7 +21,7 @@
import java.util.Set;

/**
* Internal terms enum request executed directly against a specific node, querying potentially many
* Internal terms enum request executed directly against a specific node, querying potentially many
* shards in one request
*/
public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {
Expand All @@ -36,12 +37,27 @@ public class NodeTermsEnumRequest extends TransportRequest implements IndicesReq
private final QueryBuilder indexFilter;
private Set<ShardId> shardIds;
private String nodeId;


public NodeTermsEnumRequest(final String nodeId,
final Set<ShardId> shardIds,
TermsEnumRequest request,
long taskStartTimeMillis) {
this.field = request.field();
this.string = request.string();
this.searchAfter = request.searchAfter();
this.caseInsensitive = request.caseInsensitive();
this.size = request.size();
this.timeout = request.timeout().getMillis();
this.taskStartedTimeMillis = taskStartTimeMillis;
this.indexFilter = request.indexFilter();
this.nodeId = nodeId;
this.shardIds = shardIds;
}

public NodeTermsEnumRequest(StreamInput in) throws IOException {
super(in);
field = in.readString();
string = in.readString();
string = in.readOptionalString();
searchAfter = in.readOptionalString();
caseInsensitive = in.readBoolean();
size = in.readVInt();
Expand All @@ -56,36 +72,47 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException {
}
}

public NodeTermsEnumRequest(final String nodeId, final Set<ShardId> shardIds, TermsEnumRequest request) {
this.field = request.field();
this.string = request.string();
this.searchAfter = request.searchAfter();
this.caseInsensitive = request.caseInsensitive();
this.size = request.size();
this.timeout = request.timeout().getMillis();
this.taskStartedTimeMillis = request.taskStartTimeMillis;
this.indexFilter = request.indexFilter();
this.nodeId = nodeId;
this.shardIds = shardIds;
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(field);
out.writeOptionalString(string);
out.writeOptionalString(searchAfter);
out.writeBoolean(caseInsensitive);
out.writeVInt(size);
// Adjust the amount of permitted time the shard has remaining to gather terms.
long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis;
long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode);
// TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
out.writeVLong(remainingTimeForShardToUse);
out.writeVLong(taskStartedTimeMillis);
out.writeOptionalNamedWriteable(indexFilter);
out.writeString(nodeId);
out.writeVInt(shardIds.size());
for (ShardId shardId : shardIds) {
shardId.writeTo(out);
}
}

public String field() {
return field;
}

@Nullable
public String string() {
return string;
}

@Nullable
public String searchAfter() {
return searchAfter;
}

public long taskStartedTimeMillis() {
return this.taskStartedTimeMillis;
}
/**

/**
* The time this request was materialized on a node
*/
long nodeStartedTimeMillis() {
Expand All @@ -94,12 +121,12 @@ long nodeStartedTimeMillis() {
nodeStartedTimeMillis = System.currentTimeMillis();
}
return this.nodeStartedTimeMillis;
}
}

public void startTimerOnDataNode() {
nodeStartedTimeMillis = System.currentTimeMillis();
}

public Set<ShardId> shardIds() {
return Collections.unmodifiableSet(shardIds);
}
Expand All @@ -119,28 +146,6 @@ public String nodeId() {
return nodeId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(field);
out.writeString(string);
out.writeOptionalString(searchAfter);
out.writeBoolean(caseInsensitive);
out.writeVInt(size);
// Adjust the amount of permitted time the shard has remaining to gather terms.
long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis;
long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode);
// TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
out.writeVLong(remainingTimeForShardToUse);
out.writeVLong(taskStartedTimeMillis);
out.writeOptionalNamedWriteable(indexFilter);
out.writeString(nodeId);
out.writeVInt(shardIds.size());
for (ShardId shardId : shardIds) {
shardId.writeTo(out);
}
}

public QueryBuilder indexFilter() {
return indexFilter;
}
Expand All @@ -162,5 +167,4 @@ public IndicesOptions indicesOptions() {
public boolean remove(ShardId shardId) {
return shardIds.remove(shardId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class NodeTermsEnumResponse extends TransportResponse {
this.complete = complete;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(terms);
out.writeOptionalString(error);
out.writeBoolean(complete);
out.writeString(nodeId);
}

public List<TermCount> terms() {
return this.terms;
}
Expand All @@ -52,17 +60,8 @@ public String getError() {
public String getNodeId() {
return nodeId;
}

public boolean getComplete() {
return complete;
}


@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(terms);
out.writeOptionalString(error);
out.writeBoolean(complete);
out.writeString(nodeId);
public boolean isComplete() {
return complete;
}
}
Loading