Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up duplicate follow config parameter code #37688

Merged
merged 13 commits into from
Feb 5, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,

static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");

private final String remoteCluster;
private final String leaderIndex;
Expand All @@ -55,7 +54,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD;

public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject {

private final String followerIndex;
Expand All @@ -39,7 +37,6 @@ public ResumeFollowRequest(String followerIndex) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowRequest> {

private static final ConstructingObjectParser<PutFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2]));
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], "followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
PutFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -82,7 +81,7 @@ protected boolean supportsUnknownFields() {
@Override
protected PutFollowRequest createTestInstance() {
PutFollowRequest putFollowRequest =
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4));
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), "followerIndex");
if (randomBoolean()) {
putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
Expand All @@ -30,11 +29,10 @@

public class ResumeFollowRequestTests extends AbstractXContentTestCase<ResumeFollowRequest> {

private static final ConstructingObjectParser<ResumeFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new ResumeFollowRequest((String) args[0]));
private static final ObjectParser<ResumeFollowRequest, Void> PARSER = new ObjectParser<>("test_parser",
true, () -> new ResumeFollowRequest("followerIndex"));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ResumeFollowRequest::setMaxReadRequestSize,
Expand Down Expand Up @@ -79,7 +77,7 @@ protected boolean supportsUnknownFields() {

@Override
protected ResumeFollowRequest createTestInstance() {
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4));
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("followerIndex");
if (randomBoolean()) {
resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -514,23 +513,20 @@ private void followLeaderIndex(String autoFollowPattenName,
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize());
followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setReadPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowRequest(followRequest);
request.setFollowerIndex(followIndexName);
request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
request.getParameters().setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
request.getParameters().setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
request.getParameters().setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay());
request.getParameters().setReadPollTimeout(pattern.getPollTimeout());

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;

Expand Down Expand Up @@ -97,18 +97,17 @@ static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, C
String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
if (result.isPresent()) {
ShardFollowTask params = result.get();
FollowParameters followParameters = new FollowParameters(
params.getMaxReadRequestOperationCount(),
params.getMaxReadRequestSize(),
params.getMaxOutstandingReadRequests(),
params.getMaxWriteRequestOperationCount(),
params.getMaxWriteRequestSize(),
params.getMaxOutstandingWriteRequests(),
params.getMaxWriteBufferCount(),
params.getMaxWriteBufferSize(),
params.getMaxRetryDelay(),
params.getReadPollTimeout()
);
FollowParameters followParameters = new FollowParameters();
followParameters.setMaxOutstandingReadRequests(params.getMaxOutstandingReadRequests());
followParameters.setMaxOutstandingWriteRequests(params.getMaxOutstandingWriteRequests());
followParameters.setMaxReadRequestOperationCount(params.getMaxReadRequestOperationCount());
followParameters.setMaxWriteRequestOperationCount(params.getMaxWriteRequestOperationCount());
followParameters.setMaxReadRequestSize(params.getMaxReadRequestSize());
followParameters.setMaxWriteRequestSize(params.getMaxWriteRequestSize());
followParameters.setMaxWriteBufferCount(params.getMaxWriteBufferCount());
followParameters.setMaxWriteBufferSize(params.getMaxWriteBufferSize());
followParameters.setMaxRetryDelay(params.getMaxRetryDelay());
followParameters.setReadPollTimeout(params.getReadPollTimeout());
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters));
} else {
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
previousPattern, followedIndexUUIDs);
} else {
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
followedIndexUUIDs);
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs);
}

if (filteredHeaders != null) {
Expand All @@ -159,16 +158,16 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getMaxReadRequestOperationCount(),
request.getMaxReadRequestSize(),
request.getMaxConcurrentReadBatches(),
request.getMaxWriteRequestOperationCount(),
request.getMaxWriteRequestSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getReadPollTimeout());
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxOutstandingWriteRequests(),
request.getParameters().getMaxWriteBufferCount(),
request.getParameters().getMaxWriteBufferSize(),
request.getParameters().getMaxRetryDelay(),
request.getParameters().getReadPollTimeout());
patterns.put(request.getName(), autoFollowPattern);
ClusterState.Builder newState = ClusterState.builder(localState);
newState.metaData(MetaData.builder(localState.getMetaData())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

Expand Down Expand Up @@ -126,18 +127,18 @@ private void createFollowerIndex(
// soft deletes are enabled by default on indices created on 7.0.0 or later
if (leaderIndexMetaData.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(),
IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(leaderIndexMetaData.getSettings()).onOrAfter(Version.V_7_0_0)) == false) {
listener.onFailure(
new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not have soft deletes enabled"));
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() +
"] does not have soft deletes enabled"));
return;
}

final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex())
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);

final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
Expand Down Expand Up @@ -217,10 +218,24 @@ private void initiateFollowing(
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.getParameters().setMaxOutstandingReadRequests(parameters.getMaxOutstandingReadRequests());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
resumeFollowRequest.getParameters().setMaxOutstandingWriteRequests(parameters.getMaxOutstandingWriteRequests());
resumeFollowRequest.getParameters().setMaxReadRequestOperationCount(parameters.getMaxReadRequestOperationCount());
resumeFollowRequest.getParameters().setMaxWriteRequestOperationCount(
parameters.getMaxWriteRequestOperationCount());
resumeFollowRequest.getParameters().setMaxReadRequestSize(parameters.getMaxReadRequestSize());
resumeFollowRequest.getParameters().setMaxWriteRequestSize(parameters.getMaxWriteRequestSize());
resumeFollowRequest.getParameters().setMaxWriteBufferCount(parameters.getMaxWriteBufferCount());
resumeFollowRequest.getParameters().setMaxWriteBufferSize(parameters.getMaxWriteBufferSize());
resumeFollowRequest.getParameters().setReadPollTimeout(parameters.getReadPollTimeout());
resumeFollowRequest.getParameters().setMaxRetryDelay(parameters.getMaxRetryDelay());
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
Expand All @@ -232,6 +247,6 @@ private void initiateFollowing(

@Override
protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex());
}
}
Loading