From 0beb3c93d1ca6dd891269d902bde0e32b9983bbc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 5 Feb 2019 17:05:19 +0100 Subject: [PATCH] Clean up duplicate follow config parameter code (#37688) Introduced FollowParameters class that put follow, resume follow, put auto follow pattern requests and follow info response classes reuse. The FollowParameters class had the fields, getters etc. for the common parameters that all these APIs have. Also binary and xcontent serialization / parsing is handled by this class. The follow, resume follow, put auto follow pattern request classes originally used optional non primitive fields, so FollowParameters has that too and the follow info api can handle that now too. Also the followerIndex field can in production only be specified via the url path. If it is also specified via the request body then it must have the same value as is specified in the url path. This option only existed to xcontent testing. However the AbstractSerializingTestCase base class now also supports createXContextTestInstance() to provide a different test instance when testing xcontent, so allowing followerIndex to be specified via the request body is no longer needed. By moving the followerIndex field from Body to ResumeFollowAction.Request class and not allowing the followerIndex field to be specified via the request body the Body class is redundant and can be removed. The ResumeFollowAction.Request class can then directly use the FollowParameters class. For consistency I also removed the ability to specified followerIndex in the put follow api and the name in put auto follow pattern api via the request body. --- .../client/ccr/PutFollowRequest.java | 2 - .../client/ccr/ResumeFollowRequest.java | 3 - .../client/ccr/PutFollowRequestTests.java | 5 +- .../client/ccr/ResumeFollowRequestTests.java | 8 +- .../ccr/action/AutoFollowCoordinator.java | 26 +- .../ccr/action/TransportFollowInfoAction.java | 25 +- .../TransportPutAutoFollowPatternAction.java | 23 +- .../ccr/action/TransportPutFollowAction.java | 19 +- .../action/TransportResumeFollowAction.java | 53 +-- .../elasticsearch/xpack/CcrIntegTestCase.java | 8 +- .../xpack/CcrSingleNodeTestCase.java | 8 +- .../elasticsearch/xpack/ccr/AutoFollowIT.java | 69 ++-- .../xpack/ccr/FollowerFailOverIT.java | 30 +- .../xpack/ccr/IndexFollowingIT.java | 14 +- .../xpack/ccr/LocalIndexFollowingIT.java | 4 +- .../action/AutoFollowCoordinatorTests.java | 6 +- .../ccr/action/FollowInfoResponseTests.java | 79 +---- .../PutAutoFollowPatternRequestTests.java | 54 +-- .../action/PutFollowActionRequestTests.java | 21 +- .../ResumeFollowActionRequestTests.java | 48 ++- .../core/ccr/action/FollowInfoAction.java | 158 +-------- .../core/ccr/action/FollowParameters.java | 314 ++++++++++++++++++ .../action/PutAutoFollowPatternAction.java | 305 ++++------------- .../core/ccr/action/PutFollowAction.java | 142 +++----- .../core/ccr/action/ResumeFollowAction.java | 297 ++--------------- 25 files changed, 705 insertions(+), 1016 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowParameters.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java index 8307b04bd7087..9c9e3f92b8173 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java @@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable, static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); - static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); private final String remoteCluster; private final String leaderIndex; @@ -55,7 +54,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); - builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); toXContentFragment(builder, params); builder.endObject(); return builder; diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java index d9ceb666afd2f..972f327134749 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java @@ -26,8 +26,6 @@ import java.io.IOException; import java.util.Objects; -import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD; - public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject { private final String followerIndex; @@ -39,7 +37,6 @@ public ResumeFollowRequest(String followerIndex) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); toXContentFragment(builder, params); builder.endObject(); return builder; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/PutFollowRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/PutFollowRequestTests.java index 35353ce4a96f9..1f6a3d9f0ac28 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/PutFollowRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/PutFollowRequestTests.java @@ -31,12 +31,11 @@ public class PutFollowRequestTests extends AbstractXContentTestCase { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("test_parser", - true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2])); + true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], "followerIndex")); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD); PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT); PARSER.declareField( PutFollowRequest::setMaxReadRequestSize, @@ -82,7 +81,7 @@ protected boolean supportsUnknownFields() { @Override protected PutFollowRequest createTestInstance() { PutFollowRequest putFollowRequest = - new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4)); + new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), "followerIndex"); if (randomBoolean()) { putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE)); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java index 3f00891331839..d5d2b7e25539f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractXContentTestCase; @@ -30,11 +29,10 @@ public class ResumeFollowRequestTests extends AbstractXContentTestCase { - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("test_parser", - true, (args) -> new ResumeFollowRequest((String) args[0])); + private static final ObjectParser PARSER = new ObjectParser<>("test_parser", + true, () -> new ResumeFollowRequest("followerIndex")); static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD); PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT); PARSER.declareField( ResumeFollowRequest::setMaxReadRequestSize, @@ -79,7 +77,7 @@ protected boolean supportsUnknownFields() { @Override protected ResumeFollowRequest createTestInstance() { - ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4)); + ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("followerIndex"); if (randomBoolean()) { resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 82153e77fc35e..03e936ca8c2ea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -41,7 +41,6 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; -import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.util.ArrayList; import java.util.Collections; @@ -514,23 +513,20 @@ private void followLeaderIndex(String autoFollowPattenName, final String leaderIndexName = indexToFollow.getName(); final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); - ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request(); - followRequest.setFollowerIndex(followIndexName); - followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount()); - followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize()); - followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests()); - followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount()); - followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize()); - followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests()); - followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount()); - followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); - followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay()); - followRequest.setReadPollTimeout(pattern.getPollTimeout()); - PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster(remoteCluster); request.setLeaderIndex(indexToFollow.getName()); - request.setFollowRequest(followRequest); + request.setFollowerIndex(followIndexName); + request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount()); + request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize()); + request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests()); + request.getParameters().setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount()); + request.getParameters().setMaxWriteRequestSize(pattern.getMaxWriteRequestSize()); + request.getParameters().setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests()); + request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount()); + request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); + request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay()); + request.getParameters().setReadPollTimeout(pattern.getPollTimeout()); // Execute if the create and follow api call succeeds: Runnable successHandler = () -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java index df227639137ae..cb96b8bb29851 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java @@ -22,7 +22,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; -import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; @@ -97,18 +97,17 @@ static List getFollowInfos(List concreteFollowerIndices, C String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); if (result.isPresent()) { ShardFollowTask params = result.get(); - FollowParameters followParameters = new FollowParameters( - params.getMaxReadRequestOperationCount(), - params.getMaxReadRequestSize(), - params.getMaxOutstandingReadRequests(), - params.getMaxWriteRequestOperationCount(), - params.getMaxWriteRequestSize(), - params.getMaxOutstandingWriteRequests(), - params.getMaxWriteBufferCount(), - params.getMaxWriteBufferSize(), - params.getMaxRetryDelay(), - params.getReadPollTimeout() - ); + FollowParameters followParameters = new FollowParameters(); + followParameters.setMaxOutstandingReadRequests(params.getMaxOutstandingReadRequests()); + followParameters.setMaxOutstandingWriteRequests(params.getMaxOutstandingWriteRequests()); + followParameters.setMaxReadRequestOperationCount(params.getMaxReadRequestOperationCount()); + followParameters.setMaxWriteRequestOperationCount(params.getMaxWriteRequestOperationCount()); + followParameters.setMaxReadRequestSize(params.getMaxReadRequestSize()); + followParameters.setMaxWriteRequestSize(params.getMaxWriteRequestSize()); + followParameters.setMaxWriteBufferCount(params.getMaxWriteBufferCount()); + followParameters.setMaxWriteBufferSize(params.getMaxWriteBufferSize()); + followParameters.setMaxRetryDelay(params.getMaxRetryDelay()); + followParameters.setReadPollTimeout(params.getReadPollTimeout()); followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters)); } else { followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null)); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 8c722942d19b0..d5127cbb74d4b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -147,8 +147,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request, markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), previousPattern, followedIndexUUIDs); } else { - markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), - followedIndexUUIDs); + markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs); } if (filteredHeaders != null) { @@ -159,16 +158,16 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request, request.getRemoteCluster(), request.getLeaderIndexPatterns(), request.getFollowIndexNamePattern(), - request.getMaxReadRequestOperationCount(), - request.getMaxReadRequestSize(), - request.getMaxConcurrentReadBatches(), - request.getMaxWriteRequestOperationCount(), - request.getMaxWriteRequestSize(), - request.getMaxConcurrentWriteBatches(), - request.getMaxWriteBufferCount(), - request.getMaxWriteBufferSize(), - request.getMaxRetryDelay(), - request.getReadPollTimeout()); + request.getParameters().getMaxReadRequestOperationCount(), + request.getParameters().getMaxReadRequestSize(), + request.getParameters().getMaxOutstandingReadRequests(), + request.getParameters().getMaxWriteRequestOperationCount(), + request.getParameters().getMaxWriteRequestSize(), + request.getParameters().getMaxOutstandingWriteRequests(), + request.getParameters().getMaxWriteBufferCount(), + request.getParameters().getMaxWriteBufferSize(), + request.getParameters().getMaxRetryDelay(), + request.getParameters().getReadPollTimeout()); patterns.put(request.getName(), autoFollowPattern); ClusterState.Builder newState = ClusterState.builder(localState); newState.metaData(MetaData.builder(localState.getMetaData()) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 27f3b60fb5291..84250baaeaa21 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; @@ -126,18 +127,18 @@ private void createFollowerIndex( // soft deletes are enabled by default on indices created on 7.0.0 or later if (leaderIndexMetaData.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(leaderIndexMetaData.getSettings()).onOrAfter(Version.V_7_0_0)) == false) { - listener.onFailure( - new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not have soft deletes enabled")); + listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + + "] does not have soft deletes enabled")); return; } final Settings.Builder settingsBuilder = Settings.builder() - .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex()) + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex()) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster(); final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) .indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$") - .renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout()) + .renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout()) .indexSettings(settingsBuilder); final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders()); @@ -217,10 +218,14 @@ private void initiateFollowing( final PutFollowAction.Request request, final ActionListener listener) { assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT."; - activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()}, + activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()}, request.waitForActiveShards(), request.timeout(), result -> { if (result) { - client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( + FollowParameters parameters = request.getParameters(); + ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request(); + resumeFollowRequest.setFollowerIndex(request.getFollowerIndex()); + resumeFollowRequest.setParameters(new FollowParameters(parameters)); + client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap( r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), listener::onFailure )); @@ -232,6 +237,6 @@ private void initiateFollowing( @Override protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) { - return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex()); + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 0a1a22215a04b..150e1df7a3bae 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -16,8 +16,8 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -47,6 +47,7 @@ import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; @@ -177,8 +178,7 @@ void start( for (int shardId = 0; shardId < numShards; shardId++) { String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; - - final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request, + final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request.getParameters(), leaderIndexMetadata, followIndexMetadata, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId)); } @@ -190,6 +190,8 @@ static void validate( final IndexMetaData followIndex, final String[] leaderIndexHistoryUUID, final MapperService followerMapperService) { + FollowParameters parameters = request.getParameters(); + Map ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); if (ccrIndexMetadata == null) { throw new IllegalArgumentException("follow index ["+ followIndex.getIndex().getName() + "] does not have ccr metadata"); @@ -197,8 +199,8 @@ static void validate( String leaderIndexUUID = leaderIndex.getIndex().getUUID(); String recordedLeaderIndexUUID = ccrIndexMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); if (leaderIndexUUID.equals(recordedLeaderIndexUUID) == false) { - throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] should reference [" + leaderIndexUUID + - "] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index"); + throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] should reference [" + + leaderIndexUUID + "] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index"); } String[] recordedHistoryUUIDs = extractLeaderShardHistoryUUIDs(ccrIndexMetadata); @@ -219,7 +221,8 @@ static void validate( "] does not have soft deletes enabled"); } if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(followIndex.getSettings()) == false) { - throw new IllegalArgumentException("follower index [" + request.getFollowerIndex() + "] does not have soft deletes enabled"); + throw new IllegalArgumentException("follower index [" + request.getFollowerIndex() + + "] does not have soft deletes enabled"); } if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) { throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() + @@ -251,69 +254,69 @@ static void validate( private static ShardFollowTask createShardFollowTask( int shardId, String clusterAliasName, - ResumeFollowAction.Request request, + FollowParameters parameters, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, Map filteredHeaders ) { int maxReadRequestOperationCount; - if (request.getMaxReadRequestOperationCount() != null) { - maxReadRequestOperationCount = request.getMaxReadRequestOperationCount(); + if (parameters.getMaxReadRequestOperationCount() != null) { + maxReadRequestOperationCount = parameters.getMaxReadRequestOperationCount(); } else { maxReadRequestOperationCount = DEFAULT_MAX_READ_REQUEST_OPERATION_COUNT; } ByteSizeValue maxReadRequestSize; - if (request.getMaxReadRequestSize() != null) { - maxReadRequestSize = request.getMaxReadRequestSize(); + if (parameters.getMaxReadRequestSize() != null) { + maxReadRequestSize = parameters.getMaxReadRequestSize(); } else { maxReadRequestSize = DEFAULT_MAX_READ_REQUEST_SIZE; } int maxOutstandingReadRequests; - if (request.getMaxOutstandingReadRequests() != null){ - maxOutstandingReadRequests = request.getMaxOutstandingReadRequests(); + if (parameters.getMaxOutstandingReadRequests() != null){ + maxOutstandingReadRequests = parameters.getMaxOutstandingReadRequests(); } else { maxOutstandingReadRequests = DEFAULT_MAX_OUTSTANDING_READ_REQUESTS; } final int maxWriteRequestOperationCount; - if (request.getMaxWriteRequestOperationCount() != null) { - maxWriteRequestOperationCount = request.getMaxWriteRequestOperationCount(); + if (parameters.getMaxWriteRequestOperationCount() != null) { + maxWriteRequestOperationCount = parameters.getMaxWriteRequestOperationCount(); } else { maxWriteRequestOperationCount = DEFAULT_MAX_WRITE_REQUEST_OPERATION_COUNT; } final ByteSizeValue maxWriteRequestSize; - if (request.getMaxWriteRequestSize() != null) { - maxWriteRequestSize = request.getMaxWriteRequestSize(); + if (parameters.getMaxWriteRequestSize() != null) { + maxWriteRequestSize = parameters.getMaxWriteRequestSize(); } else { maxWriteRequestSize = DEFAULT_MAX_WRITE_REQUEST_SIZE; } int maxOutstandingWriteRequests; - if (request.getMaxOutstandingWriteRequests() != null) { - maxOutstandingWriteRequests = request.getMaxOutstandingWriteRequests(); + if (parameters.getMaxOutstandingWriteRequests() != null) { + maxOutstandingWriteRequests = parameters.getMaxOutstandingWriteRequests(); } else { maxOutstandingWriteRequests = DEFAULT_MAX_OUTSTANDING_WRITE_REQUESTS; } int maxWriteBufferCount; - if (request.getMaxWriteBufferCount() != null) { - maxWriteBufferCount = request.getMaxWriteBufferCount(); + if (parameters.getMaxWriteBufferCount() != null) { + maxWriteBufferCount = parameters.getMaxWriteBufferCount(); } else { maxWriteBufferCount = DEFAULT_MAX_WRITE_BUFFER_COUNT; } ByteSizeValue maxWriteBufferSize; - if (request.getMaxWriteBufferSize() != null) { - maxWriteBufferSize = request.getMaxWriteBufferSize(); + if (parameters.getMaxWriteBufferSize() != null) { + maxWriteBufferSize = parameters.getMaxWriteBufferSize(); } else { maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE; } - TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay(); - TimeValue readPollTimeout = request.getReadPollTimeout() == null ? DEFAULT_READ_POLL_TIMEOUT : request.getReadPollTimeout(); + TimeValue maxRetryDelay = parameters.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : parameters.getMaxRetryDelay(); + TimeValue readPollTimeout = parameters.getReadPollTimeout() == null ? DEFAULT_READ_POLL_TIMEOUT : parameters.getReadPollTimeout(); return new ShardFollowTask( clusterAliasName, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index d28969bc10c8e..48ea50af9990b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -426,7 +426,9 @@ public static PutFollowAction.Request putFollow(String leaderIndex, String follo PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster("leader_cluster"); request.setLeaderIndex(leaderIndex); - request.setFollowRequest(resumeFollow(followerIndex)); + request.setFollowerIndex(followerIndex); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10)); request.waitForActiveShards(waitForActiveShards); return request; } @@ -434,8 +436,8 @@ public static PutFollowAction.Request putFollow(String leaderIndex, String follo public static ResumeFollowAction.Request resumeFollow(String followerIndex) { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); request.setFollowerIndex(followerIndex); - request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); - request.setReadPollTimeout(TimeValue.timeValueMillis(10)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10)); return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 48531c7d28f9a..c2760aa5efd6b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -89,8 +89,8 @@ protected AutoFollowStats getAutoFollowStats() { protected ResumeFollowAction.Request getResumeFollowRequest(String followerIndex) { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); request.setFollowerIndex(followerIndex); - request.setMaxRetryDelay(TimeValue.timeValueMillis(1)); - request.setReadPollTimeout(TimeValue.timeValueMillis(1)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(1)); + request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(1)); return request; } @@ -98,7 +98,9 @@ protected PutFollowAction.Request getPutFollowRequest(String leaderIndex, String PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster("local"); request.setLeaderIndex(leaderIndex); - request.setFollowRequest(getResumeFollowRequest(followerIndex)); + request.setFollowerIndex(followerIndex); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(1)); + request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(1)); request.waitForActiveShards(ActiveShardCount.ONE); return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 4025f647cb2a6..f12dcea4af9b5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; -import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; @@ -186,41 +186,42 @@ public void testAutoFollowParameterAreDelegated() throws Exception { // Enabling auto following: PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); - request.setName("my-pattern"); request.setRemoteCluster("leader_cluster"); request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); // Need to set this, because following an index in the same cluster request.setFollowIndexNamePattern("copy-{{leader_index}}"); if (randomBoolean()) { - request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); + request.getParameters().setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); + request.getParameters().setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE)); + request.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + request.getParameters().setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); + request.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(500)); } if (randomBoolean()) { - request.setReadPollTimeout(TimeValue.timeValueMillis(500)); + request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(500)); } if (randomBoolean()) { - request.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + request.getParameters().setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); + request.getParameters().setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong())); + request.getParameters().setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong())); } + + request.setName("my-pattern"); assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); createLeaderIndex("logs-201901", leaderIndexSettings); @@ -242,35 +243,39 @@ public void testAutoFollowParameterAreDelegated() throws Exception { FollowParameters followParameters = followerInfo.getParameters(); assertThat(followParameters, notNullValue()); - if (request.getMaxWriteBufferCount() != null) { - assertThat(followParameters.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount())); + if (request.getParameters().getMaxWriteBufferCount() != null) { + assertThat(followParameters.getMaxWriteBufferCount(), equalTo(request.getParameters().getMaxWriteBufferCount())); } - if (request.getMaxWriteBufferSize() != null) { - assertThat(followParameters.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); + if (request.getParameters().getMaxWriteBufferSize() != null) { + assertThat(followParameters.getMaxWriteBufferSize(), equalTo(request.getParameters().getMaxWriteBufferSize())); } - if (request.getMaxConcurrentReadBatches() != null) { - assertThat(followParameters.getMaxOutstandingReadRequests(), equalTo(request.getMaxConcurrentReadBatches())); + if (request.getParameters().getMaxOutstandingReadRequests() != null) { + assertThat(followParameters.getMaxOutstandingReadRequests(), + equalTo(request.getParameters().getMaxOutstandingReadRequests())); } - if (request.getMaxConcurrentWriteBatches() != null) { - assertThat(followParameters.getMaxOutstandingWriteRequests(), equalTo(request.getMaxConcurrentWriteBatches())); + if (request.getParameters().getMaxOutstandingWriteRequests() != null) { + assertThat(followParameters.getMaxOutstandingWriteRequests(), + equalTo(request.getParameters().getMaxOutstandingWriteRequests())); } - if (request.getMaxReadRequestOperationCount() != null) { - assertThat(followParameters.getMaxReadRequestOperationCount(), equalTo(request.getMaxReadRequestOperationCount())); + if (request.getParameters().getMaxReadRequestOperationCount() != null) { + assertThat(followParameters.getMaxReadRequestOperationCount(), + equalTo(request.getParameters().getMaxReadRequestOperationCount())); } - if (request.getMaxReadRequestSize() != null) { - assertThat(followParameters.getMaxReadRequestSize(), equalTo(request.getMaxReadRequestSize())); + if (request.getParameters().getMaxReadRequestSize() != null) { + assertThat(followParameters.getMaxReadRequestSize(), equalTo(request.getParameters().getMaxReadRequestSize())); } - if (request.getMaxRetryDelay() != null) { - assertThat(followParameters.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay())); + if (request.getParameters().getMaxRetryDelay() != null) { + assertThat(followParameters.getMaxRetryDelay(), equalTo(request.getParameters().getMaxRetryDelay())); } - if (request.getReadPollTimeout() != null) { - assertThat(followParameters.getReadPollTimeout(), equalTo(request.getReadPollTimeout())); + if (request.getParameters().getReadPollTimeout() != null) { + assertThat(followParameters.getReadPollTimeout(), equalTo(request.getParameters().getReadPollTimeout())); } - if (request.getMaxWriteRequestOperationCount() != null) { - assertThat(followParameters.getMaxWriteRequestOperationCount(), equalTo(request.getMaxWriteRequestOperationCount())); + if (request.getParameters().getMaxWriteRequestOperationCount() != null) { + assertThat(followParameters.getMaxWriteRequestOperationCount(), + equalTo(request.getParameters().getMaxWriteRequestOperationCount())); } - if (request.getMaxWriteRequestSize() != null) { - assertThat(followParameters.getMaxWriteRequestSize(), equalTo(request.getMaxWriteRequestSize())); + if (request.getParameters().getMaxWriteRequestSize() != null) { + assertThat(followParameters.getMaxWriteRequestSize(), equalTo(request.getParameters().getMaxWriteRequestSize())); } }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index 3dd20c4385fee..707e44310b4b7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -90,13 +90,13 @@ public void testFailOverOnFollower() throws Exception { } availableDocs.release(between(100, 200)); PutFollowAction.Request follow = putFollow("leader-index", "follower-index"); - follow.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048)); - follow.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); - follow.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); - follow.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048)); - follow.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); - follow.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); - logger.info("--> follow params {}", Strings.toString(follow.getFollowRequest())); + follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048)); + follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); + follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); + follow.getParameters().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048)); + follow.getParameters().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); + follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); + logger.info("--> follow request {}", Strings.toString(follow)); followerClient().execute(PutFollowAction.INSTANCE, follow).get(); disableDelayedAllocation("follower-index"); ensureFollowerGreen("follower-index"); @@ -151,17 +151,17 @@ public void testFollowIndexAndCloseNode() throws Exception { thread.start(); PutFollowAction.Request followRequest = putFollow("index1", "index2"); - followRequest.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048)); - followRequest.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); - followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); - followRequest.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048)); - followRequest.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); - followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); + followRequest.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048)); + followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); + followRequest.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); + followRequest.getParameters().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048)); + followRequest.getParameters().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB)); + followRequest.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); disableDelayedAllocation("index2"); - logger.info("--> follow params {}", Strings.toString(followRequest.getFollowRequest())); + logger.info("--> follow request {}", Strings.toString(followRequest)); - int maxOpsPerRead = followRequest.getFollowRequest().getMaxReadRequestOperationCount(); + int maxOpsPerRead = followRequest.getParameters().getMaxReadRequestOperationCount(); int maxNumDocsReplicated = Math.min(between(50, 500), between(maxOpsPerRead, maxOpsPerRead * 10)); availableDocs.release(maxNumDocsReplicated / 2 + 1); atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated / 3); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index eee28b5875bcc..28f845fe7d463 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -180,7 +180,7 @@ public void testFollowIndex() throws Exception { } pauseFollow("index2"); - followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get(); final int secondBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { @@ -446,10 +446,10 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed / 3); PutFollowAction.Request followRequest = putFollow("index1", "index2"); - followRequest.getFollowRequest().setMaxReadRequestOperationCount(maxOpsPerRead); - followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); - followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); - followRequest.getFollowRequest().setMaxWriteBufferCount(randomIntBetween(1024, 10240)); + followRequest.getParameters().setMaxReadRequestOperationCount(maxOpsPerRead); + followRequest.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10)); + followRequest.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10)); + followRequest.getParameters().setMaxWriteBufferCount(randomIntBetween(1024, 10240)); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); availableDocs.release(numDocsIndexed * 2 + bulkSize); atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed); @@ -544,7 +544,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception { } PutFollowAction.Request followRequest = putFollow("index1", "index2"); - followRequest.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(1, ByteSizeUnit.BYTES)); + followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1, ByteSizeUnit.BYTES)); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); final Map firstBatchNumDocsPerShard = new HashMap<>(); @@ -1016,7 +1016,7 @@ public void testIndexFallBehind() throws Exception { forceMergeRequest.maxNumSegments(1); leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet(); - followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get(); assertBusy(() -> { List statuses = getFollowTaskStatuses("index2"); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index f50f17c9e296d..0df3f4ea47f43 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -85,7 +85,7 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep followRequest.setFollowerIndex("follower-index"); PutFollowAction.Request putFollowRequest = getPutFollowRequest("leader", "follower"); putFollowRequest.setLeaderIndex("leader-index"); - putFollowRequest.setFollowRequest(followRequest); + putFollowRequest.setFollowerIndex("follower-index"); IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> client().execute(PutFollowAction.INSTANCE, putFollowRequest).actionGet()); assertThat(error.getMessage(), equalTo("leader index [leader-index] does not have soft deletes enabled")); @@ -98,7 +98,7 @@ public void testRemoveRemoteConnection() throws Exception { request.setRemoteCluster("local"); request.setLeaderIndexPatterns(Collections.singletonList("logs-*")); request.setFollowIndexNamePattern("copy-{{leader_index}}"); - request.setReadPollTimeout(TimeValue.timeValueMillis(10)); + request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10)); assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); long previousNumberOfSuccessfulFollowedIndices = getAutoFollowStats().getNumberOfSuccessfulFollowIndices(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 4d4603d022f7d..2037c7faaa7b4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -109,7 +109,7 @@ void createAndFollow(Map headers, assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); assertThat(followRequest.getRemoteCluster(), equalTo("remote")); assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101")); - assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); } @@ -227,7 +227,7 @@ void createAndFollow(Map headers, Consumer failureHandler) { assertThat(followRequest.getRemoteCluster(), equalTo("remote")); assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101")); - assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); } @@ -284,7 +284,7 @@ void createAndFollow(Map headers, Consumer failureHandler) { assertThat(followRequest.getRemoteCluster(), equalTo("remote")); assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101")); - assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101")); + assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); failureHandler.accept(failure); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java index d21098506a121..eceb37819d187 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java @@ -20,61 +20,13 @@ import java.util.List; import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD; -import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters; + +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; public class FollowInfoResponseTests extends AbstractSerializingTestCase { - static final ConstructingObjectParser PARAMETERS_PARSER = new ConstructingObjectParser<>( - "parameters_parser", - args -> { - return new FollowParameters( - (Integer) args[0], - (ByteSizeValue) args[1], - (Integer) args[2], - (Integer) args[3], - (ByteSizeValue) args[4], - (Integer) args[5], - (Integer) args[6], - (ByteSizeValue) args[7], - (TimeValue) args[8], - (TimeValue) args[9] - ); - }); - - static { - PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_READ_REQUEST_OPERATION_COUNT); - PARAMETERS_PARSER.declareField( - ConstructingObjectParser.constructorArg(), - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_READ_REQUEST_SIZE.getPreferredName()), - ShardFollowTask.MAX_READ_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_READ_REQUESTS); - PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_REQUEST_OPERATION_COUNT); - PARAMETERS_PARSER.declareField( - ConstructingObjectParser.constructorArg(), - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_REQUEST_SIZE.getPreferredName()), - ShardFollowTask.MAX_WRITE_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_WRITE_REQUESTS); - PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_COUNT); - PARAMETERS_PARSER.declareField( - ConstructingObjectParser.constructorArg(), - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName()), - ShardFollowTask.MAX_WRITE_BUFFER_SIZE, - ObjectParser.ValueType.STRING); - PARAMETERS_PARSER.declareField( - ConstructingObjectParser.constructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.MAX_RETRY_DELAY.getPreferredName()), - ShardFollowTask.MAX_RETRY_DELAY, - ObjectParser.ValueType.STRING); - PARAMETERS_PARSER.declareField( - ConstructingObjectParser.constructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.READ_POLL_TIMEOUT.getPreferredName()), - ShardFollowTask.READ_POLL_TIMEOUT, - ObjectParser.ValueType.STRING); - } - + static final ObjectParser PARAMETERS_PARSER = new ObjectParser<>("parameters_parser", FollowParameters::new); static final ConstructingObjectParser INFO_PARSER = new ConstructingObjectParser<>( "info_parser", args -> { @@ -88,6 +40,8 @@ public class FollowInfoResponseTests extends AbstractSerializingTestCase instanceReader() protected PutAutoFollowPatternAction.Request createTestInstance() { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName(randomAlphaOfLength(4)); + request.setRemoteCluster(randomAlphaOfLength(4)); request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false))); if (randomBoolean()) { request.setFollowIndexNamePattern(randomAlphaOfLength(4)); } + ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters()); + return request; + } + + @Override + protected PutAutoFollowPatternAction.Request createXContextTestInstance(XContentType xContentType) { + // follower index parameter is not part of the request body and is provided in the url path. + // So this field cannot be used for creating a test instance for xcontent testing. + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setRemoteCluster(randomAlphaOfLength(4)); + request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false))); if (randomBoolean()) { - request.setReadPollTimeout(TimeValue.timeValueMillis(500)); - } - if (randomBoolean()) { - request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); - } - if (randomBoolean()) { - request.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); - } - if (randomBoolean()) { - request.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong())); - } - if (randomBoolean()) { - request.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - request.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); - } - if (randomBoolean()) { - request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); + request.setFollowIndexNamePattern(randomAlphaOfLength(4)); } + ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters()); return request; } @@ -109,17 +91,17 @@ public void testValidate() { validationException = request.validate(); assertThat(validationException, nullValue()); - request.setMaxRetryDelay(TimeValue.ZERO); + request.getParameters().setMaxRetryDelay(TimeValue.ZERO); validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]")); - request.setMaxRetryDelay(TimeValue.timeValueMinutes(10)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(10)); validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]")); - request.setMaxRetryDelay(TimeValue.timeValueMinutes(1)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(1)); validationException = request.validate(); assertThat(validationException, nullValue()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java index d32a773ebe218..02b5eca08fa4a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -22,16 +23,32 @@ protected Writeable.Reader instanceReader() { @Override protected PutFollowAction.Request createTestInstance() { + PutFollowAction.Request request = new PutFollowAction.Request(); + request.setFollowerIndex(randomAlphaOfLength(4)); + request.waitForActiveShards(randomFrom(ActiveShardCount.DEFAULT, ActiveShardCount.NONE, ActiveShardCount.ONE, + ActiveShardCount.ALL)); + + request.setRemoteCluster(randomAlphaOfLength(4)); + request.setLeaderIndex(randomAlphaOfLength(4)); + ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters()); + return request; + } + + @Override + protected PutFollowAction.Request createXContextTestInstance(XContentType xContentType) { + // follower index parameter and wait for active shards params are not part of the request body and + // are provided in the url path. So these fields cannot be used for creating a test instance for xcontent testing. PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster(randomAlphaOfLength(4)); request.setLeaderIndex(randomAlphaOfLength(4)); - request.setFollowRequest(ResumeFollowActionRequestTests.createTestRequest()); + request.setFollowerIndex("followerIndex"); + ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters()); return request; } @Override protected PutFollowAction.Request doParseInstance(XContentParser parser) throws IOException { - return PutFollowAction.Request.fromXContent(parser, null, ActiveShardCount.DEFAULT); + return PutFollowAction.Request.fromXContent(parser, "followerIndex", ActiveShardCount.DEFAULT); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java index 3d3e869f53e8a..53efac70a7dc0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ResumeFollowActionRequestTests.java @@ -11,7 +11,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; @@ -29,7 +31,20 @@ protected Writeable.Reader instanceReader() { @Override protected ResumeFollowAction.Request createTestInstance() { - return createTestRequest(); + ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + request.setFollowerIndex(randomAlphaOfLength(4)); + + generateFollowParameters(request.getParameters()); + return request; + } + + @Override + protected ResumeFollowAction.Request createXContextTestInstance(XContentType type) { + // follower index parameter is not part of the request body and is provided in the url path. + // So this field cannot be used for creating a test instance for xcontent testing. + ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + generateFollowParameters(request.getParameters()); + return request; } @Override @@ -42,57 +57,54 @@ protected boolean supportsUnknownFields() { return false; } - static ResumeFollowAction.Request createTestRequest() { - ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setFollowerIndex(randomAlphaOfLength(4)); + static void generateFollowParameters(FollowParameters followParameters) { if (randomBoolean()) { - request.setMaxReadRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE)); + followParameters.setMaxReadRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxOutstandingReadRequests(randomIntBetween(1, Integer.MAX_VALUE)); + followParameters.setMaxOutstandingReadRequests(randomIntBetween(1, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxOutstandingWriteRequests(randomIntBetween(1, Integer.MAX_VALUE)); + followParameters.setMaxOutstandingWriteRequests(randomIntBetween(1, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); + followParameters.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE)); + followParameters.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxWriteRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE)); + followParameters.setMaxWriteRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE)); } if (randomBoolean()) { - request.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong())); + followParameters.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong())); } if (randomBoolean()) { - request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); + followParameters.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES)); } if (randomBoolean()) { - request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); + followParameters.setMaxRetryDelay(TimeValue.timeValueMillis(500)); } if (randomBoolean()) { - request.setReadPollTimeout(TimeValue.timeValueMillis(500)); + followParameters.setReadPollTimeout(TimeValue.timeValueMillis(500)); } - return request; } public void testValidate() { ResumeFollowAction.Request request = new ResumeFollowAction.Request(); request.setFollowerIndex("index2"); - request.setMaxRetryDelay(TimeValue.ZERO); + request.getParameters().setMaxRetryDelay(TimeValue.ZERO); ActionRequestValidationException validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]")); - request.setMaxRetryDelay(TimeValue.timeValueMinutes(10)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(10)); validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]")); - request.setMaxRetryDelay(TimeValue.timeValueMinutes(1)); + request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(1)); validationException = request.validate(); assertThat(validationException, nullValue()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java index 11d4f22e1b7a8..4cbd575c67b30 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java @@ -14,8 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -24,17 +22,6 @@ import java.util.List; import java.util.Objects; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_OPERATION_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_SIZE; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.READ_POLL_TIMEOUT; - public class FollowInfoAction extends Action { public static final String NAME = "cluster:monitor/ccr/follow_info"; @@ -202,7 +189,7 @@ public FollowParameters getParameters() { remoteCluster = in.readString(); leaderIndex = in.readString(); status = Status.fromString(in.readString()); - parameters = in.readOptionalWriteable(FollowParameters::new); + parameters = in.readOptionalWriteable(innerIn -> new FollowParameters(in)); } @Override @@ -224,16 +211,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (parameters != null) { builder.startObject(PARAMETERS_FIELD.getPreferredName()); { - builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxReadRequestOperationCount); - builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), parameters.maxReadRequestSize.getStringRep()); - builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), parameters.maxOutstandingReadRequests); - builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxWriteRequestOperationCount); - builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), parameters.maxWriteRequestSize.getStringRep()); - builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), parameters.maxOutstandingWriteRequests); - builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), parameters.maxWriteBufferCount); - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), parameters.maxWriteBufferSize.getStringRep()); - builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), parameters.maxRetryDelay.getStringRep()); - builder.field(READ_POLL_TIMEOUT.getPreferredName(), parameters.readPollTimeout.getStringRep()); + parameters.toXContentFragment(builder); } builder.endObject(); } @@ -263,138 +241,6 @@ public String toString() { } } - public static class FollowParameters implements Writeable { - - private final int maxReadRequestOperationCount; - private final ByteSizeValue maxReadRequestSize; - private final int maxOutstandingReadRequests; - private final int maxWriteRequestOperationCount; - private final ByteSizeValue maxWriteRequestSize; - private final int maxOutstandingWriteRequests; - private final int maxWriteBufferCount; - private final ByteSizeValue maxWriteBufferSize; - private final TimeValue maxRetryDelay; - private final TimeValue readPollTimeout; - - public FollowParameters(int maxReadRequestOperationCount, - ByteSizeValue maxReadRequestSize, int maxOutstandingReadRequests, - int maxWriteRequestOperationCount, ByteSizeValue maxWriteRequestSize, - int maxOutstandingWriteRequests, int maxWriteBufferCount, - ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue readPollTimeout) { - this.maxReadRequestOperationCount = maxReadRequestOperationCount; - this.maxReadRequestSize = maxReadRequestSize; - this.maxOutstandingReadRequests = maxOutstandingReadRequests; - this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; - this.maxWriteRequestSize = maxWriteRequestSize; - this.maxOutstandingWriteRequests = maxOutstandingWriteRequests; - this.maxWriteBufferCount = maxWriteBufferCount; - this.maxWriteBufferSize = maxWriteBufferSize; - this.maxRetryDelay = maxRetryDelay; - this.readPollTimeout = readPollTimeout; - } - - public int getMaxReadRequestOperationCount() { - return maxReadRequestOperationCount; - } - - public ByteSizeValue getMaxReadRequestSize() { - return maxReadRequestSize; - } - - public int getMaxOutstandingReadRequests() { - return maxOutstandingReadRequests; - } - - public int getMaxWriteRequestOperationCount() { - return maxWriteRequestOperationCount; - } - - public ByteSizeValue getMaxWriteRequestSize() { - return maxWriteRequestSize; - } - - public int getMaxOutstandingWriteRequests() { - return maxOutstandingWriteRequests; - } - - public int getMaxWriteBufferCount() { - return maxWriteBufferCount; - } - - public ByteSizeValue getMaxWriteBufferSize() { - return maxWriteBufferSize; - } - - public TimeValue getMaxRetryDelay() { - return maxRetryDelay; - } - - public TimeValue getReadPollTimeout() { - return readPollTimeout; - } - - FollowParameters(StreamInput in) throws IOException { - this.maxReadRequestOperationCount = in.readVInt(); - this.maxReadRequestSize = new ByteSizeValue(in); - this.maxOutstandingReadRequests = in.readVInt(); - this.maxWriteRequestOperationCount = in.readVInt(); - this.maxWriteRequestSize = new ByteSizeValue(in); - this.maxOutstandingWriteRequests = in.readVInt(); - this.maxWriteBufferCount = in.readVInt(); - this.maxWriteBufferSize = new ByteSizeValue(in); - this.maxRetryDelay = in.readTimeValue(); - this.readPollTimeout = in.readTimeValue(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(maxReadRequestOperationCount); - maxReadRequestSize.writeTo(out); - out.writeVInt(maxOutstandingReadRequests); - out.writeVLong(maxWriteRequestOperationCount); - maxWriteRequestSize.writeTo(out); - out.writeVInt(maxOutstandingWriteRequests); - out.writeVInt(maxWriteBufferCount); - maxWriteBufferSize.writeTo(out); - out.writeTimeValue(maxRetryDelay); - out.writeTimeValue(readPollTimeout); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FollowParameters that = (FollowParameters) o; - return maxReadRequestOperationCount == that.maxReadRequestOperationCount && - maxOutstandingReadRequests == that.maxOutstandingReadRequests && - maxWriteRequestOperationCount == that.maxWriteRequestOperationCount && - maxOutstandingWriteRequests == that.maxOutstandingWriteRequests && - maxWriteBufferCount == that.maxWriteBufferCount && - Objects.equals(maxReadRequestSize, that.maxReadRequestSize) && - Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) && - Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && - Objects.equals(maxRetryDelay, that.maxRetryDelay) && - Objects.equals(readPollTimeout, that.readPollTimeout); - } - - @Override - public int hashCode() { - return Objects.hash( - maxReadRequestOperationCount, - maxReadRequestSize, - maxOutstandingReadRequests, - maxWriteRequestOperationCount, - maxWriteRequestSize, - maxOutstandingWriteRequests, - maxWriteBufferCount, - maxWriteBufferSize, - maxRetryDelay, - readPollTimeout - ); - } - - } - public enum Status { ACTIVE("active"), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowParameters.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowParameters.java new file mode 100644 index 0000000000000..001a79323ab38 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowParameters.java @@ -0,0 +1,314 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.AbstractObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class FollowParameters implements Writeable { + + static final TimeValue RETRY_DELAY_MAX = TimeValue.timeValueMinutes(5); + + static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count"); + static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count"); + static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests"); + static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests"); + static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size"); + static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size"); + static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); + static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); + static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); + static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout"); + + Integer maxReadRequestOperationCount; + Integer maxWriteRequestOperationCount; + Integer maxOutstandingReadRequests; + Integer maxOutstandingWriteRequests; + ByteSizeValue maxReadRequestSize; + ByteSizeValue maxWriteRequestSize; + Integer maxWriteBufferCount; + ByteSizeValue maxWriteBufferSize; + TimeValue maxRetryDelay; + TimeValue readPollTimeout; + + public FollowParameters() { + } + + public FollowParameters(FollowParameters source) { + this.maxReadRequestOperationCount = source.maxReadRequestOperationCount; + this.maxWriteRequestOperationCount = source.maxWriteRequestOperationCount; + this.maxOutstandingReadRequests = source.maxOutstandingReadRequests; + this.maxOutstandingWriteRequests = source.maxOutstandingWriteRequests; + this.maxReadRequestSize = source.maxReadRequestSize; + this.maxWriteRequestSize = source.maxWriteRequestSize; + this.maxWriteBufferCount = source.maxWriteBufferCount; + this.maxWriteBufferSize = source.maxWriteBufferSize; + this.maxRetryDelay = source.maxRetryDelay; + this.readPollTimeout = source.readPollTimeout; + } + + public Integer getMaxReadRequestOperationCount() { + return maxReadRequestOperationCount; + } + + public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) { + this.maxReadRequestOperationCount = maxReadRequestOperationCount; + } + + public ByteSizeValue getMaxReadRequestSize() { + return maxReadRequestSize; + } + + public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) { + this.maxReadRequestSize = maxReadRequestSize; + } + + public Integer getMaxOutstandingReadRequests() { + return maxOutstandingReadRequests; + } + + public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) { + this.maxOutstandingReadRequests = maxOutstandingReadRequests; + } + + public Integer getMaxWriteRequestOperationCount() { + return maxWriteRequestOperationCount; + } + + public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) { + this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; + } + + public ByteSizeValue getMaxWriteRequestSize() { + return maxWriteRequestSize; + } + + public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) { + this.maxWriteRequestSize = maxWriteRequestSize; + } + + public Integer getMaxOutstandingWriteRequests() { + return maxOutstandingWriteRequests; + } + + public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) { + this.maxOutstandingWriteRequests = maxOutstandingWriteRequests; + } + + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { + this.maxWriteBufferCount = maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { + this.maxWriteBufferSize = maxWriteBufferSize; + } + + public TimeValue getMaxRetryDelay() { + return maxRetryDelay; + } + + public void setMaxRetryDelay(TimeValue maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + } + + public TimeValue getReadPollTimeout() { + return readPollTimeout; + } + + public void setReadPollTimeout(TimeValue readPollTimeout) { + this.readPollTimeout = readPollTimeout; + } + + public ActionRequestValidationException validate() { + ActionRequestValidationException e = null; + + if (maxReadRequestOperationCount != null && maxReadRequestOperationCount < 1) { + e = addValidationError(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e); + } + if (maxReadRequestSize != null && maxReadRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) { + e = addValidationError(MAX_READ_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e); + } + if (maxOutstandingReadRequests != null && maxOutstandingReadRequests < 1) { + e = addValidationError(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName() + " must be larger than 0", e); + } + if (maxWriteRequestOperationCount != null && maxWriteRequestOperationCount < 1) { + e = addValidationError(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e); + } + if (maxWriteRequestSize != null && maxWriteRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) { + e = addValidationError(MAX_WRITE_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e); + } + if (maxOutstandingWriteRequests != null && maxOutstandingWriteRequests < 1) { + e = addValidationError(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName() + " must be larger than 0", e); + } + if (maxWriteBufferCount != null && maxWriteBufferCount < 1) { + e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e); + } + if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) { + e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e); + } + if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) { + String message = "[" + MAX_RETRY_DELAY.getPreferredName() + "] must be positive but was [" + + maxRetryDelay.getStringRep() + "]"; + e = addValidationError(message, e); + } + if (maxRetryDelay != null && maxRetryDelay.millis() > RETRY_DELAY_MAX.millis()) { + String message = "[" + MAX_RETRY_DELAY.getPreferredName() + "] must be less than [" + RETRY_DELAY_MAX.getStringRep() + + "] but was [" + maxRetryDelay.getStringRep() + "]"; + e = addValidationError(message, e); + } + + return e; + } + + FollowParameters(StreamInput in) throws IOException { + fromStreamInput(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalVInt(maxReadRequestOperationCount); + out.writeOptionalVInt(maxOutstandingReadRequests); + out.writeOptionalWriteable(maxReadRequestSize); + out.writeOptionalVInt(maxWriteRequestOperationCount); + out.writeOptionalWriteable(maxWriteRequestSize); + out.writeOptionalVInt(maxOutstandingWriteRequests); + out.writeOptionalVInt(maxWriteBufferCount); + out.writeOptionalWriteable(maxWriteBufferSize); + out.writeOptionalTimeValue(maxRetryDelay); + out.writeOptionalTimeValue(readPollTimeout); + } + + void fromStreamInput(StreamInput in) throws IOException { + maxReadRequestOperationCount = in.readOptionalVInt(); + maxOutstandingReadRequests = in.readOptionalVInt(); + maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new); + maxWriteRequestOperationCount = in.readOptionalVInt(); + maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new); + maxOutstandingWriteRequests = in.readOptionalVInt(); + maxWriteBufferCount = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); + maxRetryDelay = in.readOptionalTimeValue(); + readPollTimeout = in.readOptionalTimeValue(); + } + + XContentBuilder toXContentFragment(final XContentBuilder builder) throws IOException { + if (maxReadRequestOperationCount != null) { + builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount); + } + if (maxWriteRequestOperationCount != null) { + builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount); + } + if (maxOutstandingReadRequests != null) { + builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests); + } + if (maxOutstandingWriteRequests != null) { + builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests); + } + if (maxReadRequestSize != null) { + builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep()); + } + if (maxWriteRequestSize != null) { + builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep()); + } + if (maxWriteBufferCount != null) { + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); + } + if (maxRetryDelay != null) { + builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); + } + if (readPollTimeout != null) { + builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep()); + } + return builder; + } + + public static

void initParser(AbstractObjectParser parser) { + parser.declareInt(FollowParameters::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT); + parser.declareInt(FollowParameters::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT); + parser.declareInt(FollowParameters::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS); + parser.declareInt(FollowParameters::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS); + parser.declareField( + FollowParameters::setMaxReadRequestSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()), + AutoFollowMetadata.AutoFollowPattern.MAX_READ_REQUEST_SIZE, + ObjectParser.ValueType.STRING); + parser.declareField( + FollowParameters::setMaxWriteRequestSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()), + AutoFollowMetadata.AutoFollowPattern.MAX_WRITE_REQUEST_SIZE, + ObjectParser.ValueType.STRING); + parser.declareInt(FollowParameters::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT); + parser.declareField( + FollowParameters::setMaxWriteBufferSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), + MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); + parser.declareField(FollowParameters::setMaxRetryDelay, + (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), + MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); + parser.declareField(FollowParameters::setReadPollTimeout, + (p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()), + READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o instanceof FollowParameters == false) return false; + FollowParameters that = (FollowParameters) o; + return Objects.equals(maxReadRequestOperationCount, that.maxReadRequestOperationCount) && + Objects.equals(maxWriteRequestOperationCount, that.maxWriteRequestOperationCount) && + Objects.equals(maxOutstandingReadRequests, that.maxOutstandingReadRequests) && + Objects.equals(maxOutstandingWriteRequests, that.maxOutstandingWriteRequests) && + Objects.equals(maxReadRequestSize, that.maxReadRequestSize) && + Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) && + Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) && + Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && + Objects.equals(maxRetryDelay, that.maxRetryDelay) && + Objects.equals(readPollTimeout, that.readPollTimeout); + } + + @Override + public int hashCode() { + return Objects.hash( + maxReadRequestOperationCount, + maxWriteRequestOperationCount, + maxOutstandingReadRequests, + maxOutstandingWriteRequests, + maxReadRequestSize, + maxWriteRequestSize, + maxWriteBufferCount, + maxWriteBufferSize, + maxRetryDelay, + readPollTimeout + ); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java index 12d30e4d9f9b1..1ae9801916bce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutAutoFollowPatternAction.java @@ -5,15 +5,14 @@ */ package org.elasticsearch.xpack.core.ccr.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -32,6 +31,7 @@ public class PutAutoFollowPatternAction extends Action { public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/put"; public static final PutAutoFollowPatternAction INSTANCE = new PutAutoFollowPatternAction(); + private static final int MAX_NAME_BYTES = 255; private PutAutoFollowPatternAction() { super(NAME); @@ -44,54 +44,27 @@ public AcknowledgedResponse newResponse() { public static class Request extends AcknowledgedRequest implements ToXContentObject { - private static final ObjectParser PARSER = new ObjectParser<>("put_auto_follow_pattern_request", Request::new); - private static final ParseField NAME_FIELD = new ParseField("name"); - private static final int MAX_NAME_BYTES = 255; + // Note that Request should be the Value class here for this parser with a 'parameters' field that maps to + // PutAutoFollowPatternParameters class. But since two minor version are already released with duplicate + // follow parameters in several APIs, PutAutoFollowPatternParameters is now the Value class here. + private static final ObjectParser PARSER = + new ObjectParser<>("put_auto_follow_pattern_request", PutAutoFollowPatternParameters::new); static { - PARSER.declareString(Request::setName, NAME_FIELD); - PARSER.declareString(Request::setRemoteCluster, REMOTE_CLUSTER_FIELD); - PARSER.declareStringArray(Request::setLeaderIndexPatterns, AutoFollowPattern.LEADER_PATTERNS_FIELD); - PARSER.declareString(Request::setFollowIndexNamePattern, AutoFollowPattern.FOLLOW_PATTERN_FIELD); - PARSER.declareInt(Request::setMaxReadRequestOperationCount, AutoFollowPattern.MAX_READ_REQUEST_OPERATION_COUNT); - PARSER.declareField( - Request::setMaxReadRequestSize, - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_READ_REQUEST_SIZE.getPreferredName()), - AutoFollowPattern.MAX_READ_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareInt(Request::setMaxConcurrentReadBatches, AutoFollowPattern.MAX_OUTSTANDING_READ_REQUESTS); - PARSER.declareInt(Request::setMaxWriteRequestOperationCount, AutoFollowPattern.MAX_WRITE_REQUEST_OPERATION_COUNT); - PARSER.declareField( - Request::setMaxWriteRequestSize, - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_REQUEST_SIZE.getPreferredName()), - AutoFollowPattern.MAX_WRITE_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_OUTSTANDING_WRITE_REQUESTS); - PARSER.declareInt(Request::setMaxWriteBufferCount, AutoFollowPattern.MAX_WRITE_BUFFER_COUNT); - PARSER.declareField( - Request::setMaxWriteBufferSize, - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName()), - AutoFollowPattern.MAX_WRITE_BUFFER_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareField(Request::setMaxRetryDelay, - (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName()), - AutoFollowPattern.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); - PARSER.declareField(Request::setReadPollTimeout, - (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.READ_POLL_TIMEOUT.getPreferredName()), - AutoFollowPattern.READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareString((params, value) -> params.remoteCluster = value, REMOTE_CLUSTER_FIELD); + PARSER.declareStringArray((params, value) -> params.leaderIndexPatterns = value, AutoFollowPattern.LEADER_PATTERNS_FIELD); + PARSER.declareString((params, value) -> params.followIndexNamePattern = value, AutoFollowPattern.FOLLOW_PATTERN_FIELD); + FollowParameters.initParser(PARSER); } public static Request fromXContent(XContentParser parser, String name) throws IOException { - Request request = PARSER.parse(parser, null); - if (name != null) { - if (request.name == null) { - request.name = name; - } else { - if (request.name.equals(name) == false) { - throw new IllegalArgumentException("provided name is not equal"); - } - } - } + PutAutoFollowPatternParameters parameters = PARSER.parse(parser, null); + Request request = new Request(); + request.setName(name); + request.setRemoteCluster(parameters.remoteCluster); + request.setLeaderIndexPatterns(parameters.leaderIndexPatterns); + request.setFollowIndexNamePattern(parameters.followIndexNamePattern); + request.setParameters(parameters); return request; } @@ -99,40 +72,28 @@ public static Request fromXContent(XContentParser parser, String name) throws IO private String remoteCluster; private List leaderIndexPatterns; private String followIndexNamePattern; - - private Integer maxReadRequestOperationCount; - private ByteSizeValue maxReadRequestSize; - private Integer maxConcurrentReadBatches; - private Integer maxWriteRequestOperationCount; - private ByteSizeValue maxWriteRequestSize; - private Integer maxConcurrentWriteBatches; - private Integer maxWriteBufferCount; - private ByteSizeValue maxWriteBufferSize; - private TimeValue maxRetryDelay; - private TimeValue readPollTimeout; + private FollowParameters parameters = new FollowParameters(); public Request() { } @Override public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; + ActionRequestValidationException validationException = parameters.validate(); if (name == null) { - validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] is missing", validationException); + validationException = addValidationError("[name] is missing", validationException); } if (name != null) { if (name.contains(",")) { - validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] name must not contain a ','", - validationException); + validationException = addValidationError("[name] name must not contain a ','", validationException); } if (name.startsWith("_")) { - validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] name must not start with '_'", - validationException); + validationException = addValidationError("[name] name must not start with '_'", validationException); } int byteCount = name.getBytes(StandardCharsets.UTF_8).length; if (byteCount > MAX_NAME_BYTES) { - validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] name is too long (" + - byteCount + " > " + MAX_NAME_BYTES + ")", validationException); + validationException = addValidationError("[name] name is too long (" + byteCount + " > " + MAX_NAME_BYTES + ")", + validationException); } } if (remoteCluster == null) { @@ -143,19 +104,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("[" + AutoFollowPattern.LEADER_PATTERNS_FIELD.getPreferredName() + "] is missing", validationException); } - if (maxRetryDelay != null) { - if (maxRetryDelay.millis() <= 0) { - String message = "[" + AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName() + "] must be positive but was [" + - maxRetryDelay.getStringRep() + "]"; - validationException = addValidationError(message, validationException); - } - if (maxRetryDelay.millis() > ResumeFollowAction.MAX_RETRY_DELAY.millis()) { - String message = "[" + AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName() + "] must be less than [" + - ResumeFollowAction.MAX_RETRY_DELAY + - "] but was [" + maxRetryDelay.getStringRep() + "]"; - validationException = addValidationError(message, validationException); - } - } return validationException; } @@ -191,84 +139,12 @@ public void setFollowIndexNamePattern(String followIndexNamePattern) { this.followIndexNamePattern = followIndexNamePattern; } - public Integer getMaxReadRequestOperationCount() { - return maxReadRequestOperationCount; - } - - public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) { - this.maxReadRequestOperationCount = maxReadRequestOperationCount; - } - - public Integer getMaxConcurrentReadBatches() { - return maxConcurrentReadBatches; - } - - public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) { - this.maxConcurrentReadBatches = maxConcurrentReadBatches; - } - - public ByteSizeValue getMaxReadRequestSize() { - return maxReadRequestSize; - } - - public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) { - this.maxReadRequestSize = maxReadRequestSize; - } - - public Integer getMaxWriteRequestOperationCount() { - return maxWriteRequestOperationCount; - } - - public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) { - this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; - } - - public ByteSizeValue getMaxWriteRequestSize() { - return maxWriteRequestSize; - } - - public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) { - this.maxWriteRequestSize = maxWriteRequestSize; - } - - public Integer getMaxConcurrentWriteBatches() { - return maxConcurrentWriteBatches; - } - - public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) { - this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; - } - - public Integer getMaxWriteBufferCount() { - return maxWriteBufferCount; - } - - public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { - this.maxWriteBufferCount = maxWriteBufferCount; - } - - public ByteSizeValue getMaxWriteBufferSize() { - return maxWriteBufferSize; - } - - public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { - this.maxWriteBufferSize = maxWriteBufferSize; - } - - public TimeValue getMaxRetryDelay() { - return maxRetryDelay; - } - - public void setMaxRetryDelay(TimeValue maxRetryDelay) { - this.maxRetryDelay = maxRetryDelay; - } - - public TimeValue getReadPollTimeout() { - return readPollTimeout; + public FollowParameters getParameters() { + return parameters; } - public void setReadPollTimeout(TimeValue readPollTimeout) { - this.readPollTimeout = readPollTimeout; + public void setParameters(FollowParameters parameters) { + this.parameters = parameters; } public Request(StreamInput in) throws IOException { @@ -277,16 +153,21 @@ public Request(StreamInput in) throws IOException { remoteCluster = in.readString(); leaderIndexPatterns = in.readStringList(); followIndexNamePattern = in.readOptionalString(); - maxReadRequestOperationCount = in.readOptionalVInt(); - maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new); - maxConcurrentReadBatches = in.readOptionalVInt(); - maxWriteRequestOperationCount = in.readOptionalVInt(); - maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new); - maxConcurrentWriteBatches = in.readOptionalVInt(); - maxWriteBufferCount = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); - maxRetryDelay = in.readOptionalTimeValue(); - readPollTimeout = in.readOptionalTimeValue(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + parameters = new FollowParameters(in); + } else { + parameters = new FollowParameters(); + parameters.maxReadRequestOperationCount = in.readOptionalVInt(); + parameters.maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new); + parameters.maxOutstandingReadRequests = in.readOptionalVInt(); + parameters.maxWriteRequestOperationCount = in.readOptionalVInt(); + parameters.maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new); + parameters.maxOutstandingWriteRequests = in.readOptionalVInt(); + parameters.maxWriteBufferCount = in.readOptionalVInt(); + parameters.maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); + parameters.maxRetryDelay = in.readOptionalTimeValue(); + parameters.readPollTimeout = in.readOptionalTimeValue(); + } } @Override @@ -296,58 +177,32 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(remoteCluster); out.writeStringCollection(leaderIndexPatterns); out.writeOptionalString(followIndexNamePattern); - out.writeOptionalVInt(maxReadRequestOperationCount); - out.writeOptionalWriteable(maxReadRequestSize); - out.writeOptionalVInt(maxConcurrentReadBatches); - out.writeOptionalVInt(maxWriteRequestOperationCount); - out.writeOptionalWriteable(maxWriteRequestSize); - out.writeOptionalVInt(maxConcurrentWriteBatches); - out.writeOptionalVInt(maxWriteBufferCount); - out.writeOptionalWriteable(maxWriteBufferSize); - out.writeOptionalTimeValue(maxRetryDelay); - out.writeOptionalTimeValue(readPollTimeout); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + parameters.writeTo(out); + } else { + out.writeOptionalVInt(parameters.maxReadRequestOperationCount); + out.writeOptionalWriteable(parameters.maxReadRequestSize); + out.writeOptionalVInt(parameters.maxOutstandingReadRequests); + out.writeOptionalVInt(parameters.maxWriteRequestOperationCount); + out.writeOptionalWriteable(parameters.maxWriteRequestSize); + out.writeOptionalVInt(parameters.maxOutstandingWriteRequests); + out.writeOptionalVInt(parameters.maxWriteBufferCount); + out.writeOptionalWriteable(parameters.maxWriteBufferSize); + out.writeOptionalTimeValue(parameters.maxRetryDelay); + out.writeOptionalTimeValue(parameters.readPollTimeout); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); { - builder.field(NAME_FIELD.getPreferredName(), name); builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.field(AutoFollowPattern.LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns); if (followIndexNamePattern != null) { builder.field(AutoFollowPattern.FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexNamePattern); } - if (maxReadRequestOperationCount != null) { - builder.field(AutoFollowPattern.MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount); - } - if (maxReadRequestSize != null) { - builder.field(AutoFollowPattern.MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep()); - } - if (maxWriteRequestOperationCount != null) { - builder.field(AutoFollowPattern.MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount); - } - if (maxWriteRequestSize != null) { - builder.field(AutoFollowPattern.MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep()); - } - if (maxWriteBufferCount != null) { - builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); - } - if (maxWriteBufferSize != null) { - builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); - } - if (maxConcurrentReadBatches != null) { - builder.field(AutoFollowPattern.MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxConcurrentReadBatches); - } - if (maxConcurrentWriteBatches != null) { - builder.field(AutoFollowPattern.MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxConcurrentWriteBatches); - } - if (maxRetryDelay != null) { - builder.field(AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); - } - if (readPollTimeout != null) { - builder.field(AutoFollowPattern.READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep()); - } + parameters.toXContentFragment(builder); } builder.endObject(); return builder; @@ -359,39 +214,25 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; return Objects.equals(name, request.name) && - Objects.equals(remoteCluster, request.remoteCluster) && - Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) && - Objects.equals(followIndexNamePattern, request.followIndexNamePattern) && - Objects.equals(maxReadRequestOperationCount, request.maxReadRequestOperationCount) && - Objects.equals(maxReadRequestSize, request.maxReadRequestSize) && - Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && - Objects.equals(maxWriteRequestOperationCount, request.maxWriteRequestOperationCount) && - Objects.equals(maxWriteRequestSize, request.maxWriteRequestSize) && - Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && - Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) && - Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && - Objects.equals(maxRetryDelay, request.maxRetryDelay) && - Objects.equals(readPollTimeout, request.readPollTimeout); + Objects.equals(remoteCluster, request.remoteCluster) && + Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) && + Objects.equals(followIndexNamePattern, request.followIndexNamePattern) && + Objects.equals(parameters, request.parameters); } @Override public int hashCode() { - return Objects.hash( - name, - remoteCluster, - leaderIndexPatterns, - followIndexNamePattern, - maxReadRequestOperationCount, - maxReadRequestSize, - maxConcurrentReadBatches, - maxWriteRequestOperationCount, - maxWriteRequestSize, - maxConcurrentWriteBatches, - maxWriteBufferCount, - maxWriteBufferSize, - maxRetryDelay, - readPollTimeout); + return Objects.hash(name, remoteCluster, leaderIndexPatterns, followIndexNamePattern, parameters); } + + // This class only exists for reuse of the FollowParameters class, see comment above the parser field. + private static class PutAutoFollowPatternParameters extends FollowParameters { + + private String remoteCluster; + private List leaderIndexPatterns; + private String followIndexNamePattern; + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 13ec8b84f81a0..89c18a9824ab4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -18,8 +18,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -29,17 +27,6 @@ import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.FOLLOWER_INDEX_FIELD; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_OPERATION_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_SIZE; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.READ_POLL_TIMEOUT; public final class PutFollowAction extends Action { @@ -65,72 +52,47 @@ public static class Request extends AcknowledgedRequest implements Indi private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); - private static final ObjectParser PARSER = new ObjectParser<>(NAME, () -> { - Request request = new Request(); - request.setFollowRequest(new ResumeFollowAction.Request()); - return request; - }); + // Note that Request should be the Value class here for this parser with a 'parameters' field that maps to + // PutFollowParameters class. But since two minor version are already released with duplicate follow parameters + // in several APIs, PutFollowParameters is now the Value class here. + private static final ObjectParser PARSER = new ObjectParser<>(NAME, PutFollowParameters::new); static { - PARSER.declareString(Request::setRemoteCluster, REMOTE_CLUSTER_FIELD); - PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD); - PARSER.declareString((req, val) -> req.followRequest.setFollowerIndex(val), FOLLOWER_INDEX_FIELD); - PARSER.declareInt((req, val) -> req.followRequest.setMaxReadRequestOperationCount(val), MAX_READ_REQUEST_OPERATION_COUNT); - PARSER.declareField( - (req, val) -> req.followRequest.setMaxReadRequestSize(val), - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()), - MAX_READ_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareInt((req, val) -> req.followRequest.setMaxOutstandingReadRequests(val), MAX_OUTSTANDING_READ_REQUESTS); - PARSER.declareInt((req, val) -> req.followRequest.setMaxWriteRequestOperationCount(val), MAX_WRITE_REQUEST_OPERATION_COUNT); - PARSER.declareField( - (req, val) -> req.followRequest.setMaxWriteRequestSize(val), - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()), - MAX_WRITE_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareInt((req, val) -> req.followRequest.setMaxOutstandingWriteRequests(val), MAX_OUTSTANDING_WRITE_REQUESTS); - PARSER.declareInt((req, val) -> req.followRequest.setMaxWriteBufferCount(val), MAX_WRITE_BUFFER_COUNT); - PARSER.declareField( - (req, val) -> req.followRequest.setMaxWriteBufferSize(val), - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), - MAX_WRITE_BUFFER_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareField( - (req, val) -> req.followRequest.setMaxRetryDelay(val), - (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), - MAX_RETRY_DELAY_FIELD, - ObjectParser.ValueType.STRING); - PARSER.declareField( - (req, val) -> req.followRequest.setReadPollTimeout(val), - (p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()), - READ_POLL_TIMEOUT, - ObjectParser.ValueType.STRING); + PARSER.declareString((putFollowParameters, value) -> putFollowParameters.remoteCluster = value, REMOTE_CLUSTER_FIELD); + PARSER.declareString((putFollowParameters, value) -> putFollowParameters.leaderIndex = value, LEADER_INDEX_FIELD); + FollowParameters.initParser(PARSER); } public static Request fromXContent(final XContentParser parser, final String followerIndex, ActiveShardCount waitForActiveShards) throws IOException { - Request request = PARSER.parse(parser, followerIndex); - if (followerIndex != null) { - if (request.getFollowRequest().getFollowerIndex() == null) { - request.getFollowRequest().setFollowerIndex(followerIndex); - } else { - if (request.getFollowRequest().getFollowerIndex().equals(followerIndex) == false) { - throw new IllegalArgumentException("provided follower_index is not equal"); - } - } - } + PutFollowParameters parameters = PARSER.parse(parser, null); + + Request request = new Request(); request.waitForActiveShards(waitForActiveShards); + request.setFollowerIndex(followerIndex); + request.setRemoteCluster(parameters.remoteCluster); + request.setLeaderIndex(parameters.leaderIndex); + request.setParameters(parameters); return request; } private String remoteCluster; private String leaderIndex; + private String followerIndex; + private FollowParameters parameters = new FollowParameters(); private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; - private ResumeFollowAction.Request followRequest; public Request() { } + public String getFollowerIndex() { + return followerIndex; + } + + public void setFollowerIndex(String followerIndex) { + this.followerIndex = followerIndex; + } + public String getRemoteCluster() { return remoteCluster; } @@ -147,6 +109,14 @@ public void setLeaderIndex(String leaderIndex) { this.leaderIndex = leaderIndex; } + public FollowParameters getParameters() { + return parameters; + } + + public void setParameters(FollowParameters parameters) { + this.parameters = parameters; + } + public ActiveShardCount waitForActiveShards() { return waitForActiveShards; } @@ -168,29 +138,24 @@ public void waitForActiveShards(ActiveShardCount waitForActiveShards) { } } - public ResumeFollowAction.Request getFollowRequest() { - return followRequest; - } - - public void setFollowRequest(ResumeFollowAction.Request followRequest) { - this.followRequest = followRequest; - } - @Override public ActionRequestValidationException validate() { - ActionRequestValidationException e = followRequest.validate(); + ActionRequestValidationException e = parameters.validate(); if (remoteCluster == null) { e = addValidationError(REMOTE_CLUSTER_FIELD.getPreferredName() + " is missing", e); } if (leaderIndex == null) { e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); } + if (followerIndex == null) { + e = addValidationError("follower_index is missing", e); + } return e; } @Override public String[] indices() { - return new String[]{followRequest.getFollowerIndex()}; + return new String[]{followerIndex}; } @Override @@ -200,12 +165,13 @@ public IndicesOptions indicesOptions() { public Request(StreamInput in) throws IOException { super(in); - remoteCluster = in.readString(); - leaderIndex = in.readString(); + this.remoteCluster = in.readString(); + this.leaderIndex = in.readString(); + this.followerIndex = in.readString(); + this.parameters = new FollowParameters(in); if (in.getVersion().onOrAfter(Version.V_6_7_0)) { waitForActiveShards(ActiveShardCount.readFrom(in)); } - followRequest = new ResumeFollowAction.Request(in); } @Override @@ -213,10 +179,11 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(remoteCluster); out.writeString(leaderIndex); + out.writeString(followerIndex); + parameters.writeTo(out); if (out.getVersion().onOrAfter(Version.V_6_7_0)) { waitForActiveShards.writeTo(out); } - followRequest.writeTo(out); } @Override @@ -225,7 +192,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws { builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); - followRequest.toXContentFragment(builder, params); + parameters.toXContentFragment(builder); } builder.endObject(); return builder; @@ -238,24 +205,23 @@ public boolean equals(Object o) { Request request = (Request) o; return Objects.equals(remoteCluster, request.remoteCluster) && Objects.equals(leaderIndex, request.leaderIndex) && - Objects.equals(waitForActiveShards, request.waitForActiveShards) && - Objects.equals(followRequest, request.followRequest); + Objects.equals(followerIndex, request.followerIndex) && + Objects.equals(parameters, request.parameters) && + Objects.equals(waitForActiveShards, request.waitForActiveShards); } @Override public int hashCode() { - return Objects.hash(remoteCluster, leaderIndex, waitForActiveShards, followRequest); + return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, waitForActiveShards); } - @Override - public String toString() { - return "PutFollowAction.Request{" + - "remoteCluster='" + remoteCluster + '\'' + - ", leaderIndex='" + leaderIndex + '\'' + - ", waitForActiveShards=" + waitForActiveShards + - ", followRequest=" + followRequest + - '}'; + // This class only exists for reuse of the FollowParameters class, see comment above the parser field. + private static class PutFollowParameters extends FollowParameters { + + private String remoteCluster; + private String leaderIndex; } + } public static class Response extends ActionResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java index 41728928e098f..547f04889a669 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java @@ -10,11 +10,8 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -30,8 +27,6 @@ public final class ResumeFollowAction extends Action { public static final ResumeFollowAction INSTANCE = new ResumeFollowAction(); public static final String NAME = "cluster:admin/xpack/ccr/resume_follow"; - public static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5); - private ResumeFollowAction() { super(NAME); } @@ -43,65 +38,28 @@ public AcknowledgedResponse newResponse() { public static class Request extends MasterNodeRequest implements ToXContentObject { - static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); - static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count"); - static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size"); - static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests"); - static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count"); - static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size"); - static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests"); - static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); - static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); - static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); - static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout"); - static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + // Note that Request should be the Value class here for this parser with a 'parameters' field that maps to FollowParameters class + // But since two minor version are already released with duplicate follow parameters in several APIs, FollowParameters + // is now the Value class here. + static final ObjectParser PARSER = new ObjectParser<>(NAME, FollowParameters::new); static { - PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD); - PARSER.declareInt(Request::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT); - PARSER.declareField( - Request::setMaxReadRequestSize, - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()), MAX_READ_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareInt(Request::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS); - PARSER.declareInt(Request::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT); - PARSER.declareField(Request::setMaxWriteRequestSize, - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()), MAX_WRITE_REQUEST_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareInt(Request::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS); - PARSER.declareInt(Request::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT); - PARSER.declareField( - Request::setMaxWriteBufferSize, - (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()), - MAX_WRITE_BUFFER_SIZE, - ObjectParser.ValueType.STRING); - PARSER.declareField( - Request::setMaxRetryDelay, - (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), - MAX_RETRY_DELAY_FIELD, - ObjectParser.ValueType.STRING); - PARSER.declareField( - Request::setReadPollTimeout, - (p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()), - READ_POLL_TIMEOUT, - ObjectParser.ValueType.STRING); + FollowParameters.initParser(PARSER); } public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException { - Request request = PARSER.parse(parser, followerIndex); - if (followerIndex != null) { - if (request.followerIndex == null) { - request.followerIndex = followerIndex; - } else { - if (request.followerIndex.equals(followerIndex) == false) { - throw new IllegalArgumentException("provided follower_index is not equal"); - } - } - } + FollowParameters parameters = PARSER.parse(parser, null); + Request request = new Request(); + request.setFollowerIndex(followerIndex); + request.setParameters(parameters); return request; } private String followerIndex; + private FollowParameters parameters = new FollowParameters(); + + public Request() { + } public String getFollowerIndex() { return followerIndex; @@ -111,261 +69,58 @@ public void setFollowerIndex(String followerIndex) { this.followerIndex = followerIndex; } - private Integer maxReadRequestOperationCount; - - public Integer getMaxReadRequestOperationCount() { - return maxReadRequestOperationCount; - } - - public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) { - this.maxReadRequestOperationCount = maxReadRequestOperationCount; - } - - private Integer maxOutstandingReadRequests; - - public Integer getMaxOutstandingReadRequests() { - return maxOutstandingReadRequests; - } - - public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) { - this.maxOutstandingReadRequests = maxOutstandingReadRequests; - } - - private ByteSizeValue maxReadRequestSize; - - public ByteSizeValue getMaxReadRequestSize() { - return maxReadRequestSize; - } - - public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) { - this.maxReadRequestSize = maxReadRequestSize; - } - - private Integer maxWriteRequestOperationCount; - - public Integer getMaxWriteRequestOperationCount() { - return maxWriteRequestOperationCount; - } - - public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) { - this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; - } - - private ByteSizeValue maxWriteRequestSize; - - public ByteSizeValue getMaxWriteRequestSize() { - return maxWriteRequestSize; - } - - public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) { - this.maxWriteRequestSize = maxWriteRequestSize; - } - - private Integer maxOutstandingWriteRequests; - - public Integer getMaxOutstandingWriteRequests() { - return maxOutstandingWriteRequests; - } - - public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) { - this.maxOutstandingWriteRequests = maxOutstandingWriteRequests; - } - - private Integer maxWriteBufferCount; - - public Integer getMaxWriteBufferCount() { - return maxWriteBufferCount; - } - - public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { - this.maxWriteBufferCount = maxWriteBufferCount; - } - - private ByteSizeValue maxWriteBufferSize; - - public ByteSizeValue getMaxWriteBufferSize() { - return maxWriteBufferSize; - } - - public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { - this.maxWriteBufferSize = maxWriteBufferSize; - } - - private TimeValue maxRetryDelay; - - public void setMaxRetryDelay(TimeValue maxRetryDelay) { - this.maxRetryDelay = maxRetryDelay; - } - - public TimeValue getMaxRetryDelay() { - return maxRetryDelay; - } - - private TimeValue readPollTimeout; - - public TimeValue getReadPollTimeout() { - return readPollTimeout; + public FollowParameters getParameters() { + return parameters; } - public void setReadPollTimeout(TimeValue readPollTimeout) { - this.readPollTimeout = readPollTimeout; - } - - public Request() { + public void setParameters(FollowParameters parameters) { + this.parameters = parameters; } @Override public ActionRequestValidationException validate() { - ActionRequestValidationException e = null; - + ActionRequestValidationException e = parameters.validate(); if (followerIndex == null) { - e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e); - } - if (maxReadRequestOperationCount != null && maxReadRequestOperationCount < 1) { - e = addValidationError(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e); - } - if (maxReadRequestSize != null && maxReadRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) { - e = addValidationError(MAX_READ_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e); - } - if (maxOutstandingReadRequests != null && maxOutstandingReadRequests < 1) { - e = addValidationError(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName() + " must be larger than 0", e); - } - if (maxWriteRequestOperationCount != null && maxWriteRequestOperationCount < 1) { - e = addValidationError(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e); + e = addValidationError("follower_index is missing", e); } - if (maxWriteRequestSize != null && maxWriteRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) { - e = addValidationError(MAX_WRITE_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e); - } - if (maxOutstandingWriteRequests != null && maxOutstandingWriteRequests < 1) { - e = addValidationError(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName() + " must be larger than 0", e); - } - if (maxWriteBufferCount != null && maxWriteBufferCount < 1) { - e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e); - } - if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) { - e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e); - } - if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) { - String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be positive but was [" + - maxRetryDelay.getStringRep() + "]"; - e = addValidationError(message, e); - } - if (maxRetryDelay != null && maxRetryDelay.millis() > ResumeFollowAction.MAX_RETRY_DELAY.millis()) { - String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be less than [" + MAX_RETRY_DELAY + - "] but was [" + maxRetryDelay.getStringRep() + "]"; - e = addValidationError(message, e); - } - return e; } public Request(StreamInput in) throws IOException { super(in); followerIndex = in.readString(); - maxReadRequestOperationCount = in.readOptionalVInt(); - maxOutstandingReadRequests = in.readOptionalVInt(); - maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new); - maxWriteRequestOperationCount = in.readOptionalVInt(); - maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new); - maxOutstandingWriteRequests = in.readOptionalVInt(); - maxWriteBufferCount = in.readOptionalVInt(); - maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new); - maxRetryDelay = in.readOptionalTimeValue(); - readPollTimeout = in.readOptionalTimeValue(); + parameters = new FollowParameters(in); } @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeString(followerIndex); - out.writeOptionalVInt(maxReadRequestOperationCount); - out.writeOptionalVInt(maxOutstandingReadRequests); - out.writeOptionalWriteable(maxReadRequestSize); - out.writeOptionalVInt(maxWriteRequestOperationCount); - out.writeOptionalWriteable(maxWriteRequestSize); - out.writeOptionalVInt(maxOutstandingWriteRequests); - out.writeOptionalVInt(maxWriteBufferCount); - out.writeOptionalWriteable(maxWriteBufferSize); - out.writeOptionalTimeValue(maxRetryDelay); - out.writeOptionalTimeValue(readPollTimeout); + parameters.writeTo(out); } @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); { - toXContentFragment(builder, params); + parameters.toXContentFragment(builder); } builder.endObject(); return builder; } - void toXContentFragment(final XContentBuilder builder, final Params params) throws IOException { - builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); - if (maxReadRequestOperationCount != null) { - builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount); - } - if (maxReadRequestSize != null) { - builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep()); - } - if (maxWriteRequestOperationCount != null) { - builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount); - } - if (maxWriteRequestSize != null) { - builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep()); - } - if (maxWriteBufferCount != null) { - builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); - } - if (maxWriteBufferSize != null) { - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); - } - if (maxOutstandingReadRequests != null) { - builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests); - } - if (maxOutstandingWriteRequests != null) { - builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests); - } - if (maxRetryDelay != null) { - builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); - } - if (readPollTimeout != null) { - builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep()); - } - } - @Override - public boolean equals(final Object o) { + public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return Objects.equals(maxReadRequestOperationCount, request.maxReadRequestOperationCount) && - Objects.equals(maxReadRequestSize, request.maxReadRequestSize) && - Objects.equals(maxOutstandingReadRequests, request.maxOutstandingReadRequests) && - Objects.equals(maxWriteRequestOperationCount, request.maxWriteRequestOperationCount) && - Objects.equals(maxWriteRequestSize, request.maxWriteRequestSize) && - Objects.equals(maxOutstandingWriteRequests, request.maxOutstandingWriteRequests) && - Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) && - Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && - Objects.equals(maxRetryDelay, request.maxRetryDelay) && - Objects.equals(readPollTimeout, request.readPollTimeout) && - Objects.equals(followerIndex, request.followerIndex); + return Objects.equals(followerIndex, request.followerIndex) && + Objects.equals(parameters, request.parameters); } @Override public int hashCode() { - return Objects.hash( - followerIndex, - maxReadRequestOperationCount, - maxReadRequestSize, - maxOutstandingReadRequests, - maxWriteRequestOperationCount, - maxWriteRequestSize, - maxOutstandingWriteRequests, - maxWriteBufferCount, - maxWriteBufferSize, - maxRetryDelay, - readPollTimeout); + return Objects.hash(followerIndex, parameters); } }