Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Add time since last auto follow fetch to auto follow stats #36542

Merged
4 changes: 3 additions & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The API returns the following results:
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"number_of_successful_follow_indices" : 1,
"recent_auto_follow_errors" : []
"recent_auto_follow_errors" : [],
"tracking_remote_clusters" : []
},
"follow_stats" : {
"indices" : [
Expand Down Expand Up @@ -151,6 +152,7 @@ The API returns the following results:
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
// TESTRESPONSE[s/"tracking_remote_clusters" : \[\]/"tracking_remote_clusters" : $body.auto_follow_stats.tracking_remote_clusters/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, System::nanoTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use ThreadPool::relativeTimeInMillis.

);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -67,6 +69,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final Client client;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeTimeProvider;

private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();

Expand All @@ -79,10 +82,13 @@ public class AutoFollowCoordinator implements ClusterStateListener {
public AutoFollowCoordinator(
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) {
CcrLicenseChecker ccrLicenseChecker,
LongSupplier relativeTimeProvider) {

this.client = client;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
this.relativeTimeProvider = relativeTimeProvider;
clusterService.addListener(this);
this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
@Override
Expand All @@ -93,11 +99,25 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
}

public synchronized AutoFollowStats getStats() {
final Map<String, AutoFollower> autoFollowers = this.autoFollowers;
final TreeMap<String, Long> timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>();
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
long timeSinceLastAutoFollow = entry.getValue().lastAutoFollowTime;
if (timeSinceLastAutoFollow != -1) {
long timeSinceLastAutoFollowInMillis =
TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - timeSinceLastAutoFollow);
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), timeSinceLastAutoFollowInMillis);
} else {
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), -1L);
}
}

return new AutoFollowStats(
numberOfFailedIndicesAutoFollowed,
numberOfFailedRemoteClusterStateRequests,
numberOfSuccessfulIndicesAutoFollowed,
new TreeMap<>(recentAutoFollowErrors)
new TreeMap<>(recentAutoFollowErrors),
timesSinceLastAutoFollowPerRemoteCluster
);
}

Expand Down Expand Up @@ -146,7 +166,7 @@ void updateAutoFollowers(ClusterState followerClusterState) {

Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeTimeProvider) {

@Override
void getRemoteClusterState(final String remoteCluster,
Expand Down Expand Up @@ -239,20 +259,25 @@ abstract static class AutoFollower {
private final String remoteCluster;
private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier;
private final LongSupplier relativeTimeProvider;

private volatile long lastAutoFollowTime = -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the time unit to the variable names (relativeTimeProvider and lastAutoFollowTime)?

private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final String remoteCluster,
final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) {
final Supplier<ClusterState> followerClusterStateSupplier,
LongSupplier relativeTimeProvider) {
this.remoteCluster = remoteCluster;
this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier;
this.relativeTimeProvider = relativeTimeProvider;
}

void start() {
lastAutoFollowTime = relativeTimeProvider.getAsLong();
final ClusterState clusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testAutoFollower() {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), nullValue());
};
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState)) {
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
Expand Down Expand Up @@ -154,7 +154,7 @@ public void testAutoFollowerClusterStateApiFailure() {
assertThat(results.get(0).clusterStateFetchException, sameInstance(failure));
assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0));
};
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) {
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testAutoFollowerUpdateClusterStateFailure() {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), sameInstance(failure));
};
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) {
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), sameInstance(failure));
};
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) {
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
Expand Down Expand Up @@ -532,8 +532,8 @@ public void testStats() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
null,
mock(ClusterService.class),
new CcrLicenseChecker(() -> true, () -> false)
);
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);

autoFollowCoordinator.updateStats(Collections.singletonList(
new AutoFollowCoordinator.AutoFollowResult("_alias1"))
Expand Down Expand Up @@ -585,6 +585,92 @@ public void testStats() {
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error"));
}

public void testUpdateAutoFollowers() {
ClusterService clusterService = mock(ClusterService.class);
// Return a cluster state with no patterns so that the auto followers never really execute:
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
.build();
when(clusterService.state()).thenReturn(followerState);
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
null,
clusterService,
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
// Add 3 patterns:
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null,
null, null, null, null, null, null, null, null));
patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null,
null, null, null, null, null, null, null, null));
patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null,
null, null, null, null, null, null, null, null));
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
.build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(2));
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote1"), notNullValue());
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue());
// Remove patterns 1 and 3:
patterns.remove("pattern1");
patterns.remove("pattern3");
clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
.build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(1));
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue());
// Add pattern 4:
patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null,
null, null, null, null, null, null, null, null));
clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
.build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(2));
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote1"), notNullValue());
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().get("remote2"), notNullValue());
// Remove patterns 2 and 4:
patterns.remove("pattern2");
patterns.remove("pattern4");
clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
.build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0));
}

public void testUpdateAutoFollowersNoPatterns() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
null,
mock(ClusterService.class),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
.build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0));
}

public void testUpdateAutoFollowersNoAutoFollowMetadata() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
null,
mock(ClusterService.class),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
autoFollowCoordinator.updateAutoFollowers(clusterState);
assertThat(autoFollowCoordinator.getStats().getTrackingRemoteClusters().size(), equalTo(0));
}

public void testWaitForMetadataVersion() {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
Expand All @@ -611,7 +697,7 @@ public void testWaitForMetadataVersion() {

List<AutoFollowCoordinator.AutoFollowResult> allResults = new ArrayList<>();
Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = allResults::addAll;
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) {
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L) {

long previousRequestedMetadataVersion = 0;

Expand Down Expand Up @@ -669,7 +755,7 @@ public void testWaitForTimeOut() {
fail("should not be invoked");
};
AtomicInteger counter = new AtomicInteger();
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) {
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L) {

long previousRequestedMetadataVersion = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;

import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions;
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters;
import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse;

public class AutoFollowStatsResponseTests extends AbstractWireSerializingTestCase<CcrStatsAction.Response> {
Expand All @@ -27,7 +28,8 @@ protected CcrStatsAction.Response createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions()
randomReadExceptions(),
randomTrackingClusters()
);
FollowStatsAction.StatsResponses statsResponse = createStatsResponse();
return new CcrStatsAction.Response(autoFollowStats, statsResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ protected AutoFollowStats createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions()
randomReadExceptions(),
randomTrackingClusters()
);
}

Expand All @@ -47,6 +48,15 @@ static NavigableMap<String, ElasticsearchException> randomReadExceptions() {
return readExceptions;
}

static NavigableMap<String, Long> randomTrackingClusters() {
final int count = randomIntBetween(0, 16);
final NavigableMap<String, Long> readExceptions = new TreeMap<>();
for (int i = 0; i < count; i++) {
readExceptions.put("" + i, randomLong());
}
return readExceptions;
}

@Override
protected Writeable.Reader<AutoFollowStats> instanceReader() {
return AutoFollowStats::new;
Expand Down
Loading