Skip to content

Commit

Permalink
Clean up duplicate follow config parameter code (elastic#37688)
Browse files Browse the repository at this point in the history
Introduced FollowParameters class that put follow, resume follow,
put auto follow pattern requests and follow info response classes reuse.

The FollowParameters class had the fields, getters etc. for the common parameters
that all these APIs have.  Also binary and xcontent serialization /
parsing is handled by this class.

The follow, resume follow, put auto follow pattern request classes originally
used optional non primitive fields, so FollowParameters has that too and the follow info api can handle that now too.

Also the followerIndex field can in production only be specified via
the url path. If it is also specified via the request body then
it must have the same value as is specified in the url path. This
option only existed to xcontent testing. However the AbstractSerializingTestCase
base class now also supports createXContextTestInstance() to provide
a different test instance when testing xcontent, so allowing followerIndex
to be specified via the request body is no longer needed.

By moving the followerIndex field from Body to ResumeFollowAction.Request
class and not allowing the followerIndex field to be specified via
the request body the Body class is redundant and can be removed. The
ResumeFollowAction.Request class can then directly use the
FollowParameters class.

For consistency I also removed the ability to specified followerIndex
in the put follow api and the name in put auto follow pattern api via
the request body.
  • Loading branch information
martijnvg committed Feb 5, 2019
1 parent bfa1a8c commit 07dab70
Show file tree
Hide file tree
Showing 26 changed files with 714 additions and 1,022 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,

static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");

private final String remoteCluster;
private final String leaderIndex;
Expand All @@ -55,7 +54,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD;

public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject {

private final String followerIndex;
Expand All @@ -39,7 +37,6 @@ public ResumeFollowRequest(String followerIndex) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowRequest> {

private static final ConstructingObjectParser<PutFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2]));
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], "followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
PutFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -82,7 +81,7 @@ protected boolean supportsUnknownFields() {
@Override
protected PutFollowRequest createTestInstance() {
PutFollowRequest putFollowRequest =
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4));
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), "followerIndex");
if (randomBoolean()) {
putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
Expand All @@ -30,11 +29,10 @@

public class ResumeFollowRequestTests extends AbstractXContentTestCase<ResumeFollowRequest> {

private static final ConstructingObjectParser<ResumeFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new ResumeFollowRequest((String) args[0]));
private static final ObjectParser<ResumeFollowRequest, Void> PARSER = new ObjectParser<>("test_parser",
true, () -> new ResumeFollowRequest("followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ResumeFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -79,7 +77,7 @@ protected boolean supportsUnknownFields() {

@Override
protected ResumeFollowRequest createTestInstance() {
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4));
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("followerIndex");
if (randomBoolean()) {
resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -511,23 +510,20 @@ private void followLeaderIndex(String autoFollowPattenName,
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize());
followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setReadPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowRequest(followRequest);
request.setFollowerIndex(followIndexName);
request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
request.getParameters().setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
request.getParameters().setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
request.getParameters().setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay());
request.getParameters().setReadPollTimeout(pattern.getPollTimeout());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected Boolean newResponse(final boolean acknowledged) {

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
String followIndex = request.getFollowRequest().getFollowerIndex();
String followIndex = request.getFollowerIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
Expand Down Expand Up @@ -112,10 +112,10 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
ClusterState updatedState = builder.build();

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex()));
.addAsNew(updatedState.metaData().index(request.getFollowerIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowerIndex() + "] created");
"follow index [" + request.getFollowerIndex() + "] created");

logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());
Expand All @@ -126,10 +126,13 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
}

private void initiateFollowing(final PutFollowAction.Request request, final ActionListener<PutFollowAction.Response> listener) {
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(request.getParameters());
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;

Expand Down Expand Up @@ -99,18 +99,17 @@ static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, C
String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
if (result.isPresent()) {
ShardFollowTask params = result.get();
FollowParameters followParameters = new FollowParameters(
params.getMaxReadRequestOperationCount(),
params.getMaxReadRequestSize(),
params.getMaxOutstandingReadRequests(),
params.getMaxWriteRequestOperationCount(),
params.getMaxWriteRequestSize(),
params.getMaxOutstandingWriteRequests(),
params.getMaxWriteBufferCount(),
params.getMaxWriteBufferSize(),
params.getMaxRetryDelay(),
params.getReadPollTimeout()
);
FollowParameters followParameters = new FollowParameters();
followParameters.setMaxOutstandingReadRequests(params.getMaxOutstandingReadRequests());
followParameters.setMaxOutstandingWriteRequests(params.getMaxOutstandingWriteRequests());
followParameters.setMaxReadRequestOperationCount(params.getMaxReadRequestOperationCount());
followParameters.setMaxWriteRequestOperationCount(params.getMaxWriteRequestOperationCount());
followParameters.setMaxReadRequestSize(params.getMaxReadRequestSize());
followParameters.setMaxWriteRequestSize(params.getMaxWriteRequestSize());
followParameters.setMaxWriteBufferCount(params.getMaxWriteBufferCount());
followParameters.setMaxWriteBufferSize(params.getMaxWriteBufferSize());
followParameters.setMaxRetryDelay(params.getMaxRetryDelay());
followParameters.setReadPollTimeout(params.getReadPollTimeout());
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters));
} else {
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
previousPattern, followedIndexUUIDs);
} else {
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
followedIndexUUIDs);
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs);
}

if (filteredHeaders != null) {
Expand All @@ -161,16 +160,16 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getMaxReadRequestOperationCount(),
request.getMaxReadRequestSize(),
request.getMaxConcurrentReadBatches(),
request.getMaxWriteRequestOperationCount(),
request.getMaxWriteRequestSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getReadPollTimeout());
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxOutstandingWriteRequests(),
request.getParameters().getMaxWriteBufferCount(),
request.getParameters().getMaxWriteBufferSize(),
request.getParameters().getMaxRetryDelay(),
request.getParameters().getReadPollTimeout());
patterns.put(request.getName(), autoFollowPattern);
ClusterState.Builder newState = ClusterState.builder(localState);
newState.metaData(MetaData.builder(localState.getMetaData())
Expand Down
Loading

0 comments on commit 07dab70

Please sign in to comment.