Skip to content

Commit

Permalink
Replicate aliases in cross-cluster replication (elastic#41815)
Browse files Browse the repository at this point in the history
This commit adds functionality so that aliases that are manipulated on
leader indices are replicated by the shard follow tasks to the follower
indices. Note that we ignore write indices. This is due to the fact that
follower indices do not receive direct writes so the concept is not
useful.
  • Loading branch information
jasontedor authored Jun 4, 2019
1 parent 998419c commit 6a98eeb
Show file tree
Hide file tree
Showing 22 changed files with 983 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public static final class ShardFollowStats {
static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes");
static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version");
static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version");
static final ParseField FOLLOWER_ALIASES_VERSION_FIELD = new ParseField("follower_aliases_version");
static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis");
static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis");
static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests");
Expand All @@ -117,41 +118,42 @@ public static final class ShardFollowStats {

@SuppressWarnings("unchecked")
static final ConstructingObjectParser<ShardFollowStats, Void> PARSER =
new ConstructingObjectParser<>(
"shard-follow-stats",
true,
args -> new ShardFollowStats(
(String) args[0],
(String) args[1],
(String) args[2],
(int) args[3],
(long) args[4],
(long) args[5],
(long) args[6],
(long) args[7],
(long) args[8],
(int) args[9],
(int) args[10],
(int) args[11],
(long) args[12],
(long) args[13],
(long) args[14],
(long) args[15],
(long) args[16],
(long) args[17],
(long) args[18],
(long) args[19],
(long) args[20],
(long) args[21],
(long) args[22],
(long) args[23],
(long) args[24],
(long) args[25],
new TreeMap<>(
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[26])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(ElasticsearchException) args[27]));
new ConstructingObjectParser<>(
"shard-follow-stats",
true,
args -> new ShardFollowStats(
(String) args[0],
(String) args[1],
(String) args[2],
(int) args[3],
(long) args[4],
(long) args[5],
(long) args[6],
(long) args[7],
(long) args[8],
(int) args[9],
(int) args[10],
(int) args[11],
(long) args[12],
(long) args[13],
(long) args[14],
(long) args[15],
(long) args[16],
(long) args[17],
(long) args[18],
(long) args[19],
(long) args[20],
(long) args[21],
(long) args[22],
(long) args[23],
(long) args[24],
(long) args[25],
(long) args[26],
new TreeMap<>(
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[27])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(ElasticsearchException) args[28]));

static final ConstructingObjectParser<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>, Void> READ_EXCEPTIONS_ENTRY_PARSER =
new ConstructingObjectParser<>(
Expand All @@ -175,6 +177,7 @@ public static final class ShardFollowStats {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_ALIASES_VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD);
Expand Down Expand Up @@ -220,6 +223,7 @@ public static final class ShardFollowStats {
private final long writeBufferSizeInBytes;
private final long followerMappingVersion;
private final long followerSettingsVersion;
private final long followerAliasesVersion;
private final long totalReadTimeMillis;
private final long totalReadRemoteExecTimeMillis;
private final long successfulReadRequests;
Expand Down Expand Up @@ -249,6 +253,7 @@ public static final class ShardFollowStats {
long writeBufferSizeInBytes,
long followerMappingVersion,
long followerSettingsVersion,
long followerAliasesVersion,
long totalReadTimeMillis,
long totalReadRemoteExecTimeMillis,
long successfulReadRequests,
Expand Down Expand Up @@ -277,6 +282,7 @@ public static final class ShardFollowStats {
this.writeBufferSizeInBytes = writeBufferSizeInBytes;
this.followerMappingVersion = followerMappingVersion;
this.followerSettingsVersion = followerSettingsVersion;
this.followerAliasesVersion = followerAliasesVersion;
this.totalReadTimeMillis = totalReadTimeMillis;
this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis;
this.successfulReadRequests = successfulReadRequests;
Expand Down Expand Up @@ -352,6 +358,10 @@ public long getFollowerSettingsVersion() {
return followerSettingsVersion;
}

public long getFollowerAliasesVersion() {
return followerAliasesVersion;
}

public long getTotalReadTimeMillis() {
return totalReadTimeMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ static FollowStatsAction.StatsResponses createStatsResponse() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
Expand Down Expand Up @@ -190,6 +191,8 @@ protected void assertInstances(CcrStatsAction.Response serverTestInstance, CcrSt
equalTo(expectedShardFollowStats.followerMappingVersion()));
assertThat(actualShardFollowStats.getFollowerSettingsVersion(),
equalTo(expectedShardFollowStats.followerSettingsVersion()));
assertThat(actualShardFollowStats.getFollowerAliasesVersion(),
equalTo(expectedShardFollowStats.followerAliasesVersion()));
assertThat(actualShardFollowStats.getTotalReadTimeMillis(),
equalTo(expectedShardFollowStats.totalReadTimeMillis()));
assertThat(actualShardFollowStats.getSuccessfulReadRequests(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ protected void assertInstances(FollowStatsAction.StatsResponses serverTestInstan
equalTo(expectedShardFollowStats.followerMappingVersion()));
assertThat(actualShardFollowStats.getFollowerSettingsVersion(),
equalTo(expectedShardFollowStats.followerSettingsVersion()));
assertThat(actualShardFollowStats.getFollowerAliasesVersion(),
equalTo(expectedShardFollowStats.followerAliasesVersion()));
assertThat(actualShardFollowStats.getTotalReadTimeMillis(),
equalTo(expectedShardFollowStats.totalReadTimeMillis()));
assertThat(actualShardFollowStats.getSuccessfulReadRequests(),
Expand Down
5 changes: 5 additions & 0 deletions docs/reference/ccr/apis/follow/get-follow-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ The `shards` array consists of objects containing the following fields:
`indices[].shards[].follower_settings_version`::
(long) the index settings version the follower is synced up to

`indices[].shards[].follower_aliases_version`::
(long) the index aliases version the follower is synced up to

`indices[].shards[].total_read_time_millis`::
(long) the total time reads were outstanding, measured from the time a read
was sent to the leader to the time a reply was returned to the follower
Expand Down Expand Up @@ -217,6 +220,7 @@ The API returns the following results:
"write_buffer_operation_count" : 64,
"follower_mapping_version" : 4,
"follower_settings_version" : 2,
"follower_aliases_version" : 8,
"total_read_time_millis" : 32768,
"total_read_remote_exec_time_millis" : 16384,
"successful_read_requests" : 32,
Expand Down Expand Up @@ -246,6 +250,7 @@ The API returns the following results:
// TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/]
// TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/]
// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/]
// TESTRESPONSE[s/"follower_aliases_version" : 8/"follower_aliases_version" : $body.indices.0.shards.0.follower_aliases_version/]
// TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/]
// TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/]
// TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/]
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The API returns the following results:
"write_buffer_operation_count" : 64,
"follower_mapping_version" : 4,
"follower_settings_version" : 2,
"follower_aliases_version" : 8,
"total_read_time_millis" : 32768,
"total_read_remote_exec_time_millis" : 16384,
"successful_read_requests" : 32,
Expand Down Expand Up @@ -161,6 +162,7 @@ The API returns the following results:
// TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.follow_stats.indices.0.shards.0.write_buffer_operation_count/]
// TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.follow_stats.indices.0.shards.0.follower_mapping_version/]
// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.follow_stats.indices.0.shards.0.follower_settings_version/]
// TESTRESPONSE[s/"follower_aliases_version" : 8/"follower_aliases_version" : $body.follow_stats.indices.0.shards.0.follower_aliases_version/]
// TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.follow_stats.indices.0.shards.0.total_read_time_millis/]
// TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.follow_stats.indices.0.shards.0.total_read_remote_exec_time_millis/]
// TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.follow_stats.indices.0.shards.0.successful_read_requests/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,13 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

AliasMetaData that = (AliasMetaData) o;
final AliasMetaData that = (AliasMetaData) o;

if (alias != null ? !alias.equals(that.alias) : that.alias != null) return false;
if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
if (indexRouting != null ? !indexRouting.equals(that.indexRouting) : that.indexRouting != null) return false;
if (searchRouting != null ? !searchRouting.equals(that.searchRouting) : that.searchRouting != null)
return false;
if (writeIndex != null ? writeIndex != that.writeIndex : that.writeIndex != null)
return false;
if (searchRouting != null ? !searchRouting.equals(that.searchRouting) : that.searchRouting != null) return false;
if (writeIndex != null ? writeIndex != that.writeIndex : that.writeIndex != null) return false;

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina
int followerMaxSeqNo = 0;
int followerMappingVersion = 0;
int followerSettingsVersion = 0;
int followerAliasesVersion = 0;

List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), greaterThanOrEqualTo(1));
Expand All @@ -164,11 +165,15 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina
int foundFollowerSettingsVersion =
(int) XContentMapValues.extractValue("_source.ccr_stats.follower_settings_version", hit);
followerSettingsVersion = Math.max(followerSettingsVersion, foundFollowerSettingsVersion);
int foundFollowerAliasesVersion =
(int) XContentMapValues.extractValue("_source.ccr_stats.follower_aliases_version", hit);
followerAliasesVersion = Math.max(followerAliasesVersion, foundFollowerAliasesVersion);
}

assertThat(followerMaxSeqNo, greaterThan(0));
assertThat(followerMappingVersion, greaterThan(0));
assertThat(followerSettingsVersion, greaterThan(0));
assertThat(followerAliasesVersion, greaterThan(0));
}

protected static void verifyAutoFollowMonitoring() throws IOException {
Expand Down
Loading

0 comments on commit 6a98eeb

Please sign in to comment.