Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Refactore auto follow coordinator #35895

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The API returns the following results:
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"number_of_successful_follow_indices" : 1,
"recent_auto_follow_errors" : []
"recent_auto_follow_errors" : [],
"tracking_remote_clusters" : []
},
"follow_stats" : {
"indices" : [
Expand Down Expand Up @@ -151,6 +152,7 @@ The API returns the following results:
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
// TESTRESPONSE[s/"tracking_remote_clusters" : \[\]/"tracking_remote_clusters" : $body.auto_follow_stats.tracking_remote_clusters/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/qa/chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ followClusterTestCluster {
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\""
setting 'node.name', 'follow'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ChainIT extends ESCCRRestTestCase {

public void testFollowIndex() throws Exception {
Expand Down Expand Up @@ -71,4 +79,68 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(putPatternRequest));

putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
assertOK(client().performRequest(putPatternRequest));

try (RestClient leaderClient = buildLeaderClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20190101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(leaderClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
}
}

try (RestClient middleClient = buildMiddleClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20200101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(middleClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
}
}

assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Map<?, ?> autoFollowStats = (Map<?, ?>) response.get("auto_follow_stats");
assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2));

@SuppressWarnings("unchecked")
List<Map<String, ?>> trackingRemoteClusters =
(List<Map<String, ?>>) autoFollowStats.get("tracking_remote_clusters");
trackingRemoteClusters.sort(Comparator.comparing(o -> ((String) o.get("cluster_name"))));
assertThat(trackingRemoteClusters.size(), equalTo(2));
assertThat(trackingRemoteClusters.get(0).get("cluster_name"), equalTo("leader_cluster"));
assertThat(trackingRemoteClusters.get(1).get("cluster_name"), equalTo("middle_cluster"));

ensureYellow("logs-20190101");
ensureYellow("logs-20200101");
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, System::nanoTime)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
leaderClusterStateResponse -> {
ClusterState leaderClusterState = leaderClusterStateResponse.getState();
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex));
Expand Down Expand Up @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
Expand Down Expand Up @@ -192,7 +193,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
final Client remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster
Expand All @@ -204,7 +205,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
} else {
Expand All @@ -228,9 +229,7 @@ public void onFailure(final Exception e) {
* @param onFailure the failure consumer
* @param historyUUIDConsumer the leader index history uuid and consumer
*/
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
// in case of following a local or a remote cluster.
public void fetchLeaderHistoryUUIDs(
private void fetchLeaderHistoryUUIDs(
final Client remoteClient,
final IndexMetaData leaderIndexMetaData,
final Consumer<Exception> onFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -29,12 +28,6 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -43,8 +36,8 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
CCR_FOLLOWING_INDEX_SETTING
);
}

}
Loading