From 8dedbd01df14c76fc8f1ac9060facc053fa2be9b Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 1 Jul 2014 15:19:04 +0200 Subject: [PATCH] Core: don't replace indices within the ActionRequest with the concrete ones, and make sure check blocks is executed on concrete indices Concrete indices is now called multiple times when needed instead of changing what's inside the incoming request with the concrete indices. Ideally we want to keep the original aliases or indices or wildcard expressions in the request. Also made sure that the check blocks is done against the concrete indices, which wasn't the case for delete index, delete mapping, open index, close index, types exists and indices exists. Closes #6694 Closes #6777 --- .../test/cluster.state/20_filtering.yaml | 8 +- .../exists/TransportAliasesExistAction.java | 4 +- .../alias/get/TransportGetAliasesAction.java | 4 +- .../close/TransportCloseIndexAction.java | 8 +- .../delete/TransportDeleteIndexAction.java | 10 +- .../indices/TransportIndicesExistsAction.java | 12 +- .../types/TransportTypesExistsAction.java | 2 +- .../delete/TransportDeleteMappingAction.java | 14 +-- .../get/TransportGetFieldMappingsAction.java | 2 +- .../get/TransportGetMappingsAction.java | 4 +- .../put/TransportPutMappingAction.java | 13 +- .../open/TransportOpenIndexAction.java | 8 +- .../get/TransportGetSettingsAction.java | 4 +- .../put/TransportUpdateSettingsAction.java | 11 +- .../delete/TransportDeleteWarmerAction.java | 16 +-- .../warmer/get/TransportGetWarmersAction.java | 4 +- .../info/TransportClusterInfoAction.java | 5 +- ...portIndicesReplicationOperationAction.java | 2 +- .../action/bulk/BulkProcessorTests.java | 119 +++++++++--------- .../cluster/BlockClusterStatsTests.java | 109 ++++++++++++---- 20 files changed, 204 insertions(+), 155 deletions(-) diff --git a/rest-api-spec/test/cluster.state/20_filtering.yaml b/rest-api-spec/test/cluster.state/20_filtering.yaml index 031d567b67f55..ee8df1a533147 100644 --- a/rest-api-spec/test/cluster.state/20_filtering.yaml +++ b/rest-api-spec/test/cluster.state/20_filtering.yaml @@ -24,8 +24,6 @@ setup: --- "Filtering the cluster state by blocks should return the blocks": -# read only index -# TODO: can this cause issues leaving it read only when deleting it in teardown - do: indices.put_settings: index: testidx @@ -42,6 +40,12 @@ setup: - is_false: routing_nodes - length: { blocks: 1 } + - do: + indices.put_settings: + index: testidx + body: + index.blocks.read_only: false + --- "Filtering the cluster state by nodes only should work": - do: diff --git a/src/main/java/org/elasticsearch/action/admin/indices/alias/exists/TransportAliasesExistAction.java b/src/main/java/org/elasticsearch/action/admin/indices/alias/exists/TransportAliasesExistAction.java index a27af48ea5f01..b9d94348b39a0 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/alias/exists/TransportAliasesExistAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/alias/exists/TransportAliasesExistAction.java @@ -62,9 +62,7 @@ protected AliasesExistResponse newResponse() { @Override protected void masterOperation(GetAliasesRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); - request.indices(concreteIndices); - - boolean result = state.metaData().hasAliases(request.aliases(), request.indices()); + boolean result = state.metaData().hasAliases(request.aliases(), concreteIndices); listener.onResponse(new AliasesExistResponse(result)); } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java b/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java index e340bf65f5c78..553448410ea76 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java @@ -65,10 +65,8 @@ protected GetAliasesResponse newResponse() { @Override protected void masterOperation(GetAliasesRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); - request.indices(concreteIndices); - @SuppressWarnings("unchecked") // ImmutableList to List results incompatible type - ImmutableOpenMap> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), request.indices()); + ImmutableOpenMap> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), concreteIndices); listener.onResponse(new GetAliasesResponse(result)); } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index b73bb494639f8..cfef1ed47c802 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -80,15 +80,15 @@ protected void doExecute(CloseIndexRequest request, ActionListener listener) throws ElasticsearchException { - request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices())); + final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest() .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) - .indices(request.indices()); + .indices(concreteIndices); indexStateService.closeIndex(updateRequest, new ActionListener() { @@ -99,7 +99,7 @@ public void onResponse(ClusterStateUpdateResponse response) { @Override public void onFailure(Throwable t) { - logger.debug("failed to close indices [{}]", t, request.indices()); + logger.debug("failed to close indices [{}]", t, concreteIndices); listener.onFailure(t); } }); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 160d5ac80b541..6b7c20f6500fc 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -80,19 +80,19 @@ protected void doExecute(DeleteIndexRequest request, ActionListener listener) throws ElasticsearchException { - request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices())); - if (request.indices().length == 0) { + String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); + if (concreteIndices.length == 0) { listener.onResponse(new DeleteIndexResponse(true)); return; } // TODO: this API should be improved, currently, if one delete index failed, we send a failure, we should send a response array that includes all the indices that were deleted - final CountDown count = new CountDown(request.indices().length); - for (final String index : request.indices()) { + final CountDown count = new CountDown(concreteIndices.length); + for (final String index : concreteIndices) { deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() { private volatile Throwable lastFailure; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java index 5fd946009e1e5..e985ca50aea53 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -64,16 +65,11 @@ protected IndicesExistsResponse newResponse() { return new IndicesExistsResponse(); } - @Override - protected void doExecute(IndicesExistsRequest request, ActionListener listener) { - // don't call this since it will throw IndexMissingException - //request.indices(clusterService.state().metaData().concreteIndices(request.indices())); - super.doExecute(request, listener); - } - @Override protected ClusterBlockException checkBlock(IndicesExistsRequest request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices()); + //make sure through indices options that the concrete indices call never throws IndexMissingException + IndicesOptions indicesOptions = IndicesOptions.fromOptions(true, true, request.indicesOptions().expandWildcardsOpen(), request.indicesOptions().expandWildcardsClosed()); + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, clusterService.state().metaData().concreteIndices(indicesOptions, request.indices())); } @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/exists/types/TransportTypesExistsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/exists/types/TransportTypesExistsAction.java index 2a3796d3733d3..1faa6913e3f7a 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/exists/types/TransportTypesExistsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/exists/types/TransportTypesExistsAction.java @@ -66,7 +66,7 @@ protected TypesExistsResponse newResponse() { @Override protected ClusterBlockException checkBlock(TypesExistsRequest request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices()); + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices())); } @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java index 549fc868477ba..46d462d594f5f 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java @@ -110,13 +110,13 @@ protected void doExecute(DeleteMappingRequest request, ActionListener listener) throws ElasticsearchException { - request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices())); - flushAction.execute(Requests.flushRequest(request.indices()), new ActionListener() { + final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); + flushAction.execute(Requests.flushRequest(concreteIndices), new ActionListener() { @Override public void onResponse(FlushResponse flushResponse) { if (logger.isTraceEnabled()) { @@ -125,7 +125,7 @@ public void onResponse(FlushResponse flushResponse) { // get all types that need to be deleted. ImmutableOpenMap> result = clusterService.state().metaData().findMappings( - request.indices(), request.types() + concreteIndices, request.types() ); // create OrFilter with type filters within to account for different types BoolFilterBuilder filterBuilder = new BoolFilterBuilder(); @@ -142,7 +142,7 @@ public void onResponse(FlushResponse flushResponse) { request.types(types.toArray(new String[types.size()])); QuerySourceBuilder querySourceBuilder = new QuerySourceBuilder() .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filterBuilder)); - deleteByQueryAction.execute(Requests.deleteByQueryRequest(request.indices()).source(querySourceBuilder), new ActionListener() { + deleteByQueryAction.execute(Requests.deleteByQueryRequest(concreteIndices).source(querySourceBuilder), new ActionListener() { @Override public void onResponse(DeleteByQueryResponse deleteByQueryResponse) { if (logger.isTraceEnabled()) { @@ -155,7 +155,7 @@ public void onResponse(DeleteByQueryResponse deleteByQueryResponse) { } } } - refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener() { + refreshAction.execute(Requests.refreshRequest(concreteIndices), new ActionListener() { @Override public void onResponse(RefreshResponse refreshResponse) { if (logger.isTraceEnabled()) { @@ -174,7 +174,7 @@ public void onFailure(Throwable e) { protected void removeMapping() { DeleteMappingClusterStateUpdateRequest clusterStateUpdateRequest = new DeleteMappingClusterStateUpdateRequest() - .indices(request.indices()).types(request.types()) + .indices(concreteIndices).types(request.types()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java index 7b19965a75539..dfffe76cdb746 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java @@ -60,7 +60,7 @@ protected void doExecute(GetFieldMappingsRequest request, final ActionListener indexResponses = new AtomicReferenceArray<>(concreteIndices.length); - if (concreteIndices == null || concreteIndices.length == 0) { + if (concreteIndices.length == 0) { listener.onResponse(new GetFieldMappingsResponse()); } else { boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java index 0a324320ec707..88a1fa57c2040 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java @@ -56,10 +56,10 @@ protected GetMappingsResponse newResponse() { } @Override - protected void doMasterOperation(final GetMappingsRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { + protected void doMasterOperation(final GetMappingsRequest request, String[] concreteIndices, final ClusterState state, final ActionListener listener) throws ElasticsearchException { logger.trace("serving getMapping request based on version {}", state.version()); ImmutableOpenMap> result = state.metaData().findMappings( - request.indices(), request.types() + concreteIndices, request.types() ); listener.onResponse(new GetMappingsResponse(result)); } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index e89e7de23c329..90963707d36a8 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -68,22 +68,17 @@ protected PutMappingResponse newResponse() { return new PutMappingResponse(); } - @Override - protected void doExecute(PutMappingRequest request, ActionListener listener) { - request.indices(clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices())); - super.doExecute(request, listener); - } - @Override protected ClusterBlockException checkBlock(PutMappingRequest request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices()); + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices())); } @Override protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { + final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices()); PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest() .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) - .indices(request.indices()).type(request.type()) + .indices(concreteIndices).type(request.type()) .source(request.source()).ignoreConflicts(request.ignoreConflicts()); metaDataMappingService.putMapping(updateRequest, new ActionListener() { @@ -95,7 +90,7 @@ public void onResponse(ClusterStateUpdateResponse response) { @Override public void onFailure(Throwable t) { - logger.debug("failed to put mappings on indices [{}], type [{}]", t, request.indices(), request.type()); + logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type()); listener.onFailure(t); } }); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java index 9b8b3bddedb35..09e4b5c2718b7 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -80,15 +80,15 @@ protected void doExecute(OpenIndexRequest request, ActionListener listener) throws ElasticsearchException { - request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices())); + final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest() .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) - .indices(request.indices()); + .indices(concreteIndices); indexStateService.openIndex(updateRequest, new ActionListener() { @@ -99,7 +99,7 @@ public void onResponse(ClusterStateUpdateResponse response) { @Override public void onFailure(Throwable t) { - logger.debug("failed to open indices [{}]", t, request.indices()); + logger.debug("failed to open indices [{}]", t, concreteIndices); listener.onFailure(t); } }); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/get/TransportGetSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/get/TransportGetSettingsAction.java index 55e4b8bd29b70..e3975003a001a 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/get/TransportGetSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/get/TransportGetSettingsAction.java @@ -73,9 +73,9 @@ protected GetSettingsResponse newResponse() { @Override protected void masterOperation(GetSettingsRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { - request.indices(state.metaData().concreteIndices(request.indicesOptions(), request.indices())); + String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); ImmutableOpenMap.Builder indexToSettingsBuilder = ImmutableOpenMap.builder(); - for (String concreteIndex : request.indices()) { + for (String concreteIndex : concreteIndices) { IndexMetaData indexMetaData = state.getMetaData().index(concreteIndex); if (indexMetaData == null) { continue; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java index bd052dbc5c89a..4ede67affc39f 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java @@ -66,16 +66,11 @@ protected UpdateSettingsResponse newResponse() { return new UpdateSettingsResponse(); } - @Override - protected void doExecute(UpdateSettingsRequest request, ActionListener listener) { - request.indices(clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices())); - super.doExecute(request, listener); - } - @Override protected void masterOperation(final UpdateSettingsRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { + final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices()); UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest() - .indices(request.indices()) + .indices(concreteIndices) .settings(request.settings()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()); @@ -88,7 +83,7 @@ public void onResponse(ClusterStateUpdateResponse response) { @Override public void onFailure(Throwable t) { - logger.debug("failed to update settings on indices [{}]", t, request.indices()); + logger.debug("failed to update settings on indices [{}]", t, concreteIndices); listener.onFailure(t); } }); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java index d9c9b31160e6d..72fe45ba91a4e 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java @@ -73,20 +73,14 @@ protected DeleteWarmerResponse newResponse() { return new DeleteWarmerResponse(); } - @Override - protected void doExecute(DeleteWarmerRequest request, ActionListener listener) { - // update to concrete indices - request.indices(clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices())); - super.doExecute(request, listener); - } - @Override protected ClusterBlockException checkBlock(DeleteWarmerRequest request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices()); + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices())); } @Override protected void masterOperation(final DeleteWarmerRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { + final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices()); clusterService.submitStateUpdateTask("delete_warmer [" + Arrays.toString(request.names()) + "]", new AckedClusterStateUpdateTask(request, listener) { @Override @@ -96,7 +90,7 @@ protected DeleteWarmerResponse newResponse(boolean acknowledged) { @Override public void onFailure(String source, Throwable t) { - logger.debug("failed to delete warmer [{}] on indices [{}]", t, Arrays.toString(request.names()), request.indices()); + logger.debug("failed to delete warmer [{}] on indices [{}]", t, Arrays.toString(request.names()), concreteIndices); super.onFailure(source, t); } @@ -105,7 +99,7 @@ public ClusterState execute(ClusterState currentState) { MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); boolean globalFoundAtLeastOne = false; - for (String index : request.indices()) { + for (String index : concreteIndices) { IndexMetaData indexMetaData = currentState.metaData().index(index); if (indexMetaData == null) { throw new IndexMissingException(new Index(index)); @@ -141,7 +135,7 @@ public ClusterState execute(ClusterState currentState) { } if (logger.isInfoEnabled()) { - for (String index : request.indices()) { + for (String index : concreteIndices) { IndexMetaData indexMetaData = currentState.metaData().index(index); if (indexMetaData == null) { throw new IndexMissingException(new Index(index)); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/get/TransportGetWarmersAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/get/TransportGetWarmersAction.java index 716f93b17048c..7a588950f3158 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/get/TransportGetWarmersAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/get/TransportGetWarmersAction.java @@ -57,9 +57,9 @@ protected GetWarmersResponse newResponse() { } @Override - protected void doMasterOperation(final GetWarmersRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { + protected void doMasterOperation(final GetWarmersRequest request, String[] concreteIndices, final ClusterState state, final ActionListener listener) throws ElasticsearchException { ImmutableOpenMap> result = state.metaData().findWarmers( - request.indices(), request.types(), request.warmers() + concreteIndices, request.types(), request.warmers() ); listener.onResponse(new GetWarmersResponse(result)); } diff --git a/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java b/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java index 2ea6dfaf1adc4..23fef24091862 100644 --- a/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java +++ b/src/main/java/org/elasticsearch/action/support/master/info/TransportClusterInfoAction.java @@ -45,9 +45,8 @@ protected String executor() { @Override protected final void masterOperation(final Request request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices()); - request.indices(concreteIndices); - doMasterOperation(request, state, listener); + doMasterOperation(request, concreteIndices, state, listener); } - protected abstract void doMasterOperation(Request request, ClusterState state, final ActionListener listener) throws ElasticsearchException; + protected abstract void doMasterOperation(Request request, String[] concreteIndices, ClusterState state, final ActionListener listener) throws ElasticsearchException; } diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index 3dca9200b896c..39e65910b4815 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -86,7 +86,7 @@ protected void doExecute(final Request request, final ActionListener l final long startTimeInMillis = System.currentTimeMillis(); Map> routingMap = resolveRouting(clusterState, request); - if (concreteIndices == null || concreteIndices.length == 0) { + if (concreteIndices.length == 0) { listener.onResponse(newResponseInstance(request, indexResponses)); } else { for (final String index : concreteIndices) { diff --git a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index a195d3155c3ce..89aa88c029de5 100644 --- a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -192,70 +192,75 @@ public void testBulkProcessorConcurrentRequestsNoNodeAvailableException() throws @Test public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception { createIndex("test-ro"); - assertAcked(client().admin().indices().prepareUpdateSettings("test-ro") - .setSettings(ImmutableSettings.builder().put("index.blocks.read_only", true))); - ensureGreen(); - - int bulkActions = randomIntBetween(10, 100); - int numDocs = randomIntBetween(bulkActions, bulkActions + 100); - int concurrentRequests = randomIntBetween(0, 10); - - int expectedBulkActions = numDocs / bulkActions; - - final CountDownLatch latch = new CountDownLatch(expectedBulkActions); - int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1; - final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions); - - int testDocs = 0; - int testReadOnlyDocs = 0; - MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); - BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); + try { + assertAcked(client().admin().indices().prepareUpdateSettings("test-ro") + .setSettings(ImmutableSettings.builder().put("index.blocks.read_only", true))); + ensureGreen(); + + int bulkActions = randomIntBetween(10, 100); + int numDocs = randomIntBetween(bulkActions, bulkActions + 100); + int concurrentRequests = randomIntBetween(0, 10); + + int expectedBulkActions = numDocs / bulkActions; + + final CountDownLatch latch = new CountDownLatch(expectedBulkActions); + int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1; + final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions); + + int testDocs = 0; + int testReadOnlyDocs = 0; + MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); + + try (BulkProcessor processor = BulkProcessor.builder(client(), listener) + .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) + //set interval and size to high values + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + + for (int i = 1; i <= numDocs; i++) { + if (randomBoolean()) { + testDocs++; + processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)).source("field", "value")); + multiGetRequestBuilder.add("test", "test", Integer.toString(testDocs)); + } else { + testReadOnlyDocs++; + processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)).source("field", "value")); + } + } + } - try (BulkProcessor processor = BulkProcessor.builder(client(), listener) - .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) - //set interval and size to high values - .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + closeLatch.await(); - for (int i = 1; i <= numDocs; i++) { - if (randomBoolean()) { - testDocs++; - processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)).source("field", "value")); - multiGetRequestBuilder.add("test", "test", Integer.toString(testDocs)); + assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions)); + assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs)); + + Set ids = new HashSet<>(); + Set readOnlyIds = new HashSet<>(); + for (BulkItemResponse bulkItemResponse : listener.bulkItems) { + assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro"))); + assertThat(bulkItemResponse.getType(), equalTo("test")); + if (bulkItemResponse.getIndex().equals("test")) { + assertThat(bulkItemResponse.isFailed(), equalTo(false)); + //with concurrent requests > 1 we can't rely on the order of the bulk requests + assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs))); + //we do want to check that we don't get duplicate ids back + assertThat(ids.add(bulkItemResponse.getId()), equalTo(true)); } else { - testReadOnlyDocs++; - processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)).source("field", "value")); + assertThat(bulkItemResponse.isFailed(), equalTo(true)); + //with concurrent requests > 1 we can't rely on the order of the bulk requests + assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs))); + //we do want to check that we don't get duplicate ids back + assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true)); } } - } - closeLatch.await(); - - assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions)); - assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions)); - assertThat(listener.bulkFailures.size(), equalTo(0)); - assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs)); - - Set ids = new HashSet<>(); - Set readOnlyIds = new HashSet<>(); - for (BulkItemResponse bulkItemResponse : listener.bulkItems) { - assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro"))); - assertThat(bulkItemResponse.getType(), equalTo("test")); - if (bulkItemResponse.getIndex().equals("test")) { - assertThat(bulkItemResponse.isFailed(), equalTo(false)); - //with concurrent requests > 1 we can't rely on the order of the bulk requests - assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs))); - //we do want to check that we don't get duplicate ids back - assertThat(ids.add(bulkItemResponse.getId()), equalTo(true)); - } else { - assertThat(bulkItemResponse.isFailed(), equalTo(true)); - //with concurrent requests > 1 we can't rely on the order of the bulk requests - assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs))); - //we do want to check that we don't get duplicate ids back - assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true)); - } + assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs); + } finally { + assertAcked(client().admin().indices().prepareUpdateSettings("test-ro") + .setSettings(ImmutableSettings.builder().put("index.blocks.read_only", false))); } - - assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs); } private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) { diff --git a/src/test/java/org/elasticsearch/cluster/BlockClusterStatsTests.java b/src/test/java/org/elasticsearch/cluster/BlockClusterStatsTests.java index 6962e7fd098c9..ab0e2af92d039 100644 --- a/src/test/java/org/elasticsearch/cluster/BlockClusterStatsTests.java +++ b/src/test/java/org/elasticsearch/cluster/BlockClusterStatsTests.java @@ -20,40 +20,105 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.junit.Test; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.*; /** * Scoped as test, because the if the test with cluster read only block fails, all other tests fail as well, as this is not cleaned up properly */ -@ClusterScope(scope= ElasticsearchIntegrationTest.Scope.TEST) +@ClusterScope(scope= Scope.TEST) public class BlockClusterStatsTests extends ElasticsearchIntegrationTest { @Test public void testBlocks() throws Exception { - createIndex("foo"); - ClusterUpdateSettingsResponse updateSettingsResponse = client().admin().cluster().prepareUpdateSettings().setTransientSettings( - ImmutableSettings.settingsBuilder().put("cluster.blocks.read_only", true).build()).get(); - assertThat(updateSettingsResponse.isAcknowledged(), is(true)); - UpdateSettingsResponse indexSettingsResponse = client().admin().indices().prepareUpdateSettings("foo").setSettings( - ImmutableSettings.settingsBuilder().put("index.blocks.read_only", true)).get(); - assertThat(indexSettingsResponse.isAcknowledged(), is(true)); - - ClusterStateResponse clusterStateResponseUnfiltered = client().admin().cluster().prepareState().clear().setBlocks(true).get(); - assertThat(clusterStateResponseUnfiltered.getState().blocks().global(), hasSize(1)); - assertThat(clusterStateResponseUnfiltered.getState().blocks().indices().size(), is(1)); - - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().get(); - assertThat(clusterStateResponse.getState().blocks().global(), hasSize(0)); - assertThat(clusterStateResponse.getState().blocks().indices().size(), is(0)); - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( - ImmutableSettings.settingsBuilder().put("cluster.blocks.read_only", false).build()).get()); + assertAcked(prepareCreate("foo").addAlias(new Alias("foo-alias"))); + try { + ClusterUpdateSettingsResponse updateSettingsResponse = client().admin().cluster().prepareUpdateSettings().setTransientSettings( + ImmutableSettings.settingsBuilder().put("cluster.blocks.read_only", true).build()).get(); + assertThat(updateSettingsResponse.isAcknowledged(), is(true)); + assertAcked(client().admin().indices().prepareUpdateSettings("foo").setSettings( + ImmutableSettings.settingsBuilder().put("index.blocks.read_only", true))); + + ClusterStateResponse clusterStateResponseUnfiltered = client().admin().cluster().prepareState().clear().setBlocks(true).get(); + assertThat(clusterStateResponseUnfiltered.getState().blocks().global(), hasSize(1)); + assertThat(clusterStateResponseUnfiltered.getState().blocks().indices().size(), is(1)); + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().get(); + assertThat(clusterStateResponse.getState().blocks().global(), hasSize(0)); + assertThat(clusterStateResponse.getState().blocks().indices().size(), is(0)); + + try { + client().admin().indices().prepareClose("foo-alias").get(); + fail("close index should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + try { + client().admin().indices().prepareDeleteMapping("foo-alias").setType("test").get(); + fail("delete mapping should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + try { + client().admin().indices().preparePutMapping("foo-alias").setType("type1").setSource("field1", "type=string").get(); + fail("put mapping should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + try { + client().admin().indices().preparePutWarmer("foo-alias").setSearchRequest(Requests.searchRequest("foo-alias")).get(); + fail("put warmer should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + try { + client().admin().indices().prepareDeleteWarmer().setIndices("foo-alias").setNames("warmer1").get(); + fail("delete warmer should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + try { + client().admin().indices().prepareTypesExists("foo-alias").setTypes("test").get(); + fail("types exists should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + try { + client().admin().indices().prepareExists("foo-alias").get(); + fail("indices exists should have failed"); + } catch(ClusterBlockException e) { + assertClusterAndIndexBlocks(e); + } + + } finally { + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( + ImmutableSettings.settingsBuilder().put("cluster.blocks.read_only", false).build()).get()); + assertAcked(client().admin().indices().prepareUpdateSettings("foo").setSettings( + ImmutableSettings.settingsBuilder().put("index.blocks.read_only", false))); + } + } + + private void assertClusterAndIndexBlocks(ClusterBlockException e) { + assertThat(e.blocks().size(), equalTo(2)); + for (ClusterBlock clusterBlock : e.blocks()) { + assertThat(clusterBlock.status(), equalTo(RestStatus.FORBIDDEN)); + assertThat(clusterBlock.id(), either(equalTo(5)).or(equalTo(6))); + assertThat(clusterBlock.description(), either(containsString("cluster read-only (api)")).or(containsString("index read-only (api)"))); + } } }