Skip to content

Commit

Permalink
Core: don't replace indices within the ActionRequest with the concret…
Browse files Browse the repository at this point in the history
…e ones, and make sure check blocks is executed on concrete indices

Concrete indices is now called multiple times when needed instead of changing what's inside the incoming request with the concrete indices. Ideally we want to keep the original aliases or indices or wildcard expressions in the request.

Also made sure that the check blocks is done against the concrete indices, which wasn't the case for delete index, delete mapping, open index, close index, types exists and indices exists.

Closes #6694
Closes #6777
  • Loading branch information
javanna committed Jul 8, 2014
1 parent caf11ff commit 8dedbd0
Show file tree
Hide file tree
Showing 20 changed files with 204 additions and 155 deletions.
8 changes: 6 additions & 2 deletions rest-api-spec/test/cluster.state/20_filtering.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ setup:

---
"Filtering the cluster state by blocks should return the blocks":
# read only index
# TODO: can this cause issues leaving it read only when deleting it in teardown
- do:
indices.put_settings:
index: testidx
Expand All @@ -42,6 +40,12 @@ setup:
- is_false: routing_nodes
- length: { blocks: 1 }

- do:
indices.put_settings:
index: testidx
body:
index.blocks.read_only: false

---
"Filtering the cluster state by nodes only should work":
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ protected AliasesExistResponse newResponse() {
@Override
protected void masterOperation(GetAliasesRequest request, ClusterState state, ActionListener<AliasesExistResponse> listener) throws ElasticsearchException {
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
request.indices(concreteIndices);

boolean result = state.metaData().hasAliases(request.aliases(), request.indices());
boolean result = state.metaData().hasAliases(request.aliases(), concreteIndices);
listener.onResponse(new AliasesExistResponse(result));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ protected GetAliasesResponse newResponse() {
@Override
protected void masterOperation(GetAliasesRequest request, ClusterState state, ActionListener<GetAliasesResponse> listener) throws ElasticsearchException {
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
request.indices(concreteIndices);

@SuppressWarnings("unchecked") // ImmutableList to List results incompatible type
ImmutableOpenMap<String, List<AliasMetaData>> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), request.indices());
ImmutableOpenMap<String, List<AliasMetaData>> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), concreteIndices);
listener.onResponse(new GetAliasesResponse(result));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ protected void doExecute(CloseIndexRequest request, ActionListener<CloseIndexRes

@Override
protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state, final ActionListener<CloseIndexResponse> listener) throws ElasticsearchException {
request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(request.indices());
.indices(concreteIndices);

indexStateService.closeIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

Expand All @@ -99,7 +99,7 @@ public void onResponse(ClusterStateUpdateResponse response) {

@Override
public void onFailure(Throwable t) {
logger.debug("failed to close indices [{}]", t, request.indices());
logger.debug("failed to close indices [{}]", t, concreteIndices);
listener.onFailure(t);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ protected void doExecute(DeleteIndexRequest request, ActionListener<DeleteIndexR

@Override
protected ClusterBlockException checkBlock(DeleteIndexRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected void masterOperation(final DeleteIndexRequest request, final ClusterState state, final ActionListener<DeleteIndexResponse> listener) throws ElasticsearchException {
request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
if (request.indices().length == 0) {
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
if (concreteIndices.length == 0) {
listener.onResponse(new DeleteIndexResponse(true));
return;
}
// TODO: this API should be improved, currently, if one delete index failed, we send a failure, we should send a response array that includes all the indices that were deleted
final CountDown count = new CountDown(request.indices().length);
for (final String index : request.indices()) {
final CountDown count = new CountDown(concreteIndices.length);
for (final String index : concreteIndices) {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {

private volatile Throwable lastFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -64,16 +65,11 @@ protected IndicesExistsResponse newResponse() {
return new IndicesExistsResponse();
}

@Override
protected void doExecute(IndicesExistsRequest request, ActionListener<IndicesExistsResponse> listener) {
// don't call this since it will throw IndexMissingException
//request.indices(clusterService.state().metaData().concreteIndices(request.indices()));
super.doExecute(request, listener);
}

@Override
protected ClusterBlockException checkBlock(IndicesExistsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
//make sure through indices options that the concrete indices call never throws IndexMissingException
IndicesOptions indicesOptions = IndicesOptions.fromOptions(true, true, request.indicesOptions().expandWildcardsOpen(), request.indicesOptions().expandWildcardsClosed());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, clusterService.state().metaData().concreteIndices(indicesOptions, request.indices()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected TypesExistsResponse newResponse() {

@Override
protected ClusterBlockException checkBlock(TypesExistsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ protected void doExecute(DeleteMappingRequest request, ActionListener<DeleteMapp

@Override
protected ClusterBlockException checkBlock(DeleteMappingRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected void masterOperation(final DeleteMappingRequest request, final ClusterState state, final ActionListener<DeleteMappingResponse> listener) throws ElasticsearchException {
request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
flushAction.execute(Requests.flushRequest(request.indices()), new ActionListener<FlushResponse>() {
final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
flushAction.execute(Requests.flushRequest(concreteIndices), new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -125,7 +125,7 @@ public void onResponse(FlushResponse flushResponse) {

// get all types that need to be deleted.
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> result = clusterService.state().metaData().findMappings(
request.indices(), request.types()
concreteIndices, request.types()
);
// create OrFilter with type filters within to account for different types
BoolFilterBuilder filterBuilder = new BoolFilterBuilder();
Expand All @@ -142,7 +142,7 @@ public void onResponse(FlushResponse flushResponse) {
request.types(types.toArray(new String[types.size()]));
QuerySourceBuilder querySourceBuilder = new QuerySourceBuilder()
.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filterBuilder));
deleteByQueryAction.execute(Requests.deleteByQueryRequest(request.indices()).source(querySourceBuilder), new ActionListener<DeleteByQueryResponse>() {
deleteByQueryAction.execute(Requests.deleteByQueryRequest(concreteIndices).source(querySourceBuilder), new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -155,7 +155,7 @@ public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
}
}
}
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
refreshAction.execute(Requests.refreshRequest(concreteIndices), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -174,7 +174,7 @@ public void onFailure(Throwable e) {

protected void removeMapping() {
DeleteMappingClusterStateUpdateRequest clusterStateUpdateRequest = new DeleteMappingClusterStateUpdateRequest()
.indices(request.indices()).types(request.types())
.indices(concreteIndices).types(request.types())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected void doExecute(GetFieldMappingsRequest request, final ActionListener<G
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<>(concreteIndices.length);

if (concreteIndices == null || concreteIndices.length == 0) {
if (concreteIndices.length == 0) {
listener.onResponse(new GetFieldMappingsResponse());
} else {
boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ protected GetMappingsResponse newResponse() {
}

@Override
protected void doMasterOperation(final GetMappingsRequest request, final ClusterState state, final ActionListener<GetMappingsResponse> listener) throws ElasticsearchException {
protected void doMasterOperation(final GetMappingsRequest request, String[] concreteIndices, final ClusterState state, final ActionListener<GetMappingsResponse> listener) throws ElasticsearchException {
logger.trace("serving getMapping request based on version {}", state.version());
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> result = state.metaData().findMappings(
request.indices(), request.types()
concreteIndices, request.types()
);
listener.onResponse(new GetMappingsResponse(result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,17 @@ protected PutMappingResponse newResponse() {
return new PutMappingResponse();
}

@Override
protected void doExecute(PutMappingRequest request, ActionListener<PutMappingResponse> listener) {
request.indices(clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices()));
super.doExecute(request, listener);
}

@Override
protected ClusterBlockException checkBlock(PutMappingRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener<PutMappingResponse> listener) throws ElasticsearchException {
final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices());
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(request.indices()).type(request.type())
.indices(concreteIndices).type(request.type())
.source(request.source()).ignoreConflicts(request.ignoreConflicts());

metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
Expand All @@ -95,7 +90,7 @@ public void onResponse(ClusterStateUpdateResponse response) {

@Override
public void onFailure(Throwable t) {
logger.debug("failed to put mappings on indices [{}], type [{}]", t, request.indices(), request.type());
logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type());
listener.onFailure(t);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ protected void doExecute(OpenIndexRequest request, ActionListener<OpenIndexRespo

@Override
protected ClusterBlockException checkBlock(OpenIndexRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}

@Override
protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener<OpenIndexResponse> listener) throws ElasticsearchException {
request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(request.indices());
.indices(concreteIndices);

indexStateService.openIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

Expand All @@ -99,7 +99,7 @@ public void onResponse(ClusterStateUpdateResponse response) {

@Override
public void onFailure(Throwable t) {
logger.debug("failed to open indices [{}]", t, request.indices());
logger.debug("failed to open indices [{}]", t, concreteIndices);
listener.onFailure(t);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ protected GetSettingsResponse newResponse() {

@Override
protected void masterOperation(GetSettingsRequest request, ClusterState state, ActionListener<GetSettingsResponse> listener) throws ElasticsearchException {
request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
ImmutableOpenMap.Builder<String, Settings> indexToSettingsBuilder = ImmutableOpenMap.builder();
for (String concreteIndex : request.indices()) {
for (String concreteIndex : concreteIndices) {
IndexMetaData indexMetaData = state.getMetaData().index(concreteIndex);
if (indexMetaData == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,11 @@ protected UpdateSettingsResponse newResponse() {
return new UpdateSettingsResponse();
}

@Override
protected void doExecute(UpdateSettingsRequest request, ActionListener<UpdateSettingsResponse> listener) {
request.indices(clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices()));
super.doExecute(request, listener);
}

@Override
protected void masterOperation(final UpdateSettingsRequest request, final ClusterState state, final ActionListener<UpdateSettingsResponse> listener) throws ElasticsearchException {
final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices());
UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest()
.indices(request.indices())
.indices(concreteIndices)
.settings(request.settings())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
Expand All @@ -88,7 +83,7 @@ public void onResponse(ClusterStateUpdateResponse response) {

@Override
public void onFailure(Throwable t) {
logger.debug("failed to update settings on indices [{}]", t, request.indices());
logger.debug("failed to update settings on indices [{}]", t, concreteIndices);
listener.onFailure(t);
}
});
Expand Down
Loading

0 comments on commit 8dedbd0

Please sign in to comment.