diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index b8491e8a60176..05c47ad45bfcf 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -105,7 +105,8 @@ The API returns the following results: "number_of_failed_follow_indices" : 0, "number_of_failed_remote_cluster_state_requests" : 0, "number_of_successful_follow_indices" : 1, - "recent_auto_follow_errors" : [] + "recent_auto_follow_errors" : [], + "tracking_remote_clusters" : [] }, "follow_stats" : { "indices" : [ @@ -151,6 +152,7 @@ The API returns the following results: // TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/] // TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/] // TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/] +// TESTRESPONSE[s/"tracking_remote_clusters" : \[\]/"tracking_remote_clusters" : $body.auto_follow_stats.tracking_remote_clusters/] // TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/] // TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/] // TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/] diff --git a/x-pack/plugin/ccr/qa/chain/build.gradle b/x-pack/plugin/ccr/qa/chain/build.gradle index f93feb4a66a1b..b9bf933d10fe1 100644 --- a/x-pack/plugin/ccr/qa/chain/build.gradle +++ b/x-pack/plugin/ccr/qa/chain/build.gradle @@ -46,6 +46,7 @@ followClusterTestCluster { numNodes = 1 clusterName = 'follow-cluster' setting 'xpack.license.self_generated.type', 'trial' + setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\"" setting 'node.name', 'follow' } diff --git a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java index e5a37aa829bbf..3fc2b34014430 100644 --- a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java +++ b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java @@ -6,9 +6,17 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.client.Request; import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + public class ChainIT extends ESCCRRestTestCase { public void testFollowIndex() throws Exception { @@ -71,4 +79,68 @@ public void testFollowIndex() throws Exception { } } + public void testAutoFollowPatterns() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + + putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + + try (RestClient leaderClient = buildLeaderClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20190101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(leaderClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true"); + } + } + + try (RestClient middleClient = buildMiddleClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20200101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(middleClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true"); + } + } + + assertBusy(() -> { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + Map autoFollowStats = (Map) response.get("auto_follow_stats"); + assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2)); + + @SuppressWarnings("unchecked") + List> trackingRemoteClusters = + (List>) autoFollowStats.get("tracking_remote_clusters"); + trackingRemoteClusters.sort(Comparator.comparing(o -> ((String) o.get("cluster_name")))); + assertThat(trackingRemoteClusters.size(), equalTo(2)); + assertThat(trackingRemoteClusters.get(0).get("cluster_name"), equalTo("leader_cluster")); + assertThat(trackingRemoteClusters.get(1).get("cluster_name"), equalTo("middle_cluster")); + + ensureYellow("logs-20190101"); + ensureYellow("logs-20200101"); + verifyDocuments("logs-20190101", 5, "filtered_field:true"); + verifyDocuments("logs-20200101", 5, "filtered_field:true"); + }); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ec196d637e1a9..e47697558a2ca 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -144,7 +144,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, System::nanoTime) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 3985b90a71b23..b5b14403cec44 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -121,7 +121,8 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client.getRemoteClusterClient(clusterAlias), request, onFailure, - leaderClusterState -> { + leaderClusterStateResponse -> { + ClusterState leaderClusterState = leaderClusterStateResponse.getState(); IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); if (leaderIndexMetaData == null) { onFailure.accept(new IndexNotFoundException(leaderIndex)); @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer) { + final Consumer leaderClusterStateConsumer) { checkRemoteClusterLicenseAndFetchClusterState( client, clusterAlias, @@ -192,7 +193,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( final Client remoteClient, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer, + final Consumer leaderClusterStateConsumer, final Function nonCompliantLicense, final Function unknownLicense) { // we have to check the license on the remote cluster @@ -204,7 +205,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { final ActionListener clusterStateListener = - ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); + ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata remoteClient.admin().cluster().state(request, clusterStateListener); } else { @@ -228,9 +229,7 @@ public void onFailure(final Exception e) { * @param onFailure the failure consumer * @param historyUUIDConsumer the leader index history uuid and consumer */ - // NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs - // in case of following a local or a remote cluster. - public void fetchLeaderHistoryUUIDs( + private void fetchLeaderHistoryUUIDs( final Client remoteClient, final IndexMetaData leaderIndexMetaData, final Consumer onFailure, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d3f5c85b4f83e..0ff81776b14e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,7 +7,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -29,12 +28,6 @@ private CcrSettings() { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); - /** - * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow - */ - public static final Setting CCR_AUTO_FOLLOW_POLL_INTERVAL = - Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope); - /** * The settings defined by CCR. * @@ -43,8 +36,8 @@ private CcrSettings() { static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING, - CCR_AUTO_FOLLOW_POLL_INTERVAL); + CCR_FOLLOWING_INDEX_SETTING + ); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 6323fb7f103db..d0f70c52ca526 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -12,25 +12,23 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; -import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; @@ -44,28 +42,31 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; /** * A component that runs only on the elected master node and follows leader indices automatically * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. */ -public class AutoFollowCoordinator implements ClusterStateApplier { +public class AutoFollowCoordinator implements ClusterStateListener { private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; - private final TimeValue pollInterval; - private final ThreadPool threadPool; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; + private final LongSupplier relativeTimeProvider; - private volatile boolean localNodeMaster = false; + private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: private long numberOfSuccessfulIndicesAutoFollowed = 0; @@ -74,19 +75,16 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private final LinkedHashMap recentAutoFollowErrors; public AutoFollowCoordinator( - Settings settings, - Client client, - ThreadPool threadPool, - ClusterService clusterService, - CcrLicenseChecker ccrLicenseChecker) { + Client client, + ClusterService clusterService, + CcrLicenseChecker ccrLicenseChecker, + LongSupplier relativeTimeProvider) { + this.client = client; - this.threadPool = threadPool; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); - - this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); - clusterService.addStateApplier(this); - + this.relativeTimeProvider = relativeTimeProvider; + clusterService.addListener(this); this.recentAutoFollowErrors = new LinkedHashMap() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { @@ -96,11 +94,25 @@ protected boolean removeEldestEntry(final Map.Entry autoFollowers = this.autoFollowers; + final TreeMap timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>(); + for (Map.Entry entry : autoFollowers.entrySet()) { + long timeSinceLastAutoFollow = entry.getValue().lastAutoFollowTime; + if (timeSinceLastAutoFollow != -1) { + long timeSinceLastAutoFollowInMillis = + TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - timeSinceLastAutoFollow); + timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), timeSinceLastAutoFollowInMillis); + } else { + timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), -1L); + } + } + return new AutoFollowStats( numberOfFailedIndicesAutoFollowed, numberOfFailedRemoteClusterStateRequests, numberOfSuccessfulIndicesAutoFollowed, - new TreeMap<>(recentAutoFollowErrors) + new TreeMap<>(recentAutoFollowErrors), + timesSinceLastAutoFollowPerRemoteCluster ); } @@ -129,150 +141,192 @@ synchronized void updateStats(List results) { } } - private void doAutoFollow() { - if (localNodeMaster == false) { - return; - } - ClusterState followerClusterState = clusterService.state(); + void updateAutoFollowers(ClusterState followerClusterState) { AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - return; - } - - if (autoFollowMetadata.getPatterns().isEmpty()) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } if (ccrLicenseChecker.isCcrAllowed() == false) { // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr")); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } - Consumer> handler = results -> { - updateStats(results); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - }; - AutoFollower operation = new AutoFollower(handler, followerClusterState) { + final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); + Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> autoFollowers.containsKey(entry.getValue().getRemoteCluster()) == false) + .map(entry -> entry.getValue().getRemoteCluster()) + .collect(Collectors.toSet()); + + Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); + for (String remoteCluster : newRemoteClusters) { + AutoFollower autoFollower = new AutoFollower(remoteCluster, relativeTimeProvider, this::updateStats, clusterService::state) { + + @Override + void getLeaderClusterState(final String remoteCluster, + final long metadataVersion, + final BiConsumer handler) { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + request.waitForMetaDataVersion(metadataVersion); + // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( + client, + remoteCluster, + request, + e -> handler.accept(null, e), + leaderClusterResponse -> handler.accept(leaderClusterResponse, null)); + } - @Override - void getLeaderClusterState(final String remoteCluster, - final BiConsumer handler) { - final ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.metaData(true); - // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( - client, - remoteCluster, - request, - e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); - } + @Override + void createAndFollow(Map headers, + PutFollowAction.Request request, + Runnable successHandler, + Consumer failureHandler) { + Client followerClient = CcrLicenseChecker.wrapClient(client, headers); + followerClient.execute( + PutFollowAction.INSTANCE, + request, + ActionListener.wrap(r -> successHandler.run(), failureHandler) + ); + } - @Override - void createAndFollow(Map headers, - PutFollowAction.Request request, - Runnable successHandler, - Consumer failureHandler) { - Client followerClient = CcrLicenseChecker.wrapClient(client, headers); - followerClient.execute( - PutFollowAction.INSTANCE, - request, - ActionListener.wrap(r -> successHandler.run(), failureHandler) - ); - } + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { - @Override - void updateAutoFollowMetadata(Function updateFunction, - Consumer handler) { - clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateFunction.apply(currentState); + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return updateFunction.apply(currentState); - } + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } - @Override - public void onFailure(String source, Exception e) { - handler.accept(e); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + }); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - handler.accept(null); - } - }); - } + }; + newAutoFollowers.put(remoteCluster, autoFollower); + autoFollower.autoFollowIndices(); + } - }; - operation.autoFollowIndices(); + List removedRemoteClusters = new ArrayList<>(); + for (String remoteCluster : autoFollowers.keySet()) { + boolean exist = autoFollowMetadata.getPatterns().values().stream() + .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); + if (exist == false) { + removedRemoteClusters.add(remoteCluster); + } + } + this.autoFollowers = autoFollowers + .copyAndPutAll(newAutoFollowers) + .copyAndRemoveAll(removedRemoteClusters); } @Override - public void applyClusterState(ClusterChangedEvent event) { - final boolean beforeLocalMasterNode = localNodeMaster; - localNodeMaster = event.localNodeMaster(); - if (beforeLocalMasterNode == false && localNodeMaster) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + updateAutoFollowers(event.state()); } } abstract static class AutoFollower { - private final Consumer> handler; - private final ClusterState followerClusterState; - private final AutoFollowMetadata autoFollowMetadata; - - private final CountDown autoFollowPatternsCountDown; - private final AtomicArray autoFollowResults; - - AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { - this.handler = handler; - this.followerClusterState = followerClusterState; - this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); - this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); - this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); + private final String remoteCluster; + private final LongSupplier relativeTimeProvider; + private final Consumer> statsUpdater; + private final Supplier followerClusterStateSupplier; + + private volatile long lastAutoFollowTime = -1; + private volatile long metadataVersion = 0; + private volatile CountDown autoFollowPatternsCountDown; + private volatile AtomicArray autoFollowResults; + + AutoFollower(final String remoteCluster, + final LongSupplier relativeTimeProvider, + final Consumer> statsUpdater, + final Supplier followerClusterStateSupplier) { + this.remoteCluster = remoteCluster; + this.relativeTimeProvider = relativeTimeProvider; + this.statsUpdater = statsUpdater; + this.followerClusterStateSupplier = followerClusterStateSupplier; } void autoFollowIndices() { - int i = 0; - for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { - final int slot = i; - final String autoFollowPattenName = entry.getKey(); - final AutoFollowPattern autoFollowPattern = entry.getValue(); - final String remoteCluster = autoFollowPattern.getRemoteCluster(); - - Map headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName); - getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { - if (leaderClusterState != null) { - assert e == null; - final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName); - final List leaderIndicesToFollow = getLeaderIndicesToFollow(remoteCluster, autoFollowPattern, - leaderClusterState, followerClusterState, followedIndices); + lastAutoFollowTime = relativeTimeProvider.getAsLong(); + final ClusterState followerClusterState = followerClusterStateSupplier.get(); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster); + return; + } + + final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + if (patterns.isEmpty()) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); + return; + } + + this.autoFollowPatternsCountDown = new CountDown(patterns.size()); + this.autoFollowResults = new AtomicArray<>(patterns.size()); + + getLeaderClusterState(remoteCluster, metadataVersion + 1, (leaderClusterStateResponse, e) -> { + if (leaderClusterStateResponse != null) { + assert e == null; + if (leaderClusterStateResponse.isWaitForTimedOut()) { + autoFollowIndices(); + return; + } + + ClusterState leaderClusterState = leaderClusterStateResponse.getState(); + metadataVersion = leaderClusterState.getMetaData().version(); + + int i = 0; + for (String autoFollowPatternName : patterns) { + final int slot = i; + AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName); + Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); + + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, + followerClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { - finalise(slot, new AutoFollowResult(autoFollowPattenName)); + finalise(slot, new AutoFollowResult(autoFollowPatternName)); } else { List> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns() .entrySet().stream() - .filter(item -> autoFollowPattenName.equals(item.getKey()) == false) + .filter(item -> autoFollowPatternName.equals(item.getKey()) == false) .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster())) .map(item -> new Tuple<>(item.getKey(), item.getValue())) .collect(Collectors.toList()); Consumer resultHandler = result -> finalise(slot, result); - checkAutoFollowPattern(autoFollowPattenName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, + checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, patternsForTheSameLeaderCluster, resultHandler); } - } else { - finalise(slot, new AutoFollowResult(autoFollowPattenName, e)); + i++; } - }); - i++; - } + } else { + List results = new ArrayList<>(patterns.size()); + for (String autoFollowPatternName : patterns) { + results.add(new AutoFollowResult(autoFollowPatternName, e)); + } + statsUpdater.accept(results); + } + }); } private void checkAutoFollowPattern(String autoFollowPattenName, @@ -355,12 +409,12 @@ private void finalise(int slot, AutoFollowResult result) { assert autoFollowResults.get(slot) == null; autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(autoFollowResults.asList()); + statsUpdater.accept(autoFollowResults.asList()); + autoFollowIndices(); } } - static List getLeaderIndicesToFollow(String remoteCluster, - AutoFollowPattern autoFollowPattern, + static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, ClusterState leaderClusterState, ClusterState followerClusterState, List followedIndexUUIDs) { @@ -411,12 +465,14 @@ static Function recordLeaderIndexAsFollowFunction(St /** * Fetch the cluster state from the leader with the specified cluster alias - * @param remoteCluster the name of the leader cluster - * @param handler the callback to invoke + * @param remoteCluster the name of the leader cluster + * @param metadataVersion the last seen metadata version + * @param handler the callback to invoke */ abstract void getLeaderClusterState( String remoteCluster, - BiConsumer handler + long metadataVersion, + BiConsumer handler ); abstract void createAndFollow( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index a8194fc1f0fef..7b4a80bc4e4b9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -80,7 +81,8 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Consumer consumer = remoteClusterState -> { + Consumer consumer = remoteClusterStateResponse -> { + ClusterState remoteClusterState = remoteClusterStateResponse.getState(); String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { if (e == null) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 967c3a7e8c759..303ac40ca09b5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -140,54 +140,56 @@ public void onFailure(final Exception e) { } public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCompliantLicense() throws Exception { - // Update the cluster state so that we have auto follow patterns and verify that we log a warning in case of incompatible license: - CountDownLatch latch = new CountDownLatch(1); - ClusterService clusterService = getInstanceFromNode(ClusterService.class); - clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( - Collections.singletonMap("test_alias", autoFollowPattern), - Collections.emptyMap(), - Collections.emptyMap()); - - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata) - .build()); - return newState.build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Exception e) { - latch.countDown(); - fail("unexpected error [" + e.getMessage() + "]"); - } - }); - latch.await(); - final Logger logger = LogManager.getLogger(AutoFollowCoordinator.class); final MockLogAppender appender = new MockLogAppender(); appender.start(); appender.addExpectation( - new MockLogAppender.ExceptionSeenEventExpectation( - getTestName(), - logger.getName(), - Level.WARN, - "skipping auto-follower coordination", - ElasticsearchSecurityException.class, - "current license is non-compliant for [ccr]")); - Loggers.addAppender(logger, appender); + new MockLogAppender.ExceptionSeenEventExpectation( + getTestName(), + logger.getName(), + Level.WARN, + "skipping auto-follower coordination", + ElasticsearchSecurityException.class, + "current license is non-compliant for [ccr]")); + try { - assertBusy(appender::assertAllExpectationsMatched); + // Need to add mock log appender before submitting CS update, otherwise we miss the expected log: + Loggers.addAppender(logger, appender); + // Update the cluster state so that we have auto follow patterns and verify that we log a warning + // in case of incompatible license: + CountDownLatch latch = new CountDownLatch(1); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( + Collections.singletonMap("test_alias", autoFollowPattern), + Collections.emptyMap(), + Collections.emptyMap()); + + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata) + .build()); + return newState.build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + latch.countDown(); + fail("unexpected error [" + e.getMessage() + "]"); + } + }); + latch.await(); + appender.assertAllExpectationsMatched(); } finally { Loggers.removeAppender(logger, appender); appender.stop(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 1da58cc2703db..1e8588e4a7c1d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -29,14 +30,17 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.anyString; @@ -81,12 +85,13 @@ public void testAutoFollower() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower(handler, currentState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, followerClusterStateSupplier(currentState)) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { + long metadataVersion, + BiConsumer handler) { assertThat(remoteCluster, equalTo("remote")); - handler.accept(leaderState, null); + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -140,10 +145,11 @@ public void testAutoFollowerClusterStateApiFailure() { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, () -> followerState) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { + long metadataVersion, + BiConsumer handler) { handler.accept(null, failure); } @@ -200,11 +206,12 @@ public void testAutoFollowerUpdateClusterStateFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(leaderState, null); + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -262,11 +269,12 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(leaderState, null); + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -319,8 +327,7 @@ public void testGetLeaderIndicesToFollow() { .metaData(imdBuilder) .build(); - List result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, - Collections.emptyList()); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(3)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -328,7 +335,7 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(2).getName(), equalTo("metrics-4")); List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(2)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -351,12 +358,10 @@ public void testGetFollowerIndexName() { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( - Settings.EMPTY, - null, null, mock(ClusterService.class), - new CcrLicenseChecker(() -> true, () -> false) - ); + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); autoFollowCoordinator.updateStats(Collections.singletonList( new AutoFollowCoordinator.AutoFollowResult("_alias1")) @@ -408,4 +413,114 @@ public void testStats() { assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); } + public void testUpdateAutoFollowers() { + ClusterService clusterService = mock(ClusterService.class); + // Return a cluster state with no patterns so that the auto followers never really execute: + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build(); + when(clusterService.state()).thenReturn(followerState); + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + clusterService, + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + + // Add 3 patterns: + Map patterns = new HashMap<>(); + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null, + null, null, null, null, null, null, null, null)); + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue()); + + // Remove patterns 1 and 3: + patterns.remove("pattern1"); + patterns.remove("pattern3"); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue()); + + // Add pattern 4: + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null, + null, null, null, null, null, null, null, null)); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue()); + + // Remove patterns 2 and 4: + patterns.remove("pattern2"); + patterns.remove("pattern4"); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0)); + } + + public void testUpdateAutoFollowersNoPatterns() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0)); + } + + public void testUpdateAutoFollowersNoAutoFollowMetadata() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0)); + } + + private static Supplier followerClusterStateSupplier(ClusterState... states) { + final AutoFollowMetadata emptyAutoFollowMetadata = + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + final ClusterState lastState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, emptyAutoFollowMetadata)) + .build(); + + final LinkedList queue = new LinkedList<>(Arrays.asList(states)); + return () -> { + final ClusterState current = queue.poll(); + if (current != null) { + return current; + } else { + return lastState; + } + }; + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java index c651cca5b6a71..ba422426e525c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters; import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions; import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse; @@ -27,8 +28,8 @@ protected CcrStatsAction.Response createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomReadExceptions() - ); + randomReadExceptions(), + randomTrackingClusters()); FollowStatsAction.StatsResponses statsResponse = createStatsResponse(); return new CcrStatsAction.Response(autoFollowStats, statsResponse); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java index c4a61529f49a8..58b4cb2b1af78 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java @@ -34,8 +34,8 @@ protected AutoFollowStats createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomReadExceptions() - ); + randomReadExceptions(), + randomTrackingClusters()); } static NavigableMap randomReadExceptions() { @@ -47,6 +47,15 @@ static NavigableMap randomReadExceptions() { return readExceptions; } + static NavigableMap randomTrackingClusters() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put("" + i, randomLong()); + } + return readExceptions; + } + @Override protected Writeable.Reader instanceReader() { return AutoFollowStats::new; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java index ce1c0136677f6..c85decf116648 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java @@ -41,7 +41,7 @@ public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase @Before public void instantiateAutoFollowStats() { autoFollowStats = new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - Collections.emptyNavigableMap()); + Collections.emptyNavigableMap(), Collections.emptyNavigableMap()); } @Override @@ -74,8 +74,14 @@ public void testToXContent() throws IOException { new TreeMap<>(Collections.singletonMap( randomAlphaOfLength(4), new ElasticsearchException("cannot follow index"))); + + final NavigableMap trackingClusters = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + 1L)); final AutoFollowStats autoFollowStats = - new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions); + new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions, + trackingClusters); final AutoFollowStatsMonitoringDoc document = new AutoFollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, autoFollowStats); @@ -109,6 +115,12 @@ public void testToXContent() throws IOException { + "\"reason\":\"cannot follow index\"" + "}" + "}" + + "]," + + "\"tracking_remote_clusters\":[" + + "{" + + "\"cluster_name\":\"" + trackingClusters.keySet().iterator().next() + "\"," + + "\"time_since_last_auto_follow_started_millis\":" + trackingClusters.values().iterator().next() + + "}" + "]" + "}" + "}")); @@ -117,7 +129,11 @@ public void testToXContent() throws IOException { public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { final NavigableMap fetchExceptions = new TreeMap<>(Collections.singletonMap("leader_index", new ElasticsearchException("cannot follow index"))); - final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions); + final NavigableMap trackingClusters = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + 1L)); + final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions, trackingClusters); XContentBuilder builder = jsonBuilder(); builder.value(status); Map serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false); @@ -154,6 +170,12 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { assertThat(exceptionFieldMapping.size(), equalTo(2)); assertThat(XContentMapValues.extractValue("type.type", exceptionFieldMapping), equalTo("keyword")); assertThat(XContentMapValues.extractValue("reason.type", exceptionFieldMapping), equalTo("text")); + } else if (fieldName.equals("tracking_remote_clusters")) { + assertThat(fieldType, equalTo("nested")); + assertThat(((Map) fieldMapping.get("properties")).size(), equalTo(2)); + assertThat(XContentMapValues.extractValue("properties.cluster_name.type", fieldMapping), equalTo("keyword")); + assertThat(XContentMapValues.extractValue("properties.time_since_last_auto_follow_started_millis.type", fieldMapping), + equalTo("long")); } else { fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]"); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index 6f28c450f0473..77784cb97a680 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -33,6 +33,10 @@ public class AutoFollowStats implements Writeable, ToXContentObject { private static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors"); private static final ParseField LEADER_INDEX = new ParseField("leader_index"); private static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception"); + private static final ParseField TRACKING_REMOTE_CLUSTERS = new ParseField("tracking_remote_clusters"); + private static final ParseField CLUSTER_NAME = new ParseField("cluster_name"); + private static final ParseField TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS = + new ParseField("time_since_last_auto_follow_started_millis"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats", @@ -43,26 +47,39 @@ public class AutoFollowStats implements Writeable, ToXContentObject { new TreeMap<>( ((List>) args[3]) .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) - )); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), + new TreeMap<>( + ((List>) args[3]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); private static final ConstructingObjectParser, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER = new ConstructingObjectParser<>( "auto_follow_stats_errors", args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1])); + private static final ConstructingObjectParser, Void> TRACKING_REMOTE_CLUSTERS_PARSER = + new ConstructingObjectParser<>( + "tracking_remote_clusters", + args -> new AbstractMap.SimpleEntry<>((String) args[0], (Long) args[1])); + static { AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject( ConstructingObjectParser.constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), AUTO_FOLLOW_EXCEPTION); + TRACKING_REMOTE_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME); + TRACKING_REMOTE_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), + TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, RECENT_AUTO_FOLLOW_ERRORS); + STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), TRACKING_REMOTE_CLUSTERS_PARSER, + TRACKING_REMOTE_CLUSTERS); } public static AutoFollowStats fromXContent(final XContentParser parser) { @@ -73,24 +90,28 @@ public static AutoFollowStats fromXContent(final XContentParser parser) { private final long numberOfFailedRemoteClusterStateRequests; private final long numberOfSuccessfulFollowIndices; private final NavigableMap recentAutoFollowErrors; + private final NavigableMap trackingRemoteClusters; public AutoFollowStats( - long numberOfFailedFollowIndices, - long numberOfFailedRemoteClusterStateRequests, - long numberOfSuccessfulFollowIndices, - NavigableMap recentAutoFollowErrors + long numberOfFailedFollowIndices, + long numberOfFailedRemoteClusterStateRequests, + long numberOfSuccessfulFollowIndices, + NavigableMap recentAutoFollowErrors, + NavigableMap trackingRemoteClusters ) { this.numberOfFailedFollowIndices = numberOfFailedFollowIndices; this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests; this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices; this.recentAutoFollowErrors = recentAutoFollowErrors; + this.trackingRemoteClusters = trackingRemoteClusters; } public AutoFollowStats(StreamInput in) throws IOException { numberOfFailedFollowIndices = in.readVLong(); numberOfFailedRemoteClusterStateRequests = in.readVLong(); numberOfSuccessfulFollowIndices = in.readVLong(); - recentAutoFollowErrors= new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + recentAutoFollowErrors = new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + trackingRemoteClusters = new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readZLong)); } @Override @@ -99,6 +120,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(numberOfFailedRemoteClusterStateRequests); out.writeVLong(numberOfSuccessfulFollowIndices); out.writeMap(recentAutoFollowErrors, StreamOutput::writeString, StreamOutput::writeException); + out.writeMap(trackingRemoteClusters, StreamOutput::writeString, StreamOutput::writeZLong); } public long getNumberOfFailedFollowIndices() { @@ -117,6 +139,10 @@ public NavigableMap getRecentAutoFollowErrors() return recentAutoFollowErrors; } + public NavigableMap getTrackingRemoteClusters() { + return trackingRemoteClusters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -148,6 +174,18 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P } } builder.endArray(); + builder.startArray(TRACKING_REMOTE_CLUSTERS.getPreferredName()); + { + for (final Map.Entry entry : trackingRemoteClusters.entrySet()) { + builder.startObject(); + { + builder.field(CLUSTER_NAME.getPreferredName(), entry.getKey()); + builder.field(TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS.getPreferredName(), entry.getValue()); + } + builder.endObject(); + } + } + builder.endArray(); return builder; } @@ -165,7 +203,8 @@ public boolean equals(Object o) { * keys. */ recentAutoFollowErrors.keySet().equals(that.recentAutoFollowErrors.keySet()) && - getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)); + getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) && + Objects.equals(trackingRemoteClusters, that.trackingRemoteClusters); } @Override @@ -179,7 +218,8 @@ public int hashCode() { * messages. Note that we are relying on the fact that the auto follow exceptions are ordered by keys. */ recentAutoFollowErrors.keySet(), - getFetchExceptionMessages(this) + getFetchExceptionMessages(this), + trackingRemoteClusters ); } @@ -194,6 +234,7 @@ public String toString() { ", numberOfFailedRemoteClusterStateRequests=" + numberOfFailedRemoteClusterStateRequests + ", numberOfSuccessfulFollowIndices=" + numberOfSuccessfulFollowIndices + ", recentAutoFollowErrors=" + recentAutoFollowErrors + + ", trackingRemoteClusters=" + trackingRemoteClusters + '}'; } } diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 1e6d3ec892af7..e20f52719d462 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -1060,6 +1060,17 @@ } } } + }, + "tracking_remote_clusters": { + "type": "nested", + "properties": { + "cluster_name": { + "type": "keyword" + }, + "time_since_last_auto_follow_started_millis": { + "type": "long" + } + } } } }