From a745200e7186ee3bc311f15eb501587a26ee6a12 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Nov 2018 09:14:47 +0100 Subject: [PATCH 1/5] [CCR] Refactored auto follow coordinator * AutoFollowCoordinator now fetches cluster states per remote cluster. * AutoFollowCoordinator makes use of the `wait_for_metadata_version` feature in the cluster state API. * Removed the poll_interval setting, because it is no longer needed. Waiting for changes in remote clusters is done via the cluster state API, and auto follow pattern changes in the local cluster are processed immediately. * Improved auto follow stats to keep track of the last time the remote clusters were checked for changes per remote cluster. Relates to #33007 --- x-pack/plugin/ccr/qa/chain/build.gradle | 1 + .../org/elasticsearch/xpack/ccr/ChainIT.java | 72 ++++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../xpack/ccr/CcrLicenseChecker.java | 13 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 11 +- .../ccr/action/AutoFollowCoordinator.java | 314 +++++++++++------- .../TransportPutAutoFollowPatternAction.java | 4 +- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 88 ++--- .../action/AutoFollowCoordinatorTests.java | 151 ++++++++- .../action/AutoFollowStatsResponseTests.java | 5 +- .../ccr/action/AutoFollowStatsTests.java | 13 +- .../AutoFollowStatsMonitoringDocTests.java | 28 +- .../xpack/core/ccr/AutoFollowStats.java | 58 +++- .../src/main/resources/monitoring-es.json | 11 + 14 files changed, 547 insertions(+), 224 deletions(-) diff --git a/x-pack/plugin/ccr/qa/chain/build.gradle b/x-pack/plugin/ccr/qa/chain/build.gradle index f93feb4a66a1b..b9bf933d10fe1 100644 --- a/x-pack/plugin/ccr/qa/chain/build.gradle +++ b/x-pack/plugin/ccr/qa/chain/build.gradle @@ -46,6 +46,7 @@ followClusterTestCluster { numNodes = 1 clusterName = 'follow-cluster' setting 'xpack.license.self_generated.type', 'trial' + setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" 
setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\"" setting 'node.name', 'follow' } diff --git a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java index e5a37aa829bbf..3fc2b34014430 100644 --- a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java +++ b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java @@ -6,9 +6,17 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.client.Request; import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + public class ChainIT extends ESCCRRestTestCase { public void testFollowIndex() throws Exception { @@ -71,4 +79,68 @@ public void testFollowIndex() throws Exception { } } + public void testAutoFollowPatterns() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + + Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + + putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + + try (RestClient leaderClient = buildLeaderClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20190101"); + request.setJsonEntity("{\"settings\": " + 
Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(leaderClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true"); + } + } + + try (RestClient middleClient = buildMiddleClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20200101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(middleClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true"); + } + } + + assertBusy(() -> { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + Map autoFollowStats = (Map) response.get("auto_follow_stats"); + assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2)); + + @SuppressWarnings("unchecked") + List> trackingRemoteClusters = + (List>) autoFollowStats.get("tracking_remote_clusters"); + trackingRemoteClusters.sort(Comparator.comparing(o -> ((String) o.get("cluster_name")))); + assertThat(trackingRemoteClusters.size(), equalTo(2)); + assertThat(trackingRemoteClusters.get(0).get("cluster_name"), equalTo("leader_cluster")); + assertThat(trackingRemoteClusters.get(1).get("cluster_name"), equalTo("middle_cluster")); + + ensureYellow("logs-20190101"); + ensureYellow("logs-20200101"); + verifyDocuments("logs-20190101", 5, "filtered_field:true"); + verifyDocuments("logs-20200101", 5, "filtered_field:true"); + }); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ec196d637e1a9..e47697558a2ca 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -144,7 +144,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, System::nanoTime) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 3985b90a71b23..b5b14403cec44 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -121,7 +121,8 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client.getRemoteClusterClient(clusterAlias), request, onFailure, - leaderClusterState -> { + leaderClusterStateResponse -> { + ClusterState leaderClusterState = leaderClusterStateResponse.getState(); IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); if (leaderIndexMetaData == null) { onFailure.accept(new IndexNotFoundException(leaderIndex)); @@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer) { + final Consumer leaderClusterStateConsumer) { checkRemoteClusterLicenseAndFetchClusterState( client, clusterAlias, @@ -192,7 +193,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( final Client remoteClient, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer, + final Consumer 
leaderClusterStateConsumer, final Function nonCompliantLicense, final Function unknownLicense) { // we have to check the license on the remote cluster @@ -204,7 +205,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { final ActionListener clusterStateListener = - ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); + ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata remoteClient.admin().cluster().state(request, clusterStateListener); } else { @@ -228,9 +229,7 @@ public void onFailure(final Exception e) { * @param onFailure the failure consumer * @param historyUUIDConsumer the leader index history uuid and consumer */ - // NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs - // in case of following a local or a remote cluster. 
- public void fetchLeaderHistoryUUIDs( + private void fetchLeaderHistoryUUIDs( final Client remoteClient, final IndexMetaData leaderIndexMetaData, final Consumer onFailure, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d3f5c85b4f83e..0ff81776b14e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,7 +7,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -29,12 +28,6 @@ private CcrSettings() { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); - /** - * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow - */ - public static final Setting CCR_AUTO_FOLLOW_POLL_INTERVAL = - Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope); - /** * The settings defined by CCR. 
* @@ -43,8 +36,8 @@ private CcrSettings() { static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING, - CCR_AUTO_FOLLOW_POLL_INTERVAL); + CCR_FOLLOWING_INDEX_SETTING + ); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 6323fb7f103db..731f86958e752 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -12,25 +12,23 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; -import 
org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; @@ -44,28 +42,31 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; /** * A component that runs only on the elected master node and follows leader indices automatically * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. */ -public class AutoFollowCoordinator implements ClusterStateApplier { +public class AutoFollowCoordinator implements ClusterStateListener { private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; - private final TimeValue pollInterval; - private final ThreadPool threadPool; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; + private final LongSupplier relativeTimeProvider; - private volatile boolean localNodeMaster = false; + private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: private long numberOfSuccessfulIndicesAutoFollowed = 0; @@ -74,19 +75,16 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private final LinkedHashMap recentAutoFollowErrors; public AutoFollowCoordinator( - Settings settings, - Client client, - ThreadPool threadPool, - ClusterService clusterService, - CcrLicenseChecker ccrLicenseChecker) { + Client client, + ClusterService clusterService, + CcrLicenseChecker ccrLicenseChecker, + 
LongSupplier relativeTimeProvider) { + this.client = client; - this.threadPool = threadPool; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); - - this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); - clusterService.addStateApplier(this); - + this.relativeTimeProvider = relativeTimeProvider; + clusterService.addListener(this); this.recentAutoFollowErrors = new LinkedHashMap() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { @@ -96,11 +94,25 @@ protected boolean removeEldestEntry(final Map.Entry autoFollowers = this.autoFollowers; + final TreeMap timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>(); + for (Map.Entry entry : autoFollowers.entrySet()) { + long timeSinceLastAutoFollow = entry.getValue().lastAutoFollowTime; + if (timeSinceLastAutoFollow != -1) { + long timeSinceLastAutoFollowInMillis = + TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - timeSinceLastAutoFollow); + timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), timeSinceLastAutoFollowInMillis); + } else { + timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), -1L); + } + } + return new AutoFollowStats( numberOfFailedIndicesAutoFollowed, numberOfFailedRemoteClusterStateRequests, numberOfSuccessfulIndicesAutoFollowed, - new TreeMap<>(recentAutoFollowErrors) + new TreeMap<>(recentAutoFollowErrors), + timesSinceLastAutoFollowPerRemoteCluster ); } @@ -129,150 +141,192 @@ synchronized void updateStats(List results) { } } - private void doAutoFollow() { - if (localNodeMaster == false) { - return; - } - ClusterState followerClusterState = clusterService.state(); + void updateAutoFollowers(ClusterState followerClusterState) { AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - 
return; - } - - if (autoFollowMetadata.getPatterns().isEmpty()) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } if (ccrLicenseChecker.isCcrAllowed() == false) { // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr")); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } - Consumer> handler = results -> { - updateStats(results); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - }; - AutoFollower operation = new AutoFollower(handler, followerClusterState) { + final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); + Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> autoFollowers.containsKey(entry.getValue().getRemoteCluster()) == false) + .map(entry -> entry.getValue().getRemoteCluster()) + .collect(Collectors.toSet()); + + Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); + for (String remoteCluster : newRemoteClusters) { + AutoFollower autoFollower = new AutoFollower(remoteCluster, relativeTimeProvider, this::updateStats, clusterService::state) { + + @Override + void getLeaderClusterState(final String remoteCluster, + final long metadataVersion, + final BiConsumer handler) { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + request.waitForMetaDataVersion(metadataVersion); + // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( + client, + remoteCluster, + request, + e -> handler.accept(null, e), + leaderClusterResponse -> handler.accept(leaderClusterResponse, null)); + } - @Override - void getLeaderClusterState(final String remoteCluster, - final BiConsumer 
handler) { - final ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.metaData(true); - // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( - client, - remoteCluster, - request, - e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); - } + @Override + void createAndFollow(Map headers, + PutFollowAction.Request request, + Runnable successHandler, + Consumer failureHandler) { + Client followerClient = CcrLicenseChecker.wrapClient(client, headers); + followerClient.execute( + PutFollowAction.INSTANCE, + request, + ActionListener.wrap(r -> successHandler.run(), failureHandler) + ); + } - @Override - void createAndFollow(Map headers, - PutFollowAction.Request request, - Runnable successHandler, - Consumer failureHandler) { - Client followerClient = CcrLicenseChecker.wrapClient(client, headers); - followerClient.execute( - PutFollowAction.INSTANCE, - request, - ActionListener.wrap(r -> successHandler.run(), failureHandler) - ); - } + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { - @Override - void updateAutoFollowMetadata(Function updateFunction, - Consumer handler) { - clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateFunction.apply(currentState); + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return updateFunction.apply(currentState); - } + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } - @Override - public void onFailure(String source, Exception e) { - handler.accept(e); - } + @Override + public void 
clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + }); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - handler.accept(null); - } - }); - } + }; + newAutoFollowers.put(remoteCluster, autoFollower); + autoFollower.autoFollowIndices(); + } - }; - operation.autoFollowIndices(); + List removedRemoteClusters = new ArrayList<>(); + for (String remoteCluster : autoFollowers.keySet()) { + boolean exist = autoFollowMetadata.getPatterns().values().stream() + .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); + if (exist == false) { + removedRemoteClusters.add(remoteCluster); + } + } + this.autoFollowers = autoFollowers + .copyAndPutAll(newAutoFollowers) + .copyAndRemoveAll(removedRemoteClusters); } @Override - public void applyClusterState(ClusterChangedEvent event) { - final boolean beforeLocalMasterNode = localNodeMaster; - localNodeMaster = event.localNodeMaster(); - if (beforeLocalMasterNode == false && localNodeMaster) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + updateAutoFollowers(event.state()); } } abstract static class AutoFollower { - private final Consumer> handler; - private final ClusterState followerClusterState; - private final AutoFollowMetadata autoFollowMetadata; - - private final CountDown autoFollowPatternsCountDown; - private final AtomicArray autoFollowResults; - - AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { - this.handler = handler; - this.followerClusterState = followerClusterState; - this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); - this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); - this.autoFollowResults = new 
AtomicArray<>(autoFollowMetadata.getPatterns().size()); + private final String remoteCluster; + private final LongSupplier relativeTimeProvider; + private final Consumer> statsUpdater; + private final Supplier followerClusterStateSupplier; + + private volatile long lastAutoFollowTime = -1; + private volatile long metadataVersion = -1; + private volatile CountDown autoFollowPatternsCountDown; + private volatile AtomicArray autoFollowResults; + + AutoFollower(final String remoteCluster, + final LongSupplier relativeTimeProvider, + final Consumer> statsUpdater, + final Supplier followerClusterStateSupplier) { + this.remoteCluster = remoteCluster; + this.relativeTimeProvider = relativeTimeProvider; + this.statsUpdater = statsUpdater; + this.followerClusterStateSupplier = followerClusterStateSupplier; } void autoFollowIndices() { - int i = 0; - for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { - final int slot = i; - final String autoFollowPattenName = entry.getKey(); - final AutoFollowPattern autoFollowPattern = entry.getValue(); - final String remoteCluster = autoFollowPattern.getRemoteCluster(); - - Map headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName); - getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { - if (leaderClusterState != null) { - assert e == null; - final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName); - final List leaderIndicesToFollow = getLeaderIndicesToFollow(remoteCluster, autoFollowPattern, - leaderClusterState, followerClusterState, followedIndices); + lastAutoFollowTime = relativeTimeProvider.getAsLong(); + final ClusterState followerClusterState = followerClusterStateSupplier.get(); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster); 
+ return; + } + + final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + if (patterns.isEmpty()) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); + return; + } + + this.autoFollowPatternsCountDown = new CountDown(patterns.size()); + this.autoFollowResults = new AtomicArray<>(patterns.size()); + + getLeaderClusterState(remoteCluster, metadataVersion + 1, (leaderClusterStateResponse, e) -> { + if (leaderClusterStateResponse != null) { + assert e == null; + if (leaderClusterStateResponse.isWaitForTimedOut()) { + autoFollowIndices(); + return; + } + + ClusterState leaderClusterState = leaderClusterStateResponse.getState(); + metadataVersion = leaderClusterState.getMetaData().version(); + + int i = 0; + for (String autoFollowPatternName : patterns) { + final int slot = i; + AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName); + Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); + + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, + followerClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { - finalise(slot, new AutoFollowResult(autoFollowPattenName)); + finalise(slot, new AutoFollowResult(autoFollowPatternName)); } else { List> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns() .entrySet().stream() - .filter(item -> autoFollowPattenName.equals(item.getKey()) == false) + .filter(item -> autoFollowPatternName.equals(item.getKey()) == false) .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster())) .map(item -> new Tuple<>(item.getKey(), item.getValue())) .collect(Collectors.toList()); 
Consumer resultHandler = result -> finalise(slot, result); - checkAutoFollowPattern(autoFollowPattenName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, + checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, patternsForTheSameLeaderCluster, resultHandler); } - } else { - finalise(slot, new AutoFollowResult(autoFollowPattenName, e)); + i++; } - }); - i++; - } + } else { + List results = new ArrayList<>(patterns.size()); + for (String autoFollowPatternName : patterns) { + results.add(new AutoFollowResult(autoFollowPatternName, e)); + } + statsUpdater.accept(results); + } + }); } private void checkAutoFollowPattern(String autoFollowPattenName, @@ -355,12 +409,12 @@ private void finalise(int slot, AutoFollowResult result) { assert autoFollowResults.get(slot) == null; autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(autoFollowResults.asList()); + statsUpdater.accept(autoFollowResults.asList()); + autoFollowIndices(); } } - static List getLeaderIndicesToFollow(String remoteCluster, - AutoFollowPattern autoFollowPattern, + static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, ClusterState leaderClusterState, ClusterState followerClusterState, List followedIndexUUIDs) { @@ -411,12 +465,14 @@ static Function recordLeaderIndexAsFollowFunction(St /** * Fetch the cluster state from the leader with the specified cluster alias - * @param remoteCluster the name of the leader cluster - * @param handler the callback to invoke + * @param remoteCluster the name of the leader cluster + * @param metadataVersion the last seen metadata version + * @param handler the callback to invoke */ abstract void getLeaderClusterState( String remoteCluster, - BiConsumer handler + long metadataVersion, + BiConsumer handler ); abstract void createAndFollow( diff --git 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index a8194fc1f0fef..7b4a80bc4e4b9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -80,7 +81,8 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Consumer consumer = remoteClusterState -> { + Consumer consumer = remoteClusterStateResponse -> { + ClusterState remoteClusterState = remoteClusterStateResponse.getState(); String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { if (e == null) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 967c3a7e8c759..303ac40ca09b5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -140,54 +140,56 @@ public void onFailure(final Exception e) { } public void 
testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCompliantLicense() throws Exception { - // Update the cluster state so that we have auto follow patterns and verify that we log a warning in case of incompatible license: - CountDownLatch latch = new CountDownLatch(1); - ClusterService clusterService = getInstanceFromNode(ClusterService.class); - clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( - Collections.singletonMap("test_alias", autoFollowPattern), - Collections.emptyMap(), - Collections.emptyMap()); - - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata) - .build()); - return newState.build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Exception e) { - latch.countDown(); - fail("unexpected error [" + e.getMessage() + "]"); - } - }); - latch.await(); - final Logger logger = LogManager.getLogger(AutoFollowCoordinator.class); final MockLogAppender appender = new MockLogAppender(); appender.start(); appender.addExpectation( - new MockLogAppender.ExceptionSeenEventExpectation( - getTestName(), - logger.getName(), - Level.WARN, - "skipping auto-follower coordination", - ElasticsearchSecurityException.class, - "current license is non-compliant for [ccr]")); - Loggers.addAppender(logger, appender); + new MockLogAppender.ExceptionSeenEventExpectation( + getTestName(), + 
logger.getName(), + Level.WARN, + "skipping auto-follower coordination", + ElasticsearchSecurityException.class, + "current license is non-compliant for [ccr]")); + try { - assertBusy(appender::assertAllExpectationsMatched); + // Need to add mock log appender before submitting CS update, otherwise we miss the expected log: + Loggers.addAppender(logger, appender); + // Update the cluster state so that we have auto follow patterns and verify that we log a warning + // in case of incompatible license: + CountDownLatch latch = new CountDownLatch(1); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( + Collections.singletonMap("test_alias", autoFollowPattern), + Collections.emptyMap(), + Collections.emptyMap()); + + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata) + .build()); + return newState.build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + latch.countDown(); + fail("unexpected error [" + e.getMessage() + "]"); + } + }); + latch.await(); + appender.assertAllExpectationsMatched(); } finally { Loggers.removeAppender(logger, appender); appender.stop(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 1da58cc2703db..1e8588e4a7c1d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -29,14 +30,17 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.anyString; @@ -81,12 +85,13 @@ public void testAutoFollower() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower(handler, currentState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, followerClusterStateSupplier(currentState)) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { + long metadataVersion, + BiConsumer handler) { assertThat(remoteCluster, equalTo("remote")); - handler.accept(leaderState, null); + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -140,10 +145,11 @@ public void 
testAutoFollowerClusterStateApiFailure() { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, () -> followerState) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { + long metadataVersion, + BiConsumer handler) { handler.accept(null, failure); } @@ -200,11 +206,12 @@ public void testAutoFollowerUpdateClusterStateFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(leaderState, null); + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -262,11 +269,12 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", () -> 1L, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(leaderState, null); + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderState, 1L, false), null); } @Override @@ -319,8 +327,7 @@ public void testGetLeaderIndicesToFollow() { 
.metaData(imdBuilder) .build(); - List result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, - Collections.emptyList()); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(3)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -328,7 +335,7 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(2).getName(), equalTo("metrics-4")); List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(2)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -351,12 +358,10 @@ public void testGetFollowerIndexName() { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( - Settings.EMPTY, - null, null, mock(ClusterService.class), - new CcrLicenseChecker(() -> true, () -> false) - ); + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); autoFollowCoordinator.updateStats(Collections.singletonList( new AutoFollowCoordinator.AutoFollowResult("_alias1")) @@ -408,4 +413,114 @@ public void testStats() { assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); } + public void testUpdateAutoFollowers() { + ClusterService clusterService = mock(ClusterService.class); + // Return a cluster state with no patterns so that the auto followers never really execute: + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + 
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build(); + when(clusterService.state()).thenReturn(followerState); + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + clusterService, + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + + // Add 3 patterns: + Map patterns = new HashMap<>(); + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null, + null, null, null, null, null, null, null, null)); + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue()); + + // Remove patterns 1 and 3: + patterns.remove("pattern1"); + patterns.remove("pattern3"); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + 
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue()); + + // Add pattern 4: + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null, + null, null, null, null, null, null, null, null)); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue()); + + // Remove patterns 2 and 4: + patterns.remove("pattern2"); + patterns.remove("pattern4"); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0)); + } + + public void testUpdateAutoFollowersNoPatterns() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build(); + 
autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0)); + } + + public void testUpdateAutoFollowersNoAutoFollowMetadata() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0)); + } + + private static Supplier followerClusterStateSupplier(ClusterState... states) { + final AutoFollowMetadata emptyAutoFollowMetadata = + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + final ClusterState lastState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, emptyAutoFollowMetadata)) + .build(); + + final LinkedList queue = new LinkedList<>(Arrays.asList(states)); + return () -> { + final ClusterState current = queue.poll(); + if (current != null) { + return current; + } else { + return lastState; + } + }; + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java index c651cca5b6a71..ba422426e525c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters; 
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions; import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse; @@ -27,8 +28,8 @@ protected CcrStatsAction.Response createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomReadExceptions() - ); + randomReadExceptions(), + randomTrackingClusters()); FollowStatsAction.StatsResponses statsResponse = createStatsResponse(); return new CcrStatsAction.Response(autoFollowStats, statsResponse); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java index c4a61529f49a8..58b4cb2b1af78 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java @@ -34,8 +34,8 @@ protected AutoFollowStats createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomReadExceptions() - ); + randomReadExceptions(), + randomTrackingClusters()); } static NavigableMap randomReadExceptions() { @@ -47,6 +47,15 @@ static NavigableMap randomReadExceptions() { return readExceptions; } + static NavigableMap randomTrackingClusters() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put("" + i, randomLong()); + } + return readExceptions; + } + @Override protected Writeable.Reader instanceReader() { return AutoFollowStats::new; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java index 
ce1c0136677f6..c85decf116648 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/AutoFollowStatsMonitoringDocTests.java @@ -41,7 +41,7 @@ public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase @Before public void instantiateAutoFollowStats() { autoFollowStats = new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - Collections.emptyNavigableMap()); + Collections.emptyNavigableMap(), Collections.emptyNavigableMap()); } @Override @@ -74,8 +74,14 @@ public void testToXContent() throws IOException { new TreeMap<>(Collections.singletonMap( randomAlphaOfLength(4), new ElasticsearchException("cannot follow index"))); + + final NavigableMap trackingClusters = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + 1L)); final AutoFollowStats autoFollowStats = - new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions); + new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions, + trackingClusters); final AutoFollowStatsMonitoringDoc document = new AutoFollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, autoFollowStats); @@ -109,6 +115,12 @@ public void testToXContent() throws IOException { + "\"reason\":\"cannot follow index\"" + "}" + "}" + + "]," + + "\"tracking_remote_clusters\":[" + + "{" + + "\"cluster_name\":\"" + trackingClusters.keySet().iterator().next() + "\"," + + "\"time_since_last_auto_follow_started_millis\":" + trackingClusters.values().iterator().next() + + "}" + "]" + "}" + "}")); @@ -117,7 +129,11 @@ public void testToXContent() throws IOException { public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { final NavigableMap fetchExceptions = new 
TreeMap<>(Collections.singletonMap("leader_index", new ElasticsearchException("cannot follow index"))); - final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions); + final NavigableMap trackingClusters = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + 1L)); + final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions, trackingClusters); XContentBuilder builder = jsonBuilder(); builder.value(status); Map serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false); @@ -154,6 +170,12 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { assertThat(exceptionFieldMapping.size(), equalTo(2)); assertThat(XContentMapValues.extractValue("type.type", exceptionFieldMapping), equalTo("keyword")); assertThat(XContentMapValues.extractValue("reason.type", exceptionFieldMapping), equalTo("text")); + } else if (fieldName.equals("tracking_remote_clusters")) { + assertThat(fieldType, equalTo("nested")); + assertThat(((Map) fieldMapping.get("properties")).size(), equalTo(2)); + assertThat(XContentMapValues.extractValue("properties.cluster_name.type", fieldMapping), equalTo("keyword")); + assertThat(XContentMapValues.extractValue("properties.time_since_last_auto_follow_started_millis.type", fieldMapping), + equalTo("long")); } else { fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]"); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index 6f28c450f0473..1643eeacc9fee 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -33,6 +33,9 @@ public class AutoFollowStats implements Writeable, ToXContentObject { private static final 
ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors"); private static final ParseField LEADER_INDEX = new ParseField("leader_index"); private static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception"); + private static final ParseField TRACKING_REMOTE_CLUSTERS = new ParseField("tracking_remote_clusters"); + private static final ParseField CLUSTER_NAME = new ParseField("cluster_name"); + private static final ParseField TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS = new ParseField("time_since_last_auto_follow_started_millis"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats", @@ -43,26 +46,39 @@ public class AutoFollowStats implements Writeable, ToXContentObject { new TreeMap<>( ((List>) args[3]) .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) - )); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), + new TreeMap<>( + ((List>) args[4]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); private static final ConstructingObjectParser, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER = new ConstructingObjectParser<>( "auto_follow_stats_errors", args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1])); + private static final ConstructingObjectParser, Void> TRACKING_REMOTE_CLUSTERS_PARSER = + new ConstructingObjectParser<>( + "tracking_remote_clusters", + args -> new AbstractMap.SimpleEntry<>((String) args[0], (Long) args[1])); + static { AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject( ConstructingObjectParser.constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), AUTO_FOLLOW_EXCEPTION); + TRACKING_REMOTE_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME); +
TRACKING_REMOTE_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), + TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, RECENT_AUTO_FOLLOW_ERRORS); + STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), TRACKING_REMOTE_CLUSTERS_PARSER, + TRACKING_REMOTE_CLUSTERS); } public static AutoFollowStats fromXContent(final XContentParser parser) { @@ -73,24 +89,28 @@ public static AutoFollowStats fromXContent(final XContentParser parser) { private final long numberOfFailedRemoteClusterStateRequests; private final long numberOfSuccessfulFollowIndices; private final NavigableMap recentAutoFollowErrors; + private final NavigableMap trackingRemoteClusters; public AutoFollowStats( - long numberOfFailedFollowIndices, - long numberOfFailedRemoteClusterStateRequests, - long numberOfSuccessfulFollowIndices, - NavigableMap recentAutoFollowErrors + long numberOfFailedFollowIndices, + long numberOfFailedRemoteClusterStateRequests, + long numberOfSuccessfulFollowIndices, + NavigableMap recentAutoFollowErrors, + NavigableMap trackingRemoteClusters ) { this.numberOfFailedFollowIndices = numberOfFailedFollowIndices; this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests; this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices; this.recentAutoFollowErrors = recentAutoFollowErrors; + this.trackingRemoteClusters = trackingRemoteClusters; } public AutoFollowStats(StreamInput in) throws IOException { numberOfFailedFollowIndices = in.readVLong(); 
numberOfFailedRemoteClusterStateRequests = in.readVLong(); numberOfSuccessfulFollowIndices = in.readVLong(); - recentAutoFollowErrors= new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + recentAutoFollowErrors = new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + trackingRemoteClusters = new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readZLong)); } @Override @@ -99,6 +119,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(numberOfFailedRemoteClusterStateRequests); out.writeVLong(numberOfSuccessfulFollowIndices); out.writeMap(recentAutoFollowErrors, StreamOutput::writeString, StreamOutput::writeException); + out.writeMap(trackingRemoteClusters, StreamOutput::writeString, StreamOutput::writeZLong); } public long getNumberOfFailedFollowIndices() { @@ -117,6 +138,10 @@ public NavigableMap getRecentAutoFollowErrors() return recentAutoFollowErrors; } + public NavigableMap getTrackingRemoteClusters() { + return trackingRemoteClusters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -148,6 +173,18 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P } } builder.endArray(); + builder.startArray(TRACKING_REMOTE_CLUSTERS.getPreferredName()); + { + for (final Map.Entry entry : trackingRemoteClusters.entrySet()) { + builder.startObject(); + { + builder.field(CLUSTER_NAME.getPreferredName(), entry.getKey()); + builder.field(TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS.getPreferredName(), entry.getValue()); + } + builder.endObject(); + } + } + builder.endArray(); return builder; } @@ -165,7 +202,8 @@ public boolean equals(Object o) { * keys. 
*/ recentAutoFollowErrors.keySet().equals(that.recentAutoFollowErrors.keySet()) && - getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)); + getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) && + Objects.equals(trackingRemoteClusters, that.trackingRemoteClusters); } @Override @@ -179,7 +217,8 @@ public int hashCode() { * messages. Note that we are relying on the fact that the auto follow exceptions are ordered by keys. */ recentAutoFollowErrors.keySet(), - getFetchExceptionMessages(this) + getFetchExceptionMessages(this), + trackingRemoteClusters ); } @@ -194,6 +233,7 @@ public String toString() { ", numberOfFailedRemoteClusterStateRequests=" + numberOfFailedRemoteClusterStateRequests + ", numberOfSuccessfulFollowIndices=" + numberOfSuccessfulFollowIndices + ", recentAutoFollowErrors=" + recentAutoFollowErrors + + ", trackingRemoteClusters=" + trackingRemoteClusters + '}'; } } diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 1e6d3ec892af7..e20f52719d462 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -1060,6 +1060,17 @@ } } } + }, + "tracking_remote_clusters": { + "type": "nested", + "properties": { + "cluster_name": { + "type": "keyword" + }, + "time_since_last_auto_follow_started_millis": { + "type": "long" + } + } } } } From 35a04c740b3640aa88c8c68e5b541c2d24d8e384 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Nov 2018 15:30:59 +0100 Subject: [PATCH 2/5] fixed checkstyle violation --- .../java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index 1643eeacc9fee..77784cb97a680 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -35,7 +35,8 @@ public class AutoFollowStats implements Writeable, ToXContentObject { private static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception"); private static final ParseField TRACKING_REMOTE_CLUSTERS = new ParseField("tracking_remote_clusters"); private static final ParseField CLUSTER_NAME = new ParseField("cluster_name"); - private static final ParseField TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS = new ParseField("time_since_last_auto_follow_started_millis"); + private static final ParseField TIME_SINCE_LAST_AUTO_FOLLOW_STARTED_MILLIS = + new ParseField("time_since_last_auto_follow_started_millis"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats", From 0bc8ceb634ac73f4050c610fdbd48c3b02e2b7e4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Nov 2018 18:08:55 +0100 Subject: [PATCH 3/5] fixed docs test --- docs/reference/ccr/apis/get-ccr-stats.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index b8491e8a60176..f7d42b18762c1 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -106,6 +106,7 @@ The API returns the following results: "number_of_failed_remote_cluster_state_requests" : 0, "number_of_successful_follow_indices" : 1, "recent_auto_follow_errors" : [] + "tracking_remote_clusters" : [] }, "follow_stats" : { "indices" : [ @@ -151,6 +152,7 @@ The API returns the following results: // TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : 
$body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/] // TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/] // TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/] +// TESTRESPONSE[s/"tracking_remote_clusters" : \[\]/"tracking_remote_clusters" : $body.auto_follow_stats.tracking_remote_clusters/] // TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/] // TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/] // TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/] From 1fb3e48d1577981f61a69a9f0ee79ac259086f3c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 27 Nov 2018 07:43:23 +0100 Subject: [PATCH 4/5] oops --- docs/reference/ccr/apis/get-ccr-stats.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index f7d42b18762c1..05c47ad45bfcf 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -105,7 +105,7 @@ The API returns the following results: "number_of_failed_follow_indices" : 0, "number_of_failed_remote_cluster_state_requests" : 0, "number_of_successful_follow_indices" : 1, - "recent_auto_follow_errors" : [] + "recent_auto_follow_errors" : [], "tracking_remote_clusters" : [] }, "follow_stats" : { From 8aec97507f0214e1f1bd71fbf91deb7b1581f002 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 27 Nov 2018 11:02:49 +0100 Subject: [PATCH 5/5] start from 0 --- 
.../elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 731f86958e752..d0f70c52ca526 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -248,7 +248,7 @@ abstract static class AutoFollower { private final Supplier followerClusterStateSupplier; private volatile long lastAutoFollowTime = -1; - private volatile long metadataVersion = -1; + private volatile long metadataVersion = 0; private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults;