Skip to content

Commit

Permalink
Clean up duplicate follow config parameter code (#37688)
Browse files Browse the repository at this point in the history
Introduced FollowParameters class that put follow, resume follow,
put auto follow pattern requests and follow info response classes reuse.

The FollowParameters class had the fields, getters etc. for the common parameters
that all these APIs have.  Also binary and xcontent serialization /
parsing is handled by this class.

The follow, resume follow, put auto follow pattern request classes originally
used optional non primitive fields, so FollowParameters has that too and the follow info api can handle that now too.

Also the followerIndex field can in production only be specified via
the url path. If it is also specified via the request body then
it must have the same value as is specified in the url path. This
option only existed to xcontent testing. However the AbstractSerializingTestCase
base class now also supports createXContextTestInstance() to provide
a different test instance when testing xcontent, so allowing followerIndex
to be specified via the request body is no longer needed.

By moving the followerIndex field from Body to ResumeFollowAction.Request
class and not allowing the followerIndex field to be specified via
the request body the Body class is redundant and can be removed. The
ResumeFollowAction.Request class can then directly use the
FollowParameters class.

For consistency I also removed the ability to specified followerIndex
in the put follow api and the name in put auto follow pattern api via
the request body.
  • Loading branch information
martijnvg authored Feb 5, 2019
1 parent 2f6afd2 commit 0beb3c9
Show file tree
Hide file tree
Showing 25 changed files with 705 additions and 1,016 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,

static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");

private final String remoteCluster;
private final String leaderIndex;
Expand All @@ -55,7 +54,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD;

public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject {

private final String followerIndex;
Expand All @@ -39,7 +37,6 @@ public ResumeFollowRequest(String followerIndex) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowRequest> {

private static final ConstructingObjectParser<PutFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2]));
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], "followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
PutFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -82,7 +81,7 @@ protected boolean supportsUnknownFields() {
@Override
protected PutFollowRequest createTestInstance() {
PutFollowRequest putFollowRequest =
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4));
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), "followerIndex");
if (randomBoolean()) {
putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
Expand All @@ -30,11 +29,10 @@

public class ResumeFollowRequestTests extends AbstractXContentTestCase<ResumeFollowRequest> {

private static final ConstructingObjectParser<ResumeFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new ResumeFollowRequest((String) args[0]));
private static final ObjectParser<ResumeFollowRequest, Void> PARSER = new ObjectParser<>("test_parser",
true, () -> new ResumeFollowRequest("followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ResumeFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -79,7 +77,7 @@ protected boolean supportsUnknownFields() {

@Override
protected ResumeFollowRequest createTestInstance() {
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4));
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("followerIndex");
if (randomBoolean()) {
resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -514,23 +513,20 @@ private void followLeaderIndex(String autoFollowPattenName,
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize());
followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setReadPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowRequest(followRequest);
request.setFollowerIndex(followIndexName);
request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
request.getParameters().setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
request.getParameters().setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
request.getParameters().setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay());
request.getParameters().setReadPollTimeout(pattern.getPollTimeout());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;

Expand Down Expand Up @@ -97,18 +97,17 @@ static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, C
String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
if (result.isPresent()) {
ShardFollowTask params = result.get();
FollowParameters followParameters = new FollowParameters(
params.getMaxReadRequestOperationCount(),
params.getMaxReadRequestSize(),
params.getMaxOutstandingReadRequests(),
params.getMaxWriteRequestOperationCount(),
params.getMaxWriteRequestSize(),
params.getMaxOutstandingWriteRequests(),
params.getMaxWriteBufferCount(),
params.getMaxWriteBufferSize(),
params.getMaxRetryDelay(),
params.getReadPollTimeout()
);
FollowParameters followParameters = new FollowParameters();
followParameters.setMaxOutstandingReadRequests(params.getMaxOutstandingReadRequests());
followParameters.setMaxOutstandingWriteRequests(params.getMaxOutstandingWriteRequests());
followParameters.setMaxReadRequestOperationCount(params.getMaxReadRequestOperationCount());
followParameters.setMaxWriteRequestOperationCount(params.getMaxWriteRequestOperationCount());
followParameters.setMaxReadRequestSize(params.getMaxReadRequestSize());
followParameters.setMaxWriteRequestSize(params.getMaxWriteRequestSize());
followParameters.setMaxWriteBufferCount(params.getMaxWriteBufferCount());
followParameters.setMaxWriteBufferSize(params.getMaxWriteBufferSize());
followParameters.setMaxRetryDelay(params.getMaxRetryDelay());
followParameters.setReadPollTimeout(params.getReadPollTimeout());
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters));
} else {
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
previousPattern, followedIndexUUIDs);
} else {
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
followedIndexUUIDs);
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs);
}

if (filteredHeaders != null) {
Expand All @@ -159,16 +158,16 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getMaxReadRequestOperationCount(),
request.getMaxReadRequestSize(),
request.getMaxConcurrentReadBatches(),
request.getMaxWriteRequestOperationCount(),
request.getMaxWriteRequestSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getReadPollTimeout());
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxOutstandingWriteRequests(),
request.getParameters().getMaxWriteBufferCount(),
request.getParameters().getMaxWriteBufferSize(),
request.getParameters().getMaxRetryDelay(),
request.getParameters().getReadPollTimeout());
patterns.put(request.getName(), autoFollowPattern);
ClusterState.Builder newState = ClusterState.builder(localState);
newState.metaData(MetaData.builder(localState.getMetaData())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

Expand Down Expand Up @@ -126,18 +127,18 @@ private void createFollowerIndex(
// soft deletes are enabled by default on indices created on 7.0.0 or later
if (leaderIndexMetaData.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(),
IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(leaderIndexMetaData.getSettings()).onOrAfter(Version.V_7_0_0)) == false) {
listener.onFailure(
new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not have soft deletes enabled"));
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() +
"] does not have soft deletes enabled"));
return;
}

final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex())
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);

final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
Expand Down Expand Up @@ -217,10 +218,14 @@ private void initiateFollowing(
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(new FollowParameters(parameters));
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
Expand All @@ -232,6 +237,6 @@ private void initiateFollowing(

@Override
protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex());
}
}
Loading

0 comments on commit 0beb3c9

Please sign in to comment.