Skip to content

Commit

Permalink
Group field-caps node requests by index mapping hash
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Mar 17, 2022
1 parent 3d7b277 commit d0295d6
Show file tree
Hide file tree
Showing 3 changed files with 494 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.IntStream;

import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.array;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -470,10 +473,18 @@ public void testNoActiveCopy() throws Exception {
.put("index.routing.allocation.require._id", "unknown")
).setWaitForActiveShards(ActiveShardCount.NONE).setMapping("timestamp", "type=date", "field1", "type=keyword")
);
assertAcked(
prepareCreate("log-index-another").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.require._id", "unknown")
).setWaitForActiveShards(ActiveShardCount.NONE).setMapping("timestamp", "type=date", "another_field", "type=keyword")
);
{
final ElasticsearchException ex = expectThrows(
ElasticsearchException.class,
() -> client().prepareFieldCaps("log-index-*").setFields("*").get()
() -> client().prepareFieldCaps("log-index-in*").setFields("*").get()
);
assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
}
Expand All @@ -486,15 +497,48 @@ public void testNoActiveCopy() throws Exception {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
// log-index-inactive can be resolved by log-index-1
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2", "log-index-inactive"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("long"));

assertThat(response.getFailures(), hasSize(1));
final FieldCapabilitiesFailure failure = response.getFailures().get(0);
assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive"));
assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-another"));
assertThat(failure.getException().getMessage(), equalTo("index [log-index-another] has no active shard copy"));
}
}

/**
 * Creates between one and five {@code event_index_*} indices that all use the same mapping,
 * runs a field-caps request over them, and checks that exactly one node-level field-caps
 * request ({@code TransportFieldCapabilitiesAction.ACTION_NODE_NAME}) was received across
 * all nodes of the test cluster.
 */
public void testSingleNodeRequest() {
    String[] indices = IntStream.range(0, randomIntBetween(1, 5)).mapToObj(n -> "event_index_" + n).toArray(String[]::new);
    for (String index : indices) {
        assertAcked(prepareCreate(index).setMapping("timestamp", "type=date", "message", "type=text"));
    }
    FieldCapabilitiesRequest fieldCapRequest = new FieldCapabilitiesRequest();
    fieldCapRequest.indices("event_index_*");
    fieldCapRequest.fields("*");

    AtomicInteger receivedRequests = new AtomicInteger();
    try {
        // Count every node-level field-caps request on each node, then delegate to the real handler.
        for (String node : internalCluster().getNodeNames()) {
            MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
            transportService.addRequestHandlingBehavior(
                TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
                (handler, request, channel, task) -> {
                    receivedRequests.incrementAndGet();
                    handler.messageReceived(request, channel, task);
                }
            );
        }
        final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, fieldCapRequest).actionGet();
        assertThat(response.getIndices(), equalTo(indices));
        assertThat(response.getField("message"), aMapWithSize(1));
        assertThat(response.getField("message"), hasKey("text"));
        assertThat(response.getFailures(), empty());
        assertThat(receivedRequests.get(), equalTo(1));
    } finally {
        // Always remove the counting handlers, even when the request or an assertion above fails,
        // so they are not left installed on the nodes after this test.
        for (String node : internalCluster().getNodeNames()) {
            MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
            transportService.clearAllRules();
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
Expand All @@ -34,11 +35,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -56,15 +59,15 @@ final class RequestDispatcher {
private final OriginalIndices originalIndices;
private final long nowInMillis;

private final boolean hasFilter;
private final boolean withIndexFilter;
private final Executor executor;
private final Consumer<FieldCapabilitiesIndexResponse> onIndexResponse;
private final BiConsumer<String, Exception> onIndexFailure;
private final Runnable onComplete;

private final AtomicInteger pendingRequests = new AtomicInteger();
private final AtomicInteger executionRound = new AtomicInteger();
private final Map<String, IndexSelector> indexSelectors;
private final Map<String, Group> groups = ConcurrentCollections.newConcurrentMap();

RequestDispatcher(
ClusterService clusterService,
Expand All @@ -85,20 +88,25 @@ final class RequestDispatcher {
this.originalIndices = originalIndices;
this.nowInMillis = nowInMillis;
this.clusterState = clusterService.state();
this.hasFilter = fieldCapsRequest.indexFilter() != null && fieldCapsRequest.indexFilter() instanceof MatchAllQueryBuilder == false;
this.withIndexFilter = fieldCapsRequest.indexFilter() != null
&& fieldCapsRequest.indexFilter() instanceof MatchAllQueryBuilder == false;
this.executor = executor;
this.onIndexResponse = onIndexResponse;
this.onIndexFailure = onIndexFailure;
this.onComplete = new RunOnce(onComplete);
this.indexSelectors = ConcurrentCollections.newConcurrentMap();
for (String index : indices) {
final GroupShardsIterator<ShardIterator> shardIts = clusterService.operationRouting()
.searchShards(clusterState, new String[] { index }, null, null, null, null);
final IndexSelector indexResult = new IndexSelector(shardIts);
if (indexResult.nodeToShards.isEmpty()) {
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
final List<Group> groupedIndices = groupIndicesByMappingHash(clusterService, clusterState, withIndexFilter, indices);
for (Group group : groupedIndices) {
if (group.nodeToShards.isEmpty()) {
for (String index : group.indices) {
onIndexFailure.accept(
index,
new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy")
);
}
} else {
this.indexSelectors.put(index, indexResult);
for (String index : group.indices) {
this.groups.put(index, group);
}
}
}
}
Expand All @@ -108,11 +116,10 @@ void execute() {
@Override
public void onFailure(Exception e) {
// If we get rejected, mark pending indices as failed and complete
final List<String> failedIndices = new ArrayList<>(indexSelectors.keySet());
for (String failedIndex : failedIndices) {
final IndexSelector removed = indexSelectors.remove(failedIndex);
assert removed != null;
onIndexFailure.accept(failedIndex, e);
for (Group g : groups.values()) {
if (g.completed.compareAndSet(false, true)) {
g.indices.forEach(index -> onIndexFailure.accept(index, e));
}
}
onComplete.run();
}
Expand All @@ -127,31 +134,27 @@ protected void doRun() {
private void innerExecute() {
final Map<String, List<ShardId>> nodeToSelectedShards = new HashMap<>();
assert pendingRequests.get() == 0 : "pending requests = " + pendingRequests;
final List<String> failedIndices = new ArrayList<>();
for (Map.Entry<String, IndexSelector> e : indexSelectors.entrySet()) {
final String index = e.getKey();
final IndexSelector indexSelector = e.getValue();
final List<ShardRouting> selectedShards = indexSelector.nextTarget(hasFilter);
if (selectedShards.isEmpty()) {
failedIndices.add(index);
} else {
pendingRequests.addAndGet(selectedShards.size());
for (ShardRouting shard : selectedShards) {
nodeToSelectedShards.computeIfAbsent(shard.currentNodeId(), n -> new ArrayList<>()).add(shard.shardId());
groups.values().removeIf(g -> g.completed.get());
final Set<Group> visited = Collections.newSetFromMap(new IdentityHashMap<>());
for (Group group : groups.values()) {
if (visited.add(group)) {
final List<ShardRouting> selectedShards = group.nextTarget(withIndexFilter);
if (selectedShards.isEmpty()) {
if (group.completed.compareAndSet(false, true)) {
group.getFailures().forEach(onIndexFailure);
}
} else {
for (ShardRouting shard : selectedShards) {
nodeToSelectedShards.computeIfAbsent(shard.currentNodeId(), n -> new ArrayList<>()).add(shard.shardId());
}
}
}
}
for (String failedIndex : failedIndices) {
final IndexSelector indexSelector = indexSelectors.remove(failedIndex);
assert indexSelector != null;
final Exception failure = indexSelector.getFailure();
if (failure != null) {
onIndexFailure.accept(failedIndex, failure);
}
}
if (nodeToSelectedShards.isEmpty()) {
assert groups.values().stream().allMatch(g -> g.completed.get()) : "Some groups aren't completed yet";
onComplete.run();
} else {
pendingRequests.addAndGet(nodeToSelectedShards.size());
for (Map.Entry<String, List<ShardId>> e : nodeToSelectedShards.entrySet()) {
sendRequestToNode(e.getKey(), e.getValue());
}
Expand All @@ -168,7 +171,7 @@ private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
assert node != null;
LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds);
final ActionListener<FieldCapabilitiesNodeResponse> listener = ActionListener.wrap(
r -> onRequestResponse(shardIds, r),
this::onRequestResponse,
failure -> onRequestFailure(shardIds, failure)
);
final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest(
Expand All @@ -191,67 +194,111 @@ private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
);
}

private void afterRequestsCompleted(int numRequests) {
if (pendingRequests.addAndGet(-numRequests) == 0) {
private void afterRequestsCompleted() {
if (pendingRequests.decrementAndGet() == 0) {
// Here we only retry after all pending requests have responded to avoid exploding network requests
// when the cluster is unstable or overloaded as an eager retry approach can add more load to the cluster.
executionRound.incrementAndGet();
execute();
}
}

private void onRequestResponse(List<ShardId> shardIds, FieldCapabilitiesNodeResponse nodeResponse) {
private void onRequestResponse(FieldCapabilitiesNodeResponse nodeResponse) {
for (FieldCapabilitiesIndexResponse indexResponse : nodeResponse.getIndexResponses()) {
if (indexResponse.canMatch()) {
if (indexSelectors.remove(indexResponse.getIndexName()) != null) {
onIndexResponse.accept(indexResponse);
final Group group = groups.remove(indexResponse.getIndexName());
if (group == null) {
continue;
}
if (group.completed.compareAndSet(false, true)) {
final String mappingHash = group.mappingHash != null ? group.mappingHash : indexResponse.getIndexMappingHash();
for (String index : group.indices) {
onIndexResponse.accept(new FieldCapabilitiesIndexResponse(index, mappingHash, indexResponse.get(), true));
}
}
}
}
for (ShardId unmatchedShardId : nodeResponse.getUnmatchedShardIds()) {
final IndexSelector indexSelector = indexSelectors.get(unmatchedShardId.getIndexName());
if (indexSelector != null) {
indexSelector.addUnmatchedShardId(unmatchedShardId);
final Group group = groups.get(unmatchedShardId.getIndexName());
if (group != null) {
group.addUnmatchedShardId(unmatchedShardId);
}
}
for (Map.Entry<ShardId, Exception> e : nodeResponse.getFailures().entrySet()) {
final IndexSelector indexSelector = indexSelectors.get(e.getKey().getIndexName());
if (indexSelector != null) {
indexSelector.setFailure(e.getKey(), e.getValue());
final Group group = groups.get(e.getKey().getIndexName());
if (group != null) {
group.setFailure(e.getKey(), e.getValue());
}
}
afterRequestsCompleted(shardIds.size());
afterRequestsCompleted();
}

private void onRequestFailure(List<ShardId> shardIds, Exception e) {
for (ShardId shardId : shardIds) {
final IndexSelector indexSelector = indexSelectors.get(shardId.getIndexName());
if (indexSelector != null) {
indexSelector.setFailure(shardId, e);
final Group group = groups.get(shardId.getIndexName());
if (group != null) {
group.setFailure(shardId, e);
}
}
afterRequestsCompleted(shardIds.size());
afterRequestsCompleted();
}

/**
 * Partitions the requested indices into {@link Group}s.
 * <p>
 * When no index filter is used and an index has mapping metadata with a non-null SHA-256 hash,
 * the index is placed in a group together with every other index that has the same hash.
 * Any other index (index filter in use, no index metadata, no mapping, or no hash) becomes a
 * single-index group whose mapping hash is {@code null}.
 * <p>
 * Single-index groups are added first, in the order of {@code indices}; the shared-hash groups
 * follow in the iteration order of the intermediate hash map.
 *
 * @param clusterService  used to resolve the search shards of each group
 * @param clusterState    the cluster state that index metadata and shard routing are read from
 * @param withIndexFilter whether the request carries an index filter; if so, no index is grouped by hash
 * @param indices         the names of the indices to group
 * @return the groups covering all of {@code indices}
 */
private List<Group> groupIndicesByMappingHash(
    ClusterService clusterService,
    ClusterState clusterState,
    boolean withIndexFilter,
    String[] indices
) {
    final List<Group> result = new ArrayList<>();
    final Map<String, List<String>> indicesByHash = new HashMap<>();
    for (String indexName : indices) {
        final IndexMetadata metadata = clusterState.metadata().index(indexName);
        // Resolve the mapping hash once; null means "this index cannot be grouped".
        String sha256 = null;
        if (withIndexFilter == false && metadata != null && metadata.mapping() != null) {
            sha256 = metadata.mapping().getSha256();
        }
        if (sha256 == null) {
            final GroupShardsIterator<ShardIterator> singleIndexShards = clusterService.operationRouting()
                .searchShards(clusterState, new String[] { indexName }, null, null, null, null);
            result.add(new Group(List.of(indexName), null, singleIndexShards));
        } else {
            indicesByHash.computeIfAbsent(sha256, hash -> new ArrayList<>()).add(indexName);
        }
    }
    // One group per distinct mapping hash, with shard iterators spanning all indices of that group.
    indicesByHash.forEach((hash, members) -> {
        final GroupShardsIterator<ShardIterator> sharedShards = clusterService.operationRouting()
            .searchShards(clusterState, members.toArray(String[]::new), null, null, null, null);
        result.add(new Group(members, hash, sharedShards));
    });
    return result;
}

private static class IndexSelector {
/**
* A group of indices that have the same mapping hash
*/
private static class Group {
private final List<String> indices;
private final String mappingHash;
private final Map<String, List<ShardRouting>> nodeToShards = new HashMap<>();
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
private final Map<ShardId, Exception> failures = new HashMap<>();
private final AtomicBoolean completed = new AtomicBoolean();

IndexSelector(GroupShardsIterator<ShardIterator> shardIts) {
Group(List<String> indices, String mappingHash, GroupShardsIterator<ShardIterator> shardIts) {
this.indices = indices;
this.mappingHash = mappingHash;
for (ShardIterator shardIt : shardIts) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
}
}
}

synchronized Exception getFailure() {
Exception first = null;
for (Exception e : failures.values()) {
first = ExceptionsHelper.useOrSuppress(first, e);
synchronized Map<String, Exception> getFailures() {
final Map<String, Exception> perIndex = new HashMap<>();
for (Map.Entry<ShardId, Exception> e : failures.entrySet()) {
perIndex.compute(e.getKey().getIndexName(), (unused, curr) -> ExceptionsHelper.useOrSuppress(curr, e.getValue()));
}
return first;
return perIndex;
}

synchronized void setFailure(ShardId shardId, Exception failure) {
Expand Down
Loading

0 comments on commit d0295d6

Please sign in to comment.