diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/IndicesFollowStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/IndicesFollowStats.java index 7d3af08577b16..ae6217d167447 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/IndicesFollowStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/IndicesFollowStats.java @@ -101,6 +101,7 @@ public static final class ShardFollowStats { static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes"); static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version"); static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version"); + static final ParseField FOLLOWER_ALIASES_VERSION_FIELD = new ParseField("follower_aliases_version"); static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis"); static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis"); static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests"); @@ -117,41 +118,42 @@ public static final class ShardFollowStats { @SuppressWarnings("unchecked") static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>( - "shard-follow-stats", - true, - args -> new ShardFollowStats( - (String) args[0], - (String) args[1], - (String) args[2], - (int) args[3], - (long) args[4], - (long) args[5], - (long) args[6], - (long) args[7], - (long) args[8], - (int) args[9], - (int) args[10], - (int) args[11], - (long) args[12], - (long) args[13], - (long) args[14], - (long) args[15], - (long) args[16], - (long) args[17], - (long) args[18], - (long) args[19], - (long) args[20], - (long) args[21], - (long) args[22], - (long) args[23], - (long) args[24], - (long) args[25], - new TreeMap<>( - ((List>>) args[26]) - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (ElasticsearchException) args[27])); + new ConstructingObjectParser<>( + "shard-follow-stats", + true, + args -> new ShardFollowStats( + (String) args[0], + (String) args[1], + (String) args[2], + (int) args[3], + (long) args[4], + (long) args[5], + (long) args[6], + (long) args[7], + (long) args[8], + (int) args[9], + (int) args[10], + (int) args[11], + (long) args[12], + (long) args[13], + (long) args[14], + (long) args[15], + (long) args[16], + (long) args[17], + (long) args[18], + (long) args[19], + (long) args[20], + (long) args[21], + (long) args[22], + (long) args[23], + (long) args[24], + (long) args[25], + (long) args[26], + new TreeMap<>( + ((List>>) args[27]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), + (ElasticsearchException) args[28])); static final ConstructingObjectParser>, Void> READ_EXCEPTIONS_ENTRY_PARSER = new ConstructingObjectParser<>( @@ -175,6 +177,7 @@ public static final class ShardFollowStats { PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_ALIASES_VERSION_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD); @@ -220,6 +223,7 @@ public static final class ShardFollowStats { private final long writeBufferSizeInBytes; private final long followerMappingVersion; private final long followerSettingsVersion; + private final long followerAliasesVersion; private final long totalReadTimeMillis; private final long totalReadRemoteExecTimeMillis; private final long successfulReadRequests; @@ -249,6 +253,7 @@ public static final class ShardFollowStats { long writeBufferSizeInBytes, long followerMappingVersion, long followerSettingsVersion, + long followerAliasesVersion, long totalReadTimeMillis, long totalReadRemoteExecTimeMillis, long successfulReadRequests, @@ -277,6 +282,7 @@ public static final class ShardFollowStats { this.writeBufferSizeInBytes = writeBufferSizeInBytes; this.followerMappingVersion = followerMappingVersion; this.followerSettingsVersion = followerSettingsVersion; + this.followerAliasesVersion = followerAliasesVersion; this.totalReadTimeMillis = totalReadTimeMillis; this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis; this.successfulReadRequests = successfulReadRequests; @@ -352,6 +358,10 @@ public long getFollowerSettingsVersion() { return followerSettingsVersion; } + public long getFollowerAliasesVersion() { + return followerAliasesVersion; + } + public long getTotalReadTimeMillis() { return totalReadTimeMillis; } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java index d56b762520c55..eaf6103a0ecfe 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java @@ -106,6 +106,7 @@ static FollowStatsAction.StatsResponses createStatsResponse() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), Collections.emptyNavigableMap(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); @@ -190,6 +191,8 @@ protected void assertInstances(CcrStatsAction.Response serverTestInstance, CcrSt equalTo(expectedShardFollowStats.followerMappingVersion())); assertThat(actualShardFollowStats.getFollowerSettingsVersion(), equalTo(expectedShardFollowStats.followerSettingsVersion())); + assertThat(actualShardFollowStats.getFollowerAliasesVersion(), + equalTo(expectedShardFollowStats.followerAliasesVersion())); assertThat(actualShardFollowStats.getTotalReadTimeMillis(), equalTo(expectedShardFollowStats.totalReadTimeMillis())); assertThat(actualShardFollowStats.getSuccessfulReadRequests(), diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/FollowStatsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/FollowStatsResponseTests.java index cd7257342c724..ff93c8df33eda 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/FollowStatsResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/FollowStatsResponseTests.java @@ -93,6 +93,8 @@ protected void assertInstances(FollowStatsAction.StatsResponses serverTestInstan equalTo(expectedShardFollowStats.followerMappingVersion())); assertThat(actualShardFollowStats.getFollowerSettingsVersion(), equalTo(expectedShardFollowStats.followerSettingsVersion())); + assertThat(actualShardFollowStats.getFollowerAliasesVersion(), + equalTo(expectedShardFollowStats.followerAliasesVersion())); assertThat(actualShardFollowStats.getTotalReadTimeMillis(), equalTo(expectedShardFollowStats.totalReadTimeMillis())); assertThat(actualShardFollowStats.getSuccessfulReadRequests(), diff --git a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc index 73bdd9494d1c8..db4bf910a05c6 100644 --- a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc @@ -114,6 +114,9 @@ The `shards` array consists of objects containing the following fields: `indices[].shards[].follower_settings_version`:: (long) the index settings version the follower is synced up to +`indices[].shards[].follower_aliases_version`:: + (long) the index aliases version the follower is synced up to + `indices[].shards[].total_read_time_millis`:: (long) the total time reads were outstanding, measured from the time a read was sent to the leader to the time a reply was returned to the follower @@ -217,6 +220,7 @@ The API returns the following results: "write_buffer_operation_count" : 64, "follower_mapping_version" : 4, "follower_settings_version" : 2, + "follower_aliases_version" : 8, "total_read_time_millis" : 32768, "total_read_remote_exec_time_millis" : 16384, "successful_read_requests" : 32, @@ -246,6 +250,7 @@ The API returns the following results: // TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/] // TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/] // TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/] +// TESTRESPONSE[s/"follower_aliases_version" : 8/"follower_aliases_version" : $body.indices.0.shards.0.follower_aliases_version/] // TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/] // TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/] // TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/] diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index 03f2f3eee0c0b..d1acbb0064a9f 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -126,6 +126,7 @@ The API returns the following results: "write_buffer_operation_count" : 64, "follower_mapping_version" : 4, "follower_settings_version" : 2, + "follower_aliases_version" : 8, "total_read_time_millis" : 32768, "total_read_remote_exec_time_millis" : 16384, "successful_read_requests" : 32, @@ -161,6 +162,7 @@ The API returns the following results: // TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.follow_stats.indices.0.shards.0.write_buffer_operation_count/] // TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.follow_stats.indices.0.shards.0.follower_mapping_version/] // TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.follow_stats.indices.0.shards.0.follower_settings_version/] +// TESTRESPONSE[s/"follower_aliases_version" : 8/"follower_aliases_version" : $body.follow_stats.indices.0.shards.0.follower_aliases_version/] // TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.follow_stats.indices.0.shards.0.total_read_time_millis/] // TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.follow_stats.indices.0.shards.0.total_read_remote_exec_time_millis/] // TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.follow_stats.indices.0.shards.0.successful_read_requests/] diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/AliasMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/AliasMetaData.java index 0d41d5f86b3ac..478e76b525610 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/AliasMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/AliasMetaData.java @@ -140,15 +140,13 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - AliasMetaData that = (AliasMetaData) o; + final AliasMetaData that = (AliasMetaData) o; if (alias != null ? !alias.equals(that.alias) : that.alias != null) return false; if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false; if (indexRouting != null ? !indexRouting.equals(that.indexRouting) : that.indexRouting != null) return false; - if (searchRouting != null ? !searchRouting.equals(that.searchRouting) : that.searchRouting != null) - return false; - if (writeIndex != null ? writeIndex != that.writeIndex : that.writeIndex != null) - return false; + if (searchRouting != null ? !searchRouting.equals(that.searchRouting) : that.searchRouting != null) return false; + if (writeIndex != null ? writeIndex != that.writeIndex : that.writeIndex != null) return false; return true; } diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index 33e7c2f2bf177..b555e5f441146 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -143,6 +143,7 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina int followerMaxSeqNo = 0; int followerMappingVersion = 0; int followerSettingsVersion = 0; + int followerAliasesVersion = 0; List hits = (List) XContentMapValues.extractValue("hits.hits", response); assertThat(hits.size(), greaterThanOrEqualTo(1)); @@ -164,11 +165,15 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina int foundFollowerSettingsVersion = (int) XContentMapValues.extractValue("_source.ccr_stats.follower_settings_version", hit); followerSettingsVersion = Math.max(followerSettingsVersion, foundFollowerSettingsVersion); + int foundFollowerAliasesVersion = + (int) XContentMapValues.extractValue("_source.ccr_stats.follower_aliases_version", hit); + followerAliasesVersion = Math.max(followerAliasesVersion, foundFollowerAliasesVersion); } assertThat(followerMaxSeqNo, greaterThan(0)); assertThat(followerMappingVersion, greaterThan(0)); assertThat(followerSettingsVersion, greaterThan(0)); + assertThat(followerAliasesVersion, greaterThan(0)); } protected static void verifyAutoFollowMonitoring() throws IOException { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 33b8a274431d2..6cab2820bd47b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -219,6 +220,12 @@ public long getSettingsVersion() { return settingsVersion; } + private long aliasesVersion; + + public long getAliasesVersion() { + return aliasesVersion; + } + private long globalCheckpoint; public long getGlobalCheckpoint() { @@ -256,6 +263,11 @@ public long getTookInMillis() { super(in); mappingVersion = in.readVLong(); settingsVersion = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + aliasesVersion = in.readVLong(); + } else { + aliasesVersion = 0; + } globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); maxSeqNoOfUpdatesOrDeletes = in.readZLong(); @@ -264,16 +276,17 @@ public long getTookInMillis() { } Response( - final long mappingVersion, - final long settingsVersion, - final long globalCheckpoint, - final long maxSeqNo, - final long maxSeqNoOfUpdatesOrDeletes, - final Translog.Operation[] operations, - final long tookInMillis) { - + final long mappingVersion, + final long settingsVersion, + final long aliasesVersion, + final long globalCheckpoint, + final long maxSeqNo, + final long maxSeqNoOfUpdatesOrDeletes, + final Translog.Operation[] operations, + final long tookInMillis) { this.mappingVersion = mappingVersion; this.settingsVersion = settingsVersion; + this.aliasesVersion = aliasesVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; @@ -291,6 +304,9 @@ public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(mappingVersion); out.writeVLong(settingsVersion); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeVLong(aliasesVersion); + } out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); out.writeZLong(maxSeqNoOfUpdatesOrDeletes); @@ -305,6 +321,7 @@ public boolean equals(final Object o) { final Response that = (Response) o; return mappingVersion == that.mappingVersion && settingsVersion == that.settingsVersion && + aliasesVersion == that.aliasesVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes && @@ -317,6 +334,7 @@ public int hashCode() { return Objects.hash( mappingVersion, settingsVersion, + aliasesVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, @@ -361,9 +379,11 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc final IndexMetaData indexMetaData = indexService.getMetaData(); final long mappingVersion = indexMetaData.getMappingVersion(); final long settingsVersion = indexMetaData.getSettingsVersion(); + final long aliasesVersion = indexMetaData.getAliasesVersion(); return getResponse( mappingVersion, settingsVersion, + aliasesVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, @@ -436,12 +456,14 @@ private void globalCheckpointAdvancementFailure( final long mappingVersion = indexMetaData.getMappingVersion(); final long settingsVersion = indexMetaData.getSettingsVersion(); + final long aliasesVersion = indexMetaData.getAliasesVersion(); final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); listener.onResponse( getResponse( mappingVersion, settingsVersion, + aliasesVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY, @@ -541,6 +563,7 @@ static Translog.Operation[] getOperations( static Response getResponse( final long mappingVersion, final long settingsVersion, + final long aliasesVersion, final SeqNoStats seqNoStats, final long maxSeqNoOfUpdates, final Translog.Operation[] operations, @@ -550,6 +573,7 @@ static Response getResponse( return new Response( mappingVersion, settingsVersion, + aliasesVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0ee86a6058c63..4ad0fb1dfd0d6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -78,6 +78,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private int numOutstandingWrites = 0; private long currentMappingVersion = 0; private long currentSettingsVersion = 0; + private long currentAliasesVersion = 0; private long totalReadRemoteExecTimeMillis = 0; private long totalReadTimeMillis = 0; private long successfulReadRequests = 0; @@ -154,15 +155,27 @@ void start( synchronized (ShardFollowNodeTask.this) { currentSettingsVersion = leaderSettingsVersion; } + }); + updateAliases(leaderAliasesVersion -> { + synchronized (ShardFollowNodeTask.this) { + currentAliasesVersion = leaderAliasesVersion; + } + }); + synchronized (ShardFollowNodeTask.this) { LOGGER.info( - "{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]", + "{} following leader shard {}, " + + "follower global checkpoint=[{}], " + + "mapping version=[{}], " + + "settings version=[{}], " + + "aliases version=[{}]", params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, - leaderMappingVersion, - leaderSettingsVersion); - coordinateReads(); - }); + currentMappingVersion, + currentSettingsVersion, + currentAliasesVersion); + } + coordinateReads(); }); } @@ -306,12 +319,14 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res // In order to process this read response (3), we need to check and potentially update the follow index's setting (1) and // check and potentially update the follow index's mappings (2). - // 3) handle read response: + // 4) handle read response: Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response); - // 2) update follow index mapping: + // 3) update follow index mapping: Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask); - // 1) update follow index settings: - maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask); + // 2) update follow index settings: + Runnable updateSettingsTask = () -> maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask); + // 1) update follow index aliases: + maybeUpdateAliases(response.getAliasesVersion(), updateSettingsTask); } void handleFallenBehindLeaderShard(Exception e, long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) { @@ -423,7 +438,7 @@ private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion, private synchronized void maybeUpdateSettings(final Long minimumRequiredSettingsVersion, Runnable task) { if (currentSettingsVersion >= minimumRequiredSettingsVersion) { - LOGGER.trace("{} settings version [{}] is higher or equal than minimum required mapping version [{}]", + LOGGER.trace("{} settings version [{}] is higher or equal than minimum required settings version [{}]", params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion); task.run(); } else { @@ -436,6 +451,27 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings } } + private synchronized void maybeUpdateAliases(final Long minimumRequiredAliasesVersion, final Runnable task) { + if (currentAliasesVersion >= minimumRequiredAliasesVersion) { + LOGGER.trace( + "{} aliases version [{}] is higher or equal than minimum required aliases version [{}]", + params.getFollowShardId(), + currentAliasesVersion, + minimumRequiredAliasesVersion); + task.run(); + } else { + LOGGER.trace( + "{} updating aliases, aliases version [{}] is lower than minimum required aliases version [{}]", + params.getFollowShardId(), + currentAliasesVersion, + minimumRequiredAliasesVersion); + updateAliases(aliasesVersion -> { + currentAliasesVersion = aliasesVersion; + task.run(); + }); + } + } + private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) { updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0)); } @@ -453,6 +489,14 @@ private void updateSettings(final LongConsumer handler, final AtomicInteger retr innerUpdateSettings(handler, e -> handleFailure(e, retryCounter, () -> updateSettings(handler, retryCounter))); } + private void updateAliases(final LongConsumer handler) { + updateAliases(handler, new AtomicInteger()); + } + + private void updateAliases(final LongConsumer handler, final AtomicInteger retryCounter) { + innerUpdateAliases(handler, e -> handleFailure(e, retryCounter, () -> updateAliases(handler, retryCounter))); + } + private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; if (shouldRetry(params.getRemoteCluster(), e)) { @@ -511,6 +555,8 @@ static boolean shouldRetry(String remoteCluster, Exception e) { protected abstract void innerUpdateSettings(LongConsumer handler, Consumer errorHandler); + protected abstract void innerUpdateAliases(LongConsumer handler, Consumer errorHandler); + protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID, List operations, long leaderMaxSeqNoOfUpdatesOrDeletes, @@ -566,6 +612,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() { bufferSizeInBytes, currentMappingVersion, currentSettingsVersion, + currentAliasesVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 595303d0bce72..bb145cecfac5b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -26,6 +27,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -62,7 +64,9 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -200,6 +204,124 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum } } + @Override + protected void innerUpdateAliases(final LongConsumer handler, final Consumer errorHandler) { + /* + * The strategy for updating the aliases is fairly simple. We look at the aliases that exist on the leader, and those that + * exist on the follower. We partition these aliases into three sets: the aliases that exist on both the leader and the + * follower, the aliases that are on the leader only, and the aliases that are on the follower only. + * + * For the aliases that are on the leader and the follower, we compare the aliases and add an action to overwrite the + * follower view of the alias if the aliases are different. If the aliases are the same, we skip the alias. Note that the + * meaning of equals here intentionally ignores the write index. There are two reasons for this. First, follower indices + * do not receive direct writes so conceptually the write index is not useful. Additionally, there is a larger challenge. + * Suppose that we did copy over the write index from the leader to the follower. On the leader, when the write index is + * swapped from one index to another, this is done atomically. However, to do this on the follower, we would have to step + * outside the shard follow tasks framework and have a separate framework for copying aliases over. This is because if we + * try to manage the aliases including the write aliases with the shard follow tasks, we do not have a way to move the write + * index atomically (since we have a single-index view here only) and therefore we can end up in situations where we would + * try to assign the write index to two indices. Further, trying to do this outside the shard follow tasks framework has + * problems too, since it could be that the new aliases arrive on the coordinator before the write index has even been + * created on the local cluster. So there are race conditions either way. All of this put together means that we will simply + * ignore the write index. + * + * For aliases that are on the leader but not the follower, we copy those aliases over to the follower. + * + * For aliases that are on the follower but not the leader, we remove those aliases from the follower. + */ + final var leaderIndex = params.getLeaderShardId().getIndex(); + final var followerIndex = params.getFollowShardId().getIndex(); + + final var clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); + + final CheckedConsumer onResponse = clusterStateResponse -> { + final var leaderIndexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + final var followerIndexMetaData = clusterService.state().metaData().getIndexSafe(followerIndex); + + // partition the aliases into the three sets + final var aliasesOnLeaderNotOnFollower = new HashSet(); + final var aliasesInCommon = new HashSet(); + final var aliasesOnFollowerNotOnLeader = new HashSet(); + + for (final var aliasName : leaderIndexMetaData.getAliases().keys()) { + if (followerIndexMetaData.getAliases().containsKey(aliasName.value)) { + aliasesInCommon.add(aliasName.value); + } else { + aliasesOnLeaderNotOnFollower.add(aliasName.value); + } + } + + for (final var aliasName : followerIndexMetaData.getAliases().keys()) { + if (leaderIndexMetaData.getAliases().containsKey(aliasName.value)) { + assert aliasesInCommon.contains(aliasName.value) : aliasName.value; + } else { + aliasesOnFollowerNotOnLeader.add(aliasName.value); + } + } + + final var aliasActions = new ArrayList(); + + // add the aliases the follower does not have + for (final var aliasName : aliasesOnLeaderNotOnFollower) { + final var alias = leaderIndexMetaData.getAliases().get(aliasName); + // we intentionally override that the alias is not a write alias as follower indices do not receive direct writes + aliasActions.add(IndicesAliasesRequest.AliasActions.add() + .index(followerIndex.getName()) + .alias(alias.alias()) + .filter(alias.filter() == null ? null : alias.filter().toString()) + .indexRouting(alias.indexRouting()) + .searchRouting(alias.searchRouting()) + .writeIndex(false)); + } + + // update the aliases that are different (ignoring write aliases) + for (final var aliasName : aliasesInCommon) { + final var leaderAliasMetaData = leaderIndexMetaData.getAliases().get(aliasName); + // we intentionally override that the alias is not a write alias as follower indices do not receive direct writes + final var leaderAliasMetaDataWithoutWriteIndex = new AliasMetaData.Builder(aliasName) + .filter(leaderAliasMetaData.filter()) + .indexRouting(leaderAliasMetaData.indexRouting()) + .searchRouting(leaderAliasMetaData.searchRouting()) + .writeIndex(false) + .build(); + final var followerAliasMetaData = followerIndexMetaData.getAliases().get(aliasName); + if (leaderAliasMetaDataWithoutWriteIndex.equals(followerAliasMetaData)) { + // skip this alias, the leader and follower have the same modulo the write index + continue; + } + // we intentionally override that the alias is not a write alias as follower indices do not receive direct writes + aliasActions.add(IndicesAliasesRequest.AliasActions.add() + .index(followerIndex.getName()) + .alias(leaderAliasMetaData.alias()) + .filter(leaderAliasMetaData.filter() == null ? null : leaderAliasMetaData.filter().toString()) + .indexRouting(leaderAliasMetaData.indexRouting()) + .searchRouting(leaderAliasMetaData.searchRouting()) + .writeIndex(false)); + } + + // remove aliases that the leader no longer has + for (final var aliasName : aliasesOnFollowerNotOnLeader) { + aliasActions.add(IndicesAliasesRequest.AliasActions.remove().index(followerIndex.getName()).alias(aliasName)); + } + + final var request = new IndicesAliasesRequest(); + if (aliasActions.isEmpty()) { + handler.accept(leaderIndexMetaData.getAliasesVersion()); + } else { + aliasActions.forEach(request::addAliasAction); + followerClient.admin().indices().aliases( + request, + ActionListener.wrap(r -> handler.accept(leaderIndexMetaData.getAliasesVersion()), errorHandler)); + } + }; + + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } catch (final NoSuchRemoteClusterException e) { + errorHandler.accept(e); + } + } + private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, Settings updatedSettings, Runnable handler, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrAliasesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrAliasesIT.java new file mode 100644 index 0000000000000..e1e955470e44d --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrAliasesIT.java @@ -0,0 +1,413 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; +import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class CcrAliasesIT extends CcrIntegTestCase { + + public void testAliasOnIndexCreation() throws Exception { + final String aliasName = randomAlphaOfLength(16); + final String aliases; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("aliases"); + { + builder.startObject(aliasName); + { + + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + aliases = BytesReference.bytes(builder).utf8ToString(); + } + assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(aliases, XContentType.JSON)); + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, "follower"); + + // wait for the shard follow task to exist + assertBusy(() -> assertShardFollowTask(1)); + + assertAliasesExist("leader", "follower", aliasName); + } + + public void testAddAlias() throws Exception { + runAddAliasTest(null); + } + + public void testAddExplicitNotWriteAlias() throws Exception { + runAddAliasTest(false); + } + + public void testWriteAliasIsIgnored() throws Exception { + runAddAliasTest(true); + } + + private void runAddAliasTest(final Boolean isWriteAlias) throws Exception { + runAddAliasTest(isWriteAlias, aliasName -> {}); + } + + /** + * Runs an add alias test which adds a random alias to the leader exist, and then asserts that the alias is replicated to the follower. + * The specified post assertions gives the caller the opportunity to add additional assertions on the alias that is added. These + * assertions are executed after all other assertions that the alias exists. + * + * @param isWriteIndex whether or not the leader index is the write index for the alias + * @param postAssertions the post assertions to execute + * @param the type of checked exception the post assertions callback can throw + * @throws Exception if a checked exception is thrown while executing the add alias test + */ + private void runAddAliasTest( + final Boolean isWriteIndex, + final CheckedConsumer postAssertions) throws Exception { + assertAcked(leaderClient().admin().indices().prepareCreate("leader")); + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + // we set a low poll timeout so that shard changes requests are responded to quickly even without indexing + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100)); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, "follower"); + + assertBusy(() -> assertShardFollowTask(1)); + + final String aliasName = randomAlphaOfLength(16); + addRandomAlias("leader", aliasName, isWriteIndex); + + assertAliasesExist("leader", "follower", aliasName); + + postAssertions.accept(aliasName); + } + + private void addRandomAlias(final String index, final String aliasName, final Boolean isWriteIndex) { + final IndicesAliasesRequest.AliasActions add = IndicesAliasesRequest.AliasActions.add(); + add.index(index); + add.alias(aliasName); + add.writeIndex(isWriteIndex); + if (randomBoolean()) { + add.routing(randomAlphaOfLength(16)); + } else { + if (randomBoolean()) { + add.indexRouting(randomAlphaOfLength(16)); + } + if (randomBoolean()) { + add.searchRouting(randomAlphaOfLength(16)); + } + } + if (randomBoolean()) { + add.filter(termQuery(randomAlphaOfLength(16), randomAlphaOfLength(16))); + } + + assertAcked(leaderClient().admin().indices().prepareAliases().addAliasAction(add)); + } + + public void testAddMultipleAliasesAtOnce() throws Exception { + assertAcked(leaderClient().admin().indices().prepareCreate("leader")); + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + // we set a low poll timeout so that shard changes requests are responded to quickly even without indexing + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100)); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, "follower"); + + assertBusy(() -> assertShardFollowTask(1)); + + final int numberOfAliases = randomIntBetween(2, 8); + final IndicesAliasesRequestBuilder builder = leaderClient().admin().indices().prepareAliases(); + for (int i = 0; i < numberOfAliases; i++) { + builder.addAlias("leader", "alias_" + i); + } + assertAcked(builder); + + final String[] aliases = new String[numberOfAliases]; + for (int i = 0; i < numberOfAliases; i++) { + aliases[i] = "alias_" + i; + } + assertAliasesExist("leader", "follower", aliases); + } + + public void testAddMultipleAliasesSequentially() throws Exception { + assertAcked(leaderClient().admin().indices().prepareCreate("leader")); + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + // we set a low poll timeout so that shard changes requests are responded to quickly even without indexing + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100)); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, "follower"); + + assertBusy(() -> assertShardFollowTask(1)); + + final int numberOfAliases = randomIntBetween(2, 8); + for (int i = 0; i < numberOfAliases; i++) { + assertAcked(leaderClient().admin().indices().prepareAliases().addAlias("leader", "alias_" + i)); + + final String[] aliases = new String[i + 1]; + for (int j = 0; j < i + 1; j++) { + aliases[j] = "alias_" + j; + } + assertAliasesExist("leader", "follower", aliases); + } + } + + public void testUpdateExistingAlias() throws Exception { + runAddAliasTest( + null, + /* + * After the alias is added (via runAddAliasTest) we modify the alias in place, and then assert that the modification is + * eventually replicated. + */ + aliasName -> { + assertAcked(leaderClient().admin() + .indices() + .prepareAliases() + .addAlias("leader", aliasName, termQuery(randomAlphaOfLength(16), randomAlphaOfLength(16)))); + assertAliasesExist("leader", "follower", aliasName); + }); + } + + public void testRemoveExistingAlias() throws Exception { + runAddAliasTest( + false, + aliasName -> { + removeAlias(aliasName); + assertAliasExistence(aliasName, false); + } + ); + } + + private void removeAlias(final String aliasName) { + assertAcked(leaderClient().admin().indices().prepareAliases().removeAlias("leader", aliasName)); + } + + public void testStress() throws Exception { + assertAcked(leaderClient().admin().indices().prepareCreate("leader")); + final PutFollowAction.Request followRequest = putFollow("leader", "follower"); + // we set a low poll timeout so that shard changes requests are responded to quickly even without indexing + followRequest.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(100)); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final int numberOfThreads = randomIntBetween(2, 4); + final int numberOfIterations = randomIntBetween(4, 32); + final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads + 1); + final List threads = new ArrayList<>(numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < numberOfIterations; j++) { + final String action = randomFrom("create", "update", "delete"); + switch (action) { + case "create": + addRandomAlias("leader", randomAlphaOfLength(16), randomFrom(new Boolean[] { null, false, true })); + break; + case "update": + try { + final String[] aliases = getAliasesOnLeader(); + if (aliases.length == 0) { + continue; + } + final String alias = randomFrom(aliases); + /* + * Add an alias with the same name, which acts as an update (although another thread could concurrently + * remove). + */ + addRandomAlias("leader", alias, randomFrom(new Boolean[] { null, false, true })); + } catch (final Exception e) { + throw new RuntimeException(e); + } + break; + case "delete": + try { + final String[] aliases = getAliasesOnLeader(); + if (aliases.length == 0) { + continue; + } + final String alias = randomFrom(aliases); + try { + removeAlias(alias); + } catch (final AliasesNotFoundException e) { + // ignore, it could have been deleted by another thread + continue; + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + break; + default: + assert false : action; + } + } + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); + } + barrier.await(); + + barrier.await(); + + for (final Thread thread : threads) { + thread.join(); + } + + assertAliasesExist("leader", "follower", getAliasesOnLeader()); + } + + private String[] getAliasesOnLeader() throws InterruptedException, ExecutionException { + final GetAliasesResponse response = leaderClient().admin().indices().getAliases(new GetAliasesRequest().indices("leader")).get(); + return response.getAliases().get("leader").stream().map(AliasMetaData::alias).toArray(String[]::new); + } + + private void assertAliasesExist(final String leaderIndex, final String followerIndex, final String... aliases) throws Exception { + assertAliasesExist(leaderIndex, followerIndex, (alias, aliasMetaData) -> {}, aliases); + } + + private void assertAliasesExist( + final String leaderIndex, + final String followerIndex, + final CheckedBiConsumer aliasMetaDataAssertion, + final String... aliases) throws Exception { + // we must check serially because aliases exist will return true if any but not necessarily all of the requested aliases exist + for (final String alias : aliases) { + assertAliasExistence(alias, true); + } + + assertBusy(() -> { + final GetAliasesResponse followerResponse = + followerClient().admin().indices().getAliases(new GetAliasesRequest().indices(followerIndex)).get(); + assertThat( + "expected follower to have [" + aliases.length + "] aliases, but was " + followerResponse.getAliases().toString(), + followerResponse.getAliases().get(followerIndex), + hasSize(aliases.length)); + for (final String alias : aliases) { + final AliasMetaData followerAliasMetaData = getAliasMetaData(followerResponse, followerIndex, alias); + + final GetAliasesResponse leaderResponse = + leaderClient().admin().indices().getAliases(new GetAliasesRequest().indices(leaderIndex).aliases(alias)).get(); + final AliasMetaData leaderAliasMetaData = getAliasMetaData(leaderResponse, leaderIndex, alias); + + assertThat( + "alias [" + alias + "] index routing did not replicate, but was " + followerAliasMetaData.toString(), + followerAliasMetaData.indexRouting(), equalTo(leaderAliasMetaData.indexRouting())); + assertThat( + "alias [" + alias + "] search routing did not replicate, but was " + followerAliasMetaData.toString(), + followerAliasMetaData.searchRoutingValues(), equalTo(leaderAliasMetaData.searchRoutingValues())); + assertThat( + "alias [" + alias + "] filtering did not replicate, but was " + followerAliasMetaData.toString(), + followerAliasMetaData.filter(), equalTo(leaderAliasMetaData.filter())); + assertThat( + "alias [" + alias + "] should not be a write index, but was " + followerAliasMetaData.toString(), + followerAliasMetaData.writeIndex(), + equalTo(false)); + aliasMetaDataAssertion.accept(alias, followerAliasMetaData); + } + }); + } + + private void assertAliasExistence(final String alias, final boolean exists) throws Exception { + assertBusy(() -> { + // we must check serially because aliases exist will return true if any but not necessarily all of the requested aliases exist + final AliasesExistResponse response = followerClient().admin() + .indices() + .aliasesExist(new GetAliasesRequest().indices("follower").aliases(alias)) + .get(); + if (exists) { + assertTrue("alias [" + alias + "] did not exist", response.exists()); + } else { + assertFalse("alias [" + alias + "] exists", response.exists()); + } + }); + } + + private AliasMetaData getAliasMetaData(final GetAliasesResponse response, final String index, final String alias) { + final Optional maybeAliasMetaData = + response.getAliases().get(index).stream().filter(a -> a.getAlias().equals(alias)).findFirst(); + assertTrue("alias [" + alias + "] did not exist", maybeAliasMetaData.isPresent()); + return maybeAliasMetaData.get(); + } + + private CheckedRunnable assertShardFollowTask(final int numberOfPrimaryShards) { + return () -> { + final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData taskMetadata = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertNotNull("task metadata for follower should exist", taskMetadata); + + final ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + listTasksRequest.setActions(ShardFollowTask.NAME + "[c]"); + final ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).actionGet(); + assertThat("expected no node failures", listTasksResponse.getNodeFailures().size(), equalTo(0)); + assertThat("expected no task failures", listTasksResponse.getTaskFailures().size(), equalTo(0)); + + final List taskInfos = listTasksResponse.getTasks(); + assertThat("expected a task for each shard", taskInfos.size(), equalTo(numberOfPrimaryShards)); + final Collection> shardFollowTasks = + taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); + for (final PersistentTasksCustomMetaData.PersistentTask shardFollowTask : shardFollowTasks) { + TaskInfo taskInfo = null; + final String expectedId = "id=" + shardFollowTask.getId(); + for (final TaskInfo info : taskInfos) { + if (expectedId.equals(info.getDescription())) { + taskInfo = info; + break; + } + } + assertNotNull("task info for shard follow task [" + expectedId + "] should exist", taskInfo); + } + }; + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index a5b28caf9dfb2..9c785ecd22c2f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -15,6 +15,7 @@ public class ShardChangesResponseTests extends AbstractWireSerializingTestCase fromToSlot = new HashMap<>(); @Override @@ -122,6 +127,11 @@ protected void innerUpdateSettings(LongConsumer handler, Consumer err handler.accept(settingsVersion); } + @Override + protected void innerUpdateAliases(LongConsumer handler, Consumer errorHandler) { + handler.accept(aliasesVersion); + } + @Override protected void innerSendBulkShardOperationsRequest( String followerHistoryUUID, List operations, @@ -172,8 +182,15 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co assert from >= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), - new Translog.Operation[0], 1L)); + handler.accept(new ShardChangesAction.Response( + 0L, + 0L, + 0L, + globalCheckpoint, + maxSeqNo, + randomNonNegativeLong(), + new Translog.Operation[0], + 1L)); } }; threadPool.generic().execute(task); @@ -233,10 +250,16 @@ private void tearDown() { }; } - private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long startSettingsVersion, int maxOperationCount) { + private static TestRun createTestRun( + final long startSeqNo, + final long startMappingVersion, + final long startSettingsVersion, + final long startAliasesVersion, + final int maxOperationCount) { long prevGlobalCheckpoint = startSeqNo; long mappingVersion = startMappingVersion; long settingsVersion = startSettingsVersion; + long aliasesVersion = startAliasesVersion; int numResponses = randomIntBetween(16, 256); Map> responses = new HashMap<>(numResponses); for (int i = 0; i < numResponses; i++) { @@ -247,7 +270,9 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, if (sometimes()) { settingsVersion++; } - + if (sometimes()) { + aliasesVersion++; + } if (sometimes()) { List item = new ArrayList<>(); // Sometimes add a random retryable error @@ -268,6 +293,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, new ShardChangesAction.Response( mappingVersion, settingsVersion, + aliasesVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, randomNonNegativeLong(), @@ -293,6 +319,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersion, settingsVersion, + aliasesVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, randomNonNegativeLong(), @@ -312,6 +339,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersion, settingsVersion, + aliasesVersion, localLeaderGCP, localLeaderGCP, randomNonNegativeLong(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 9cac01d278e74..413960b69c834 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -59,6 +59,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomReadExceptions(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); @@ -80,6 +81,7 @@ protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInst assertThat(newInstance.writeBufferOperationCount(), equalTo(expectedInstance.writeBufferOperationCount())); assertThat(newInstance.followerMappingVersion(), equalTo(expectedInstance.followerMappingVersion())); assertThat(newInstance.followerSettingsVersion(), equalTo(expectedInstance.followerSettingsVersion())); + assertThat(newInstance.followerAliasesVersion(), equalTo(expectedInstance.followerAliasesVersion())); assertThat(newInstance.totalReadTimeMillis(), equalTo(expectedInstance.totalReadTimeMillis())); assertThat(newInstance.successfulReadRequests(), equalTo(expectedInstance.successfulReadRequests())); assertThat(newInstance.failedReadRequests(), equalTo(expectedInstance.failedReadRequests())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 09d00dc6a33ac..ef1dc43869ac4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -72,6 +72,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue mappingVersions; private Queue settingsUpdateFailures; private Queue settingsVersions; + private Queue aliasesUpdateFailures; + private Queue aliasesVersions; private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; @@ -88,7 +90,7 @@ public void testCoordinateReads() { task.coordinateReads(); assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request shardChangesRequests.clear(); - task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 60L)); + task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 1L, 60L)); assertThat(shardChangesRequests, contains(new long[][]{ {6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}} )); @@ -113,7 +115,7 @@ public void testMaxWriteBufferCount() { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); @@ -139,7 +141,7 @@ public void testMaxWriteBufferSize() { shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached ShardFollowNodeTaskStatus status = task.getStatus(); @@ -204,7 +206,7 @@ public void testTaskCancelledAfterReadLimitHasBeenReached() { task.markAsCompleted(); shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 31L)); + task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 1L, 31L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled @@ -234,7 +236,7 @@ public void testTaskCancelledAfterWriteBufferLimitHasBeenReached() { task.markAsCompleted(); shardChangesRequests.clear(); // Also invokes the coordinatesReads() method: - task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L)); + task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L)); assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled @@ -483,7 +485,7 @@ public void testHandleReadResponse() { startTask(task, 63, -1); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); task.innerHandleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -513,7 +515,7 @@ public void testReceiveLessThanRequested() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 1L, 31L); task.innerHandleReadResponse(0L, 63L, response); assertThat(shardChangesRequests.size(), equalTo(1)); @@ -542,7 +544,7 @@ public void testCancelAndReceiveLessThanRequested() { shardChangesRequests.clear(); task.markAsCompleted(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 31L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 1L, 31L); task.innerHandleReadResponse(0L, 64L, response); assertThat(shardChangesRequests.size(), equalTo(0)); @@ -568,7 +570,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L)); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 0, 100, new Translog.Operation[0], 1L)); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -591,7 +593,7 @@ public void testMappingUpdate() { mappingVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -620,7 +622,7 @@ public void testMappingUpdateRetryableError() { } mappingVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(mappingUpdateFailures.size(), equalTo(0)); @@ -645,7 +647,7 @@ public void testMappingUpdateNonRetryableError() { mappingUpdateFailures.add(new RuntimeException()); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 0L, 64L); task.handleReadResponse(0L, 64L, response); assertThat(bulkShardOperationRequests.size(), equalTo(0)); @@ -668,7 +670,7 @@ public void testSettingsUpdate() { settingsVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 0L, 63L); task.handleReadResponse(0L, 63L, response); assertThat(bulkShardOperationRequests.size(), equalTo(1)); @@ -677,6 +679,7 @@ public void testSettingsUpdate() { ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.followerMappingVersion(), equalTo(0L)); assertThat(status.followerSettingsVersion(), equalTo(1L)); + assertThat(status.followerAliasesVersion(), equalTo(0L)); assertThat(status.outstandingReadRequests(), equalTo(1)); assertThat(status.outstandingWriteRequests(), equalTo(1)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -698,15 +701,16 @@ public void testSettingsUpdateRetryableError() { } settingsVersions.add(1L); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 0L, 63L); task.handleReadResponse(0L, 63L, response); - assertThat(mappingUpdateFailures.size(), equalTo(0)); + assertThat(settingsUpdateFailures.size(), equalTo(0)); assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(task.isStopped(), equalTo(false)); ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.followerMappingVersion(), equalTo(0L)); assertThat(status.followerSettingsVersion(), equalTo(1L)); + assertThat(status.followerAliasesVersion(), equalTo(0L)); assertThat(status.outstandingReadRequests(), equalTo(1)); assertThat(status.outstandingWriteRequests(), equalTo(1)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -723,7 +727,7 @@ public void testSettingsUpdateNonRetryableError() { settingsUpdateFailures.add(new RuntimeException()); task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 0L, 64L); task.handleReadResponse(0L, 64L, response); assertThat(bulkShardOperationRequests.size(), equalTo(0)); @@ -731,6 +735,89 @@ public void testSettingsUpdateNonRetryableError() { ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.followerMappingVersion(), equalTo(0L)); assertThat(status.followerSettingsVersion(), equalTo(0L)); + assertThat(status.followerAliasesVersion(), equalTo(0L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(0)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testAliasUpdate() { + final ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + final ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + aliasesVersions.add(1L); + task.coordinateReads(); + final ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); + task.handleReadResponse(0L, 63L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); + + final ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(0L)); + assertThat(status.followerAliasesVersion(), equalTo(1L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(1)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + assertThat(status.followerGlobalCheckpoint(), equalTo(-1L)); + } + + public void testAliasUpdateRetryableError() { + final ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + final ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + int max = randomIntBetween(1, 30); + for (int i = 0; i < max; i++) { + aliasesUpdateFailures.add(new ConnectException()); + } + aliasesVersions.add(1L); + task.coordinateReads(); + final ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); + task.handleReadResponse(0L, 63L, response); + + assertThat(aliasesUpdateFailures.size(), equalTo(0)); + assertThat(bulkShardOperationRequests.size(), equalTo(1)); + assertThat(task.isStopped(), equalTo(false)); + final ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(0L)); + assertThat(status.followerAliasesVersion(), equalTo(1L)); + assertThat(status.outstandingReadRequests(), equalTo(1)); + assertThat(status.outstandingWriteRequests(), equalTo(1)); + assertThat(status.lastRequestedSeqNo(), equalTo(63L)); + assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); + } + + public void testAliasUpdateNonRetryableError() { + final ShardFollowTaskParams params = new ShardFollowTaskParams(); + params.maxReadRequestOperationCount = 64; + params.maxOutstandingReadRequests = 1; + params.maxOutstandingWriteRequests = 1; + final ShardFollowNodeTask task = createShardFollowTask(params); + startTask(task, 63, -1); + + aliasesUpdateFailures.add(new RuntimeException()); + task.coordinateReads(); + final ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 0L, 1L, 64L); + task.handleReadResponse(0L, 64L, response); + + assertThat(bulkShardOperationRequests.size(), equalTo(0)); + assertThat(task.isStopped(), equalTo(true)); + final ShardFollowNodeTaskStatus status = task.getStatus(); + assertThat(status.followerMappingVersion(), equalTo(0L)); + assertThat(status.followerSettingsVersion(), equalTo(0L)); + assertThat(status.followerAliasesVersion(), equalTo(0L)); assertThat(status.outstandingReadRequests(), equalTo(1)); assertThat(status.outstandingWriteRequests(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -752,7 +839,7 @@ public void testCoordinateWrites() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(128L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -772,7 +859,7 @@ public void testMaxOutstandingWrites() { params.maxWriteRequestOperationCount = 64; params.maxOutstandingWriteRequests = 2; ShardFollowNodeTask task = createShardFollowTask(params); - ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -785,7 +872,7 @@ public void testMaxOutstandingWrites() { params.maxOutstandingWriteRequests = 4; // change to 4 outstanding writers task = createShardFollowTask(params); - response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); + response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -804,7 +891,7 @@ public void testMaxWriteRequestCount() { params.maxWriteRequestOperationCount = 8; params.maxOutstandingWriteRequests = 32; ShardFollowNodeTask task = createShardFollowTask(params); - ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -835,7 +922,7 @@ public void testRetryableError() { for (int i = 0; i < max; i++) { writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -864,7 +951,7 @@ public void testNonRetryableError() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); writeFailures.add(new RuntimeException()); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -891,7 +978,7 @@ public void testMaxWriteRequestSize() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 64L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 64L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -914,7 +1001,7 @@ public void testHandleWriteResponse() { shardChangesRequests.clear(); followerGlobalCheckpoints.add(63L); - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L); + ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 63L, response); @@ -1013,6 +1100,8 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params) mappingVersions = new LinkedList<>(); settingsUpdateFailures = new LinkedList<>(); settingsVersions = new LinkedList<>(); + aliasesUpdateFailures = new LinkedList<>(); + aliasesVersions = new LinkedList<>(); leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); @@ -1048,6 +1137,20 @@ protected void innerUpdateSettings(LongConsumer handler, Consumer err } } + @Override + protected void innerUpdateAliases(final LongConsumer handler, final Consumer errorHandler) { + final Exception failure = aliasesUpdateFailures.poll(); + if (failure != null) { + errorHandler.accept(failure); + return; + } + + final Long aliasesVersion = aliasesVersions.poll(); + if (aliasesVersion != null) { + handler.accept(aliasesVersion); + } + } + @Override protected void innerSendBulkShardOperationsRequest( String followerHistoryUUID, final List operations, @@ -1086,6 +1189,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con final ShardChangesAction.Response response = new ShardChangesAction.Response( mappingVersions.poll(), 0L, + 0L, leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), randomNonNegativeLong(), @@ -1153,6 +1257,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro long toSeqNo, long mappingVersion, long settingsVersion, + long aliasesVersion, long leaderGlobalCheckPoint) { List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -1163,6 +1268,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro return new ShardChangesAction.Response( mappingVersion, settingsVersion, + aliasesVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, randomNonNegativeLong(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index abef313d0b017..9da7e1522d2a3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -514,6 +514,12 @@ protected void innerUpdateSettings(LongConsumer handler, Consumer err handler.accept(1L); } + @Override + protected void innerUpdateAliases(LongConsumer handler, Consumer errorHandler) { + // no-op as alias updates are not tested here + handler.accept(1L); + } + @Override protected void innerSendBulkShardOperationsRequest( final String followerHistoryUUID, @@ -544,14 +550,21 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); if (from > seqNoStats.getGlobalCheckpoint()) { - handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats, - maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L)); + handler.accept(ShardChangesAction.getResponse( + 1L, + 1L, + 1L, + seqNoStats, + maxSeqNoOfUpdatesOrDeletes, + ShardChangesAction.EMPTY_OPERATIONS_ARRAY, + 1L)); return; } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize()); // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response( + 1L, 1L, 1L, seqNoStats.getGlobalCheckpoint(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 72ba0cd70672e..4e9aadf8d82a8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -57,6 +57,7 @@ static FollowStatsAction.StatsResponses createStatsResponse() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), Collections.emptyNavigableMap(), randomLong(), randomBoolean() ? new ElasticsearchException("fatal error") : null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java index fd8904307db3e..12c251bb9d775 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java @@ -95,6 +95,7 @@ public void testToXContent() throws IOException { final long writeBufferSizeInBytes = randomNonNegativeLong(); final long followerMappingVersion = randomNonNegativeLong(); final long followerSettingsVersion = randomNonNegativeLong(); + final long followerAliasesVersion = randomNonNegativeLong(); final long totalReadTimeMillis = randomLongBetween(0, 4096); final long totalReadRemoteExecTimeMillis = randomLongBetween(0, 4096); final long successfulReadRequests = randomNonNegativeLong(); @@ -126,6 +127,7 @@ public void testToXContent() throws IOException { writeBufferSizeInBytes, followerMappingVersion, followerSettingsVersion, + followerAliasesVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, @@ -173,6 +175,7 @@ public void testToXContent() throws IOException { + "\"write_buffer_size_in_bytes\":" + writeBufferSizeInBytes + "," + "\"follower_mapping_version\":" + followerMappingVersion + "," + "\"follower_settings_version\":" + followerSettingsVersion + "," + + "\"follower_aliases_version\":" + followerAliasesVersion + "," + "\"total_read_time_millis\":" + totalReadTimeMillis + "," + "\"total_read_remote_exec_time_millis\":" + totalReadRemoteExecTimeMillis + "," + "\"successful_read_requests\":" + successfulReadRequests + "," @@ -218,6 +221,7 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { 1, 1, 1, + 1, 100, 50, 10, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 33a5d495c1631..52ac0b7bd4c4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ccr; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; @@ -49,6 +50,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { private static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes"); private static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version"); private static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version"); + private static final ParseField FOLLOWER_ALIASES_VERSION_FIELD = new ParseField("follower_aliases_version"); private static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis"); private static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis"); private static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests"); @@ -93,12 +95,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status { (long) args[22], (long) args[23], (long) args[24], + (long) args[25], new TreeMap<>( - ((List>>) args[25]) + ((List>>) args[26]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), - (long) args[26], - (ElasticsearchException) args[27])); + (long) args[27], + (ElasticsearchException) args[28])); public static final String READ_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-read-exceptions-entry"; @@ -123,6 +126,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status { STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_ALIASES_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD); @@ -243,6 +247,12 @@ public long followerSettingsVersion() { return followerSettingsVersion; } + private final long followerAliasesVersion; + + public long followerAliasesVersion() { + return followerAliasesVersion; + } + private final long totalReadTimeMillis; public long totalReadTimeMillis() { @@ -337,6 +347,7 @@ public ShardFollowNodeTaskStatus( final long writeBufferSizeInBytes, final long followerMappingVersion, final long followerSettingsVersion, + final long followerAliasesVersion, final long totalReadTimeMillis, final long totalReadRemoteExecTimeMillis, final long successfulReadRequests, @@ -365,6 +376,7 @@ public ShardFollowNodeTaskStatus( this.writeBufferSizeInBytes = writeBufferSizeInBytes; this.followerMappingVersion = followerMappingVersion; this.followerSettingsVersion = followerSettingsVersion; + this.followerAliasesVersion = followerAliasesVersion; this.totalReadTimeMillis = totalReadTimeMillis; this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis; this.successfulReadRequests = successfulReadRequests; @@ -396,6 +408,11 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException { this.writeBufferSizeInBytes = in.readVLong(); this.followerMappingVersion = in.readVLong(); this.followerSettingsVersion = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.followerAliasesVersion = in.readVLong(); + } else { + this.followerAliasesVersion = 0L; + } this.totalReadTimeMillis = in.readVLong(); this.totalReadRemoteExecTimeMillis = in.readVLong(); this.successfulReadRequests = in.readVLong(); @@ -434,6 +451,9 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(writeBufferSizeInBytes); out.writeVLong(followerMappingVersion); out.writeVLong(followerSettingsVersion); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeVLong(followerAliasesVersion); + } out.writeVLong(totalReadTimeMillis); out.writeVLong(totalReadRemoteExecTimeMillis); out.writeVLong(successfulReadRequests); @@ -484,6 +504,7 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P new ByteSizeValue(writeBufferSizeInBytes)); builder.field(FOLLOWER_MAPPING_VERSION_FIELD.getPreferredName(), followerMappingVersion); builder.field(FOLLOWER_SETTINGS_VERSION_FIELD.getPreferredName(), followerSettingsVersion); + builder.field(FOLLOWER_ALIASES_VERSION_FIELD.getPreferredName(), followerAliasesVersion); builder.humanReadableField( TOTAL_READ_TIME_MILLIS_FIELD.getPreferredName(), "total_read_time", @@ -564,7 +585,8 @@ public boolean equals(final Object o) { writeBufferOperationCount == that.writeBufferOperationCount && writeBufferSizeInBytes == that.writeBufferSizeInBytes && followerMappingVersion == that.followerMappingVersion && - followerSettingsVersion== that.followerSettingsVersion && + followerSettingsVersion == that.followerSettingsVersion && + followerAliasesVersion == that.followerAliasesVersion && totalReadTimeMillis == that.totalReadTimeMillis && totalReadRemoteExecTimeMillis == that.totalReadRemoteExecTimeMillis && successfulReadRequests == that.successfulReadRequests && @@ -604,6 +626,7 @@ public int hashCode() { writeBufferSizeInBytes, followerMappingVersion, followerSettingsVersion, + followerAliasesVersion, totalReadTimeMillis, totalReadRemoteExecTimeMillis, successfulReadRequests, diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index b47ebe79129c1..ee965bd629c7f 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -974,6 +974,9 @@ "follower_settings_version": { "type": "long" }, + "follower_aliases_version": { + "type": "long" + }, "total_read_time_millis": { "type": "long" }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java index a0ee01a240347..fe873066dc594 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java @@ -188,6 +188,7 @@ private static ShardFollowNodeTaskStatus createShardFollowTaskStatus(int shardId 0, 0, 0, + 0, Collections.emptyNavigableMap(), 0, null diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index b185a425934eb..aca20286eb377 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -66,8 +66,8 @@ public void testBasicCCRAndILMIntegration() throws Exception { putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24)); followIndex(indexName, indexName); ensureGreen(indexName); - // Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster: - client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs")); + + assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + indexName + "/_alias/logs")))); try (RestClient leaderClient = buildLeaderClient()) { index(leaderClient, indexName, "1"); @@ -226,8 +226,8 @@ public void testCcrAndIlmWithRollover() throws Exception { // Check that it got replicated to the follower assertBusy(() -> assertTrue(indexExists(indexName))); - // Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster: - client().performRequest(new Request("PUT", "/" + indexName + "/_alias/" + alias)); + // check that the alias was replicated + assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + indexName + "/_alias/" + alias)))); index(leaderClient, indexName, "1"); assertDocumentExists(leaderClient, indexName, "1"); @@ -252,7 +252,6 @@ public void testCcrAndIlmWithRollover() throws Exception { // And the old index should have a write block and indexing complete set assertThat(getIndexSetting(leaderClient, indexName, "index.blocks.write"), equalTo("true")); assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true")); - }); assertBusy(() -> { @@ -266,6 +265,8 @@ public void testCcrAndIlmWithRollover() throws Exception { assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue()); // The next index should have been created on the follower as well indexExists(nextIndexName); + // and the alias should be on the next index + assertOK(client().performRequest(new Request("HEAD", "/" + nextIndexName + "/_alias/" + alias))); }); assertBusy(() -> { @@ -281,6 +282,74 @@ public void testCcrAndIlmWithRollover() throws Exception { } } + public void testAliasReplicatedOnShrink() throws Exception { + final String indexName = "shrink-alias-test"; + final String shrunkenIndexName = "shrink-" + indexName; + final String policyName = "shrink-test-policy"; + + final int numberOfAliases = randomIntBetween(0, 4); + + if ("leader".equals(targetCluster)) { + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 3) + .put("index.number_of_replicas", 0) + .put("index.lifecycle.name", policyName) // this policy won't exist on the leader, that's fine + .build(); + final StringBuilder aliases = new StringBuilder(); + boolean first = true; + for (int i = 0; i < numberOfAliases; i++) { + if (first == false) { + aliases.append(","); + } + final Boolean isWriteIndex = randomFrom(new Boolean[] { null, false, true }); + if (isWriteIndex == null) { + aliases.append("\"alias_").append(i).append("\":{}"); + } else { + aliases.append("\"alias_").append(i).append("\":{\"is_write_index\":").append(isWriteIndex).append("}"); + } + first = false; + } + createIndex(indexName, indexSettings, "", aliases.toString()); + ensureGreen(indexName); + } else if ("follow".equals(targetCluster)) { + // Create a policy with just a Shrink action on the follower + putShrinkOnlyPolicy(client(), policyName); + + // Follow the index + followIndex(indexName, indexName); + // Make sure it actually took + assertBusy(() -> assertTrue(indexExists(indexName))); + // This should now be in the "warm" phase waiting for the index to be ready to unfollow + assertBusy(() -> assertILMPolicy(client(), indexName, policyName, "warm", "unfollow", "wait-for-indexing-complete")); + + // Set the indexing_complete flag on the leader so the index will actually unfollow + try (RestClient leaderClient = buildLeaderClient()) { + updateIndexSettings(leaderClient, indexName, Settings.builder() + .put("index.lifecycle.indexing_complete", true) + .build() + ); + } + + // Wait for the setting to get replicated + assertBusy(() -> assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true"))); + + // Wait for the index to continue with its lifecycle and be shrunk + assertBusy(() -> assertTrue(indexExists(shrunkenIndexName))); + + // assert the aliases were replicated + assertBusy(() -> { + for (int i = 0; i < numberOfAliases; i++) { + assertOK(client().performRequest(new Request("HEAD", "/" + shrunkenIndexName + "/_alias/alias_" + i))); + } + }); + assertBusy(() -> assertOK(client().performRequest(new Request("HEAD", "/" + shrunkenIndexName + "/_alias/" + indexName)))); + + // Wait for the index to complete its policy + assertBusy(() -> assertILMPolicy(client(), shrunkenIndexName, policyName, "completed", "completed", "completed")); + } + } + public void testUnfollowInjectedBeforeShrink() throws Exception { final String indexName = "shrink-test"; final String shrunkenIndexName = "shrink-" + indexName;