Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Added more validation to follow index api. #31068

Merged
merged 4 commits into from
Jun 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,37 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -151,16 +165,18 @@ public static class TransportAction extends HandledTransportAction<Request, Resp
private final ClusterService clusterService;
private final RemoteClusterService remoteClusterService;
private final PersistentTasksService persistentTasksService;
private final IndicesService indicesService;

@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService,
PersistentTasksService persistentTasksService) {
PersistentTasksService persistentTasksService, IndicesService indicesService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.persistentTasksService = persistentTasksService;
this.indicesService = indicesService;
}

@Override
Expand All @@ -173,7 +189,12 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex);
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
try {
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
} catch (IOException e) {
listener.onFailure(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a return method here just in case? I don't like this construct but can't think of how to improve it (all other options I see suck too)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is really needed, because this were the if code block ends and below it is an else code block and that is it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's not needed now but it's a bug waiting to happen when someone adds a line below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it 👍

return;
}
} else {
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
assert remoteClusterIndices.size() == 1;
Expand Down Expand Up @@ -206,81 +227,168 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
* </ul>
*/
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) {
validate (leaderIndexMetadata ,followIndexMetadata , request);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}

ActionListener<Response> handler) throws IOException {
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
responses.set(shardId, task);
finalizeResponse();
}

void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}

void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}

if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
}
}
}
);
}
}
}

private static final Set<Setting<?>> WHITELISTED_SETTINGS;

static {
Set<Setting<?>> whiteListedSettings = new HashSet<>();
whiteListedSettings.add(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING);
whiteListedSettings.add(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING);

whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING);
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING);
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING);
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING);
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING);
whiteListedSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING);

whiteListedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING);
whiteListedSettings.add(IndexSettings.MAX_RESULT_WINDOW_SETTING);
whiteListedSettings.add(IndexSettings.INDEX_WARMER_ENABLED_SETTING);
whiteListedSettings.add(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING);
whiteListedSettings.add(IndexSettings.MAX_RESCORE_WINDOW_SETTING);
whiteListedSettings.add(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING);
whiteListedSettings.add(IndexSettings.DEFAULT_FIELD_SETTING);
whiteListedSettings.add(IndexSettings.QUERY_STRING_LENIENT_SETTING);
whiteListedSettings.add(IndexSettings.QUERY_STRING_ANALYZE_WILDCARD);
whiteListedSettings.add(IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD);
whiteListedSettings.add(IndexSettings.ALLOW_UNMAPPED);
whiteListedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER);
whiteListedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);

whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING);
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING);

whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING);
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);

WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);
}

static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, MapperService followerMapperService) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}

if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
}

if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
}
// TODO: other validation checks
if (leaderIndex.getRoutingNumShards() != followIndex.getRoutingNumShards()) {
throw new IllegalArgumentException("leader index number_of_routing_shards [" + leaderIndex.getRoutingNumShards() +
"] does not match with the number_of_routing_shards of the follow index [" + followIndex.getRoutingNumShards() + "]");
}
if (leaderIndex.getState() != IndexMetaData.State.OPEN || followIndex.getState() != IndexMetaData.State.OPEN) {
throw new IllegalArgumentException("leader and follow index must be open");
}

// Make a copy, remove settings that are allowed to be different and then compare if the settings are equal.
Settings leaderSettings = filter(leaderIndex.getSettings());
Settings followerSettings = filter(followIndex.getSettings());
if (leaderSettings.equals(followerSettings) == false) {
throw new IllegalArgumentException("the leader and follower index settings must be identical");
}

// Validates if the current follower mapping is mergable with the leader mapping.
// This also validates for example whether specific mapper plugins have been installed
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
}

private static Settings filter(Settings originalSettings) {
Settings.Builder settings = Settings.builder().put(originalSettings);
// Remove settings that are always going to be different between leader and follow index:
settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());
settings.remove(IndexMetaData.SETTING_INDEX_UUID);
settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME);
settings.remove(IndexMetaData.SETTING_CREATION_DATE);

Iterator<String> iterator = settings.keys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
for (Setting<?> whitelistedSetting : WHITELISTED_SETTINGS) {
if (whitelistedSetting.match(key)) {
iterator.remove();
break;
}
}
}
return settings.build();
}

}
Loading