Skip to content

Commit

Permalink
RoutingMissingException in more like this (elastic#33974)
Browse files Browse the repository at this point in the history
More like this query allows to provide identifiers of documents to be retrieved as like/unlike items. 
It can happen that at retrieval time an error is thrown, for instance caused by missing routing value when `_routing` is set required in the mapping. 
Instead of ignoring such error and returning no documents for the query, the error should be re-thrown and returned to users. As part of this 
change also mget and mtermvectors are unified in the way they throw such exception like it happens in other places, so that a `RoutingMissingException` is raised.

Closes elastic#29678
  • Loading branch information
cbismuth authored and javanna committed Nov 26, 2018
1 parent 9bdbba2 commit 04ebc63
Show file tree
Hide file tree
Showing 6 changed files with 529 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -69,8 +70,8 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index()));
if ((item.routing() == null) && (clusterState.getMetaData().routingRequired(concreteSingleIndex, item.type()))) {
String message = "routing is required for [" + concreteSingleIndex + "]/[" + item.type() + "]/[" + item.id() + "]";
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(), new IllegalArgumentException(message)));
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(),
new RoutingMissingException(concreteSingleIndex, item.type(), item.id())));
continue;
}
} catch (Exception e) {
Expand All @@ -95,6 +96,12 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL
listener.onResponse(new MultiGetResponse(responses.toArray(new MultiGetItemResponse[responses.length()])));
}

executeShardAction(listener, responses, shardRequests);
}

protected void executeShardAction(ActionListener<MultiGetResponse> listener,
AtomicArray<MultiGetItemResponse> responses,
Map<ShardId, MultiGetShardRequest> shardRequests) {
final AtomicInteger counter = new AtomicInteger(shardRequests.size());

for (final MultiGetShardRequest shardRequest : shardRequests.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -67,17 +68,17 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
termVectorsRequest.routing(clusterState.metaData().resolveIndexRouting(termVectorsRequest.routing(),
termVectorsRequest.index()));
if (!clusterState.metaData().hasConcreteIndex(termVectorsRequest.index())) {
responses.set(i, new MultiTermVectorsItemResponse(null, new MultiTermVectorsResponse.Failure(termVectorsRequest.index(),
termVectorsRequest.type(), termVectorsRequest.id(), new IndexNotFoundException(termVectorsRequest.index()))));
responses.set(i, new MultiTermVectorsItemResponse(null,
new MultiTermVectorsResponse.Failure(termVectorsRequest.index(), termVectorsRequest.type(), termVectorsRequest.id(),
new IndexNotFoundException(termVectorsRequest.index()))));
continue;
}
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
if (termVectorsRequest.routing() == null &&
clusterState.getMetaData().routingRequired(concreteSingleIndex, termVectorsRequest.type())) {
clusterState.getMetaData().routingRequired(concreteSingleIndex, termVectorsRequest.type())) {
responses.set(i, new MultiTermVectorsItemResponse(null,
new MultiTermVectorsResponse.Failure(concreteSingleIndex, termVectorsRequest.type(), termVectorsRequest.id(),
new IllegalArgumentException("routing is required for [" + concreteSingleIndex + "]/[" +
termVectorsRequest.type() + "]/[" + termVectorsRequest.id() + "]"))));
new RoutingMissingException(concreteSingleIndex, termVectorsRequest.type(), termVectorsRequest.id()))));
continue;
}
ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
Expand All @@ -96,7 +97,14 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
listener.onResponse(new MultiTermVectorsResponse(responses.toArray(new MultiTermVectorsItemResponse[responses.length()])));
}

executeShardAction(listener, responses, shardRequests);
}

protected void executeShardAction(ActionListener<MultiTermVectorsResponse> listener,
AtomicArray<MultiTermVectorsItemResponse> responses,
Map<ShardId, MultiTermVectorsShardRequest> shardRequests) {
final AtomicInteger counter = new AtomicInteger(shardRequests.size());

for (final MultiTermVectorsShardRequest shardRequest : shardRequests.values()) {
shardAction.execute(shardRequest, new ActionListener<MultiTermVectorsShardResponse>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.termvectors.MultiTermVectorsItemResponse;
import org.elasticsearch.action.termvectors.MultiTermVectorsRequest;
import org.elasticsearch.action.termvectors.MultiTermVectorsResponse;
Expand Down Expand Up @@ -1110,6 +1111,7 @@ private static Fields[] getFieldsFor(MultiTermVectorsResponse responses) throws

for (MultiTermVectorsItemResponse response : responses) {
if (response.isFailed()) {
checkRoutingMissingException(response);
continue;
}
TermVectorsResponse getResponse = response.getResponse();
Expand All @@ -1121,6 +1123,13 @@ private static Fields[] getFieldsFor(MultiTermVectorsResponse responses) throws
return likeFields.toArray(Fields.EMPTY_ARRAY);
}

private static void checkRoutingMissingException(MultiTermVectorsItemResponse response) {
Throwable cause = ExceptionsHelper.unwrap(response.getFailure().getCause(), RoutingMissingException.class);
if (cause != null) {
throw ((RoutingMissingException) cause);
}
}

private static void handleExclude(BooleanQuery.Builder boolQuery, Item[] likeItems, QueryShardContext context) {
MappedFieldType idField = context.fieldMapper(IdFieldMapper.NAME);
if (idField == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.get;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.common.UUIDs.randomBase64UUID;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportMultiGetActionTests extends ESTestCase {

private static ThreadPool threadPool;
private static TransportService transportService;
private static ClusterService clusterService;
private static TransportMultiGetAction transportAction;
private static TransportShardMultiGetAction shardAction;

@BeforeClass
public static void beforeClass() throws Exception {
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());

transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.builder().put("node.name", "node1").build(),
boundAddress.publishAddress(), randomBase64UUID()), null, emptySet()) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};

final Index index1 = new Index("index1", randomBase64UUID());
final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
.metaData(new MetaData.Builder()
.put(new IndexMetaData.Builder(index1.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(IndexMetaData.SETTING_INDEX_UUID, index1.getUUID()))
.putMapping("type1",
XContentHelper.convertToJson(BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject()
.startObject("type1")
.startObject("_routing")
.field("required", false)
.endObject()
.endObject()
.endObject()), true, XContentType.JSON))
.putMapping("type2",
XContentHelper.convertToJson(BytesReference.bytes(XContentFactory.jsonBuilder()
.startObject()
.startObject("type2")
.startObject("_routing")
.field("required", true)
.endObject()
.endObject()
.endObject()), true, XContentType.JSON)))).build();

final ShardIterator shardIterator = mock(ShardIterator.class);
when(shardIterator.shardId()).thenReturn(new ShardId(index1, randomInt()));

final OperationRouting operationRouting = mock(OperationRouting.class);
when(operationRouting.getShards(eq(clusterState), eq(index1.getName()), anyString(), anyString(), anyString()))
.thenReturn(shardIterator);
when(operationRouting.shardId(eq(clusterState), eq(index1.getName()), anyString(), anyString()))
.thenReturn(new ShardId(index1, randomInt()));

clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(transportService.getLocalNode());
when(clusterService.state()).thenReturn(clusterState);
when(clusterService.operationRouting()).thenReturn(operationRouting);

shardAction = new TransportShardMultiGetAction(clusterService, transportService, mock(IndicesService.class), threadPool,
new ActionFilters(emptySet()), new Resolver()) {
@Override
protected void doExecute(Task task, MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {
}
};
}

@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
transportService = null;
clusterService = null;
transportAction = null;
shardAction = null;
}

public void testTransportMultiGetAction() {
final Task task = createTask();
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
request.add(new MultiGetRequest.Item("index1", "type1", "1"));
request.add(new MultiGetRequest.Item("index1", "type1", "2"));

final AtomicBoolean shardActionInvoked = new AtomicBoolean(false);
transportAction = new TransportMultiGetAction(transportService, clusterService, shardAction,
new ActionFilters(emptySet()), new Resolver()) {
@Override
protected void executeShardAction(final ActionListener<MultiGetResponse> listener,
final AtomicArray<MultiGetItemResponse> responses,
final Map<ShardId, MultiGetShardRequest> shardRequests) {
shardActionInvoked.set(true);
assertEquals(2, responses.length());
assertNull(responses.get(0));
assertNull(responses.get(1));
}
};

transportAction.execute(task, request.request(), new ActionListenerAdapter());
assertTrue(shardActionInvoked.get());
}

public void testTransportMultiGetAction_withMissingRouting() {
final Task task = createTask();
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
request.add(new MultiGetRequest.Item("index1", "type2", "1").routing("1"));
request.add(new MultiGetRequest.Item("index1", "type2", "2"));

final AtomicBoolean shardActionInvoked = new AtomicBoolean(false);
transportAction = new TransportMultiGetAction(transportService, clusterService, shardAction,
new ActionFilters(emptySet()), new Resolver()) {
@Override
protected void executeShardAction(final ActionListener<MultiGetResponse> listener,
final AtomicArray<MultiGetItemResponse> responses,
final Map<ShardId, MultiGetShardRequest> shardRequests) {
shardActionInvoked.set(true);
assertEquals(2, responses.length());
assertNull(responses.get(0));
assertThat(responses.get(1).getFailure().getFailure(), instanceOf(RoutingMissingException.class));
assertThat(responses.get(1).getFailure().getFailure().getMessage(),
equalTo("routing is required for [index1]/[type2]/[2]"));
}
};

transportAction.execute(task, request.request(), new ActionListenerAdapter());
assertTrue(shardActionInvoked.get());

}

private static Task createTask() {
return new Task(randomLong(), "transport", MultiGetAction.NAME, "description",
new TaskId(randomLong() + ":" + randomLong()), emptyMap());
}

static class Resolver extends IndexNameExpressionResolver {

@Override
public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
return new Index("index1", randomBase64UUID());
}
}

static class ActionListenerAdapter implements ActionListener<MultiGetResponse> {

@Override
public void onResponse(MultiGetResponse response) {
}

@Override
public void onFailure(Exception e) {
}
}
}
Loading

0 comments on commit 04ebc63

Please sign in to comment.