
[BUG] [Batch Mode] Allocation explain API is stuck in AWAITING_INFO even though deciders are returning NO #14903

Closed
rahulkarajgikar opened this issue Jul 23, 2024 · 0 comments · Fixed by #14972
Labels: bug, Cluster Manager

Describe the bug

  • The allocation explain API is stuck in AWAITING_INFO for some shards even when the deciders are returning NO.
  • In this case, allocation explain should return a NO decision, because we already know the deciders are returning NO. Instead, the API ends up calling AsyncShardFetch.asyncFetch(), which it should not do.
  • We will see that this also impacts manual reroute flows, leading to an unnecessary AsyncShardFetch.asyncFetch() call.

Both these issues happen because the ShardsBatchGatewayAllocator.InternalReplicaBatchShardAllocator.hasInitiatedFetching() function is incorrectly implemented and always returns true.

Why does hasInitiatedFetching always return true?

protected boolean hasInitiatedFetching(ShardRouting shard) {
    String batchId = getBatchId(shard, shard.primary());
    return batchId != null;
}

The hasInitiatedFetching() function checks whether a batchId exists for the shard.

However, this check always passes, because a batchId is assigned to the shard before this function is ever called. The standalone sketch below models the ordering problem.
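To make this concrete, here is a minimal standalone model (hypothetical names, not OpenSearch code) of why a batchId-existence check cannot detect whether fetching has started:

import java.util.HashMap;
import java.util.Map;

// Minimal standalone model of the ordering problem: batch ids are assigned to
// every unassigned shard before the hasInitiatedFetching-style check runs, so
// "batchId != null" cannot distinguish "fetch started" from "batch created".
public class BatchIdCheckDemo {
    static final Map<String, String> shardToBatchId = new HashMap<>();

    static void createAndUpdateBatches(String shardId) {
        // every unassigned shard gets a batch id, whether or not a fetch ran
        shardToBatchId.putIfAbsent(shardId, "batch-0");
    }

    static boolean hasInitiatedFetching(String shardId) {
        return shardToBatchId.get(shardId) != null; // the buggy check from above
    }

    public static void main(String[] args) {
        createAndUpdateBatches("[index][0]");                   // always runs first in the explain path
        System.out.println(hasInitiatedFetching("[index][0]")); // prints true, yet no fetch has started
    }
}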

Flow for allocation explain API

Here we will explain the flow, show why hasInitiatedFetching() always returns true, and show how that causes the allocation explain API to return AWAITING_INFO instead of NO.

Assume that we have a replica shard for which deciders are returning a NO decision [could be due to low disk, too many replicas], and allocation explain API is called for this shard.

Begin at this code path:

public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
    assert unassignedShard.unassigned();
    assert routingAllocation.debugDecision();
    if (getBatchId(unassignedShard, unassignedShard.primary()) == null) {
        createAndUpdateBatches(routingAllocation, unassignedShard.primary());
    }
    assert getBatchId(unassignedShard, unassignedShard.primary()) != null;
    if (unassignedShard.primary()) {
        assert primaryShardBatchAllocator != null;
        return primaryShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
    } else {
        assert replicaShardBatchAllocator != null;
        return replicaShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
    }
}

Here we see that createAndUpdateBatches() is called first:

protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boolean primary) {

In this function, we assign a batchId to every unassigned shard, creating a new batch if required.
At this point, every unassigned shard has a valid batch id.

Then we call makeAllocationDecision()

return replicaShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);

public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
    Supplier<Map<DiscoveryNode, StoreFilesMetadata>> fetchDataResultSupplier = () -> {
        return convertToNodeStoreFilesMetadataMap(
            unassignedShard,
            fetchData(List.of(unassignedShard), Collections.emptyList(), allocation)
        );
    };
    return getUnassignedShardAllocationDecision(unassignedShard, allocation, fetchDataResultSupplier);
}

This creates a fetchData supplier and passes it to getUnassignedShardAllocationDecision():

private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
    ShardRouting shardRouting,
    RoutingAllocation allocation,
    Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
) {
    if (!isResponsibleFor(shardRouting)) {
        return AllocateUnassignedDecision.NOT_TAKEN;
    }
    Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
    final boolean explain = allocation.debugDecision();
    Decision allocationDecision = result.v1();
    if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
        // only return early if we are not in explain mode, or we are in explain mode but we have not
        // yet attempted to fetch any shard data
        logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
        return AllocateUnassignedDecision.no(
            UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
            result.v2() != null ? new ArrayList<>(result.v2().values()) : null
        );
    }
    if (nodeStoreFileMetaDataMapSupplier != null) {
        Map<DiscoveryNode, StoreFilesMetadata> discoveryNodeStoreFilesMetadataMap = nodeStoreFileMetaDataMapSupplier.get();
        return getAllocationDecision(shardRouting, allocation, discoveryNodeStoreFilesMetadataMap, result, logger);
    }
    return null;
}

This condition checks hasInitiatedFetching(); in our scenario explain=true and the deciders are returning NO:

if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {

protected boolean hasInitiatedFetching(ShardRouting shard) {
    String batchId = getBatchId(shard, shard.primary());
    return batchId != null;
}

hasInitiatedFetching() checks whether the shard has a valid batch ID, which, as established above, is always true at this point.

So the early return is skipped, and we enter this block and call fetchData() via the supplier:

if (nodeStoreFileMetaDataMapSupplier != null) {
    Map<DiscoveryNode, StoreFilesMetadata> discoveryNodeStoreFilesMetadataMap = nodeStoreFileMetaDataMapSupplier.get();
    return getAllocationDecision(shardRouting, allocation, discoveryNodeStoreFilesMetadataMap, result, logger);
}

The supplier's fetchData() call leads into AsyncShardFetch.fetchData(), which triggers asyncFetch():

asyncFetch(discoNodesToFetch, fetchingRound);

void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
    logger.trace("{} fetching [{}] from {}", reroutingKey, type, nodes);
    action.list(shardAttributesMap, nodes, new ActionListener<BaseNodesResponse<T>>() {
        @Override
        public void onResponse(BaseNodesResponse<T> response) {
            processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
        }

        @Override
        public void onFailure(Exception e) {
            List<FailedNodeException> failures = new ArrayList<>(nodes.length);
            for (final DiscoveryNode node : nodes) {
                failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e));
            }
            processAsyncFetch(null, failures, fetchingRound);
        }
    });
}

AsyncShardFetch.fetchData() triggers an asyncFetch() and returns the fetch result immediately. At this point the fetch has only just started asynchronously, so we have no data from the nodes yet, and nodeShardStores ends up null. A simplified model of this conversion is sketched below.
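As a standalone model (assumed shape, not the actual OpenSearch code) of why the first explain call sees a null store map:

import java.util.HashMap;
import java.util.Map;

// While the async round is in flight, the fetch result has no data, and the
// converter (standing in for convertToNodeStoreFilesMetadataMap) maps that to
// null, which the caller turns into FETCHING_SHARD_DATA / AWAITING_INFO.
public class ConvertWhileFetchingDemo {
    // stands in for AsyncShardFetch.FetchResult
    record FetchResult(Map<String, String> data) {
        boolean hasData() {
            return data != null;
        }
    }

    static Map<String, String> convert(FetchResult result) {
        if (result == null || result.hasData() == false) {
            return null; // still fetching: caller returns FETCHING_SHARD_DATA
        }
        return new HashMap<>(result.data());
    }

    public static void main(String[] args) {
        FetchResult stillFetching = new FetchResult(null); // asyncFetch just started
        System.out.println(convert(stillFetching));        // prints null
    }
}

Back in the real flow, this null map is then passed on: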

return getAllocationDecision(shardRouting, allocation, discoveryNodeStoreFilesMetadataMap, result, logger);

protected AllocateUnassignedDecision getAllocationDecision(
    ShardRouting unassignedShard,
    RoutingAllocation allocation,
    Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores,
    Tuple<Decision, Map<String, NodeAllocationResult>> allocationDecision,
    Logger logger
) {
    if (nodeShardStores == null) {
        // node shard stores is null when we don't have data yet and still fetching the shard stores
        logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard);
        allocation.setHasPendingAsyncFetch();
        List<NodeAllocationResult> nodeDecisions = null;
        if (allocation.debugDecision()) {
            nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
        }
        return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
    }

Since nodeShardStores is null, we return FETCHING_SHARD_DATA as the final result, which maps to AWAITING_INFO in the explain output. This is why the allocation explain API returns AWAITING_INFO instead of NO the first time.

Eventually, when all the nodes respond and the async fetch finally completes (which could be seconds or minutes later), onResponse() of asyncFetch() is called, which calls processAsyncFetch().

Here we trigger a new reroute, this time with explain=false:

protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
    if (closed) {
        // we are closed, no need to process this async fetch at all
        logger.trace("{} ignoring fetched [{}] results, already closed", reroutingKey, type);
        return;
    }
    logger.trace("{} processing fetched [{}] results", reroutingKey, type);
    if (responses != null) {
        cache.processResponses(responses, fetchingRound);
    }
    if (failures != null) {
        cache.processFailures(failures, fetchingRound);
    }
    reroute(reroutingKey, "post_response");
}

This eventually calls allocateUnassignedBatch(), which decides which shards are eligible or ineligible and calls fetchData() accordingly.

public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
    logger.trace("Starting shard allocation execution for unassigned replica shards: {}", shardRoutings.size());
    List<ShardRouting> eligibleShards = new ArrayList<>();
    List<ShardRouting> ineligibleShards = new ArrayList<>();
    Map<ShardRouting, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
    for (ShardRouting shard : shardRoutings) {
        AllocateUnassignedDecision shardDecisionWithoutFetch = getUnassignedShardAllocationDecision(shard, allocation, null);
        // Without fetchData, decision for in-eligible shards is non-null from our preliminary checks and null for eligible shards.
        if (shardDecisionWithoutFetch != null) {
            ineligibleShards.add(shard);
            ineligibleShardAllocationDecisions.put(shard, shardDecisionWithoutFetch);
        } else {
            eligibleShards.add(shard);
        }
    }
    // only fetch data for eligible shards
    final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

To decide whether a shard is eligible, it calls getUnassignedShardAllocationDecision():

private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
    ShardRouting shardRouting,
    RoutingAllocation allocation,
    Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
) {
    if (!isResponsibleFor(shardRouting)) {
        return AllocateUnassignedDecision.NOT_TAKEN;
    }
    Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
    final boolean explain = allocation.debugDecision();
    Decision allocationDecision = result.v1();
    if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
        // only return early if we are not in explain mode, or we are in explain mode but we have not
        // yet attempted to fetch any shard data
        logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
        return AllocateUnassignedDecision.no(
            UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
            result.v2() != null ? new ArrayList<>(result.v2().values()) : null
        );
    }
    if (nodeStoreFileMetaDataMapSupplier != null) {
        Map<DiscoveryNode, StoreFilesMetadata> discoveryNodeStoreFilesMetadataMap = nodeStoreFileMetaDataMapSupplier.get();
        return getAllocationDecision(shardRouting, allocation, discoveryNodeStoreFilesMetadataMap, result, logger);
    }
    return null;
}

This time, the expression evaluates to true because explain=false, so the function returns a NO decision early and the shard is marked ineligible.
This is expected behaviour: when the deciders return NO, the shard should be ineligible, so this part is fine.

if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {

After this loop completes, we call InternalReplicaBatchShardAllocator.fetchData(), with this shard being marked as ineligible:

final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

@Override
@SuppressWarnings("unchecked")
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> fetchData(
    List<ShardRouting> eligibleShards,
    List<ShardRouting> inEligibleShards,
    RoutingAllocation allocation
) {
    return (AsyncShardFetch.FetchResult<
        TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>) fetchDataAndCleanIneligibleShards(
            eligibleShards,
            inEligibleShards,
            allocation
        );
}

AsyncShardFetch.FetchResult<? extends BaseNodeResponse> fetchDataAndCleanIneligibleShards(

Inside fetchDataAndCleanIneligibleShards(), we remove all the ineligible shards from the batch, which also wipes the shard's entry from the batch cache:

private void removeFromBatch(ShardRouting shard) {
    removeShard(shard.shardId());
    clearShardFromCache(shard.shardId());
    // assert that fetcher and shards are the same as batched shards
    assert batchInfo.size() == asyncBatch.shardAttributesMap.size() : "Shards size is not equal to fetcher size";
}

All information about this shard is now removed from the batch cache, so the work done by the previous fetchData also goes to waste.

So when we call allocation explain a second time, the cache is empty, the whole flow restarts from the beginning, and the API returns AWAITING_INFO forever. The standalone sketch below models this cycle.
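As a hypothetical, heavily simplified model of the cycle:

import java.util.HashMap;
import java.util.Map;

// Each explain call finds an empty cache and kicks off a fetch
// (AWAITING_INFO); the post-fetch reroute then marks the shard ineligible and
// wipes it from the cache, so the next explain call starts over and never
// actually observes the fetched data.
public class AwaitingInfoLoopDemo {
    static final Map<String, String> batchCache = new HashMap<>(); // shardId -> fetched store data

    static String explain(String shardId) {
        if (!batchCache.containsKey(shardId)) {
            batchCache.put(shardId, "store data"); // fetch kicked off; data arrives later
            return "AWAITING_INFO";
        }
        return "NO";
    }

    static void postFetchReroute(String shardId) {
        batchCache.remove(shardId); // ineligible shard wiped from batch cache
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(explain("[index][0]")); // AWAITING_INFO every time
            postFetchReroute("[index][0]");
        }
    }
}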

Flow for manual reroute

The flow for manual reroute is affected in a very similar way.

When we call the manual reroute API, the first reroute is made with explain=true; this is hardcoded in the manual reroute logic.
Assume again that the deciders are returning a NO decision for our replica shard, for any reason.

We will start with this entry point for the flow:

protected void innerAllocateUnassignedBatch(
    RoutingAllocation allocation,
    PrimaryShardBatchAllocator primaryBatchShardAllocator,
    ReplicaShardBatchAllocator replicaBatchShardAllocator,
    boolean primary
) {
    // create batches for unassigned shards
    Set<String> batchesToAssign = createAndUpdateBatches(allocation, primary);
    if (batchesToAssign.isEmpty()) {
        return;
    }
    if (primary) {
        batchIdToStartedShardBatch.values()
            .stream()
            .filter(batch -> batchesToAssign.contains(batch.batchId))
            .forEach(
                shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation)
            );
    } else {
        batchIdToStoreShardBatch.values()
            .stream()
            .filter(batch -> batchesToAssign.contains(batch.batchId))
            .forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation));
    }
}

We check for replica shard eligibility here:

for (ShardRouting shard : shardRoutings) {
    AllocateUnassignedDecision shardDecisionWithoutFetch = getUnassignedShardAllocationDecision(shard, allocation, null);
    // Without fetchData, decision for in-eligible shards is non-null from our preliminary checks and null for eligible shards.
    if (shardDecisionWithoutFetch != null) {
        ineligibleShards.add(shard);
        ineligibleShardAllocationDecisions.put(shard, shardDecisionWithoutFetch);
    } else {
        eligibleShards.add(shard);
    }
}

if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
    // only return early if we are not in explain mode, or we are in explain mode but we have not
    // yet attempted to fetch any shard data
    logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
    return AllocateUnassignedDecision.no(
        UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
        result.v2() != null ? new ArrayList<>(result.v2().values()) : null
    );
}

if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {

Now, because hasInitiatedFetching() returns true, the entire expression above evaluates to false, so getUnassignedShardAllocationDecision() returns null.

So we mark the shard as eligible.
**This is wrong. At this point, we already know the deciders are returning NO, so the shard should not have been marked eligible.**

Because we incorrectly mark the shard as eligible, we trigger fetchData() here, which eventually calls AsyncShardFetch.fetchData():

final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

This again triggers a second reroute, this time with explain = false [hardcoded].

Now in the second reroute, because explain = false, the shard is correctly marked as ineligible and is wiped from the cache.

So essentially we triggered the first fetchData and the second reroute for no reason: the first reroute should have returned a NO decision without ever calling fetchData, and the second reroute should never have happened.

FIX

We need to fix the hasInitiatedFetching() function to correctly check whether fetching has happened at least once.
The implementation in non-batch mode is correct, which is why we only see these issues in batch mode.

The only case where we SHOULD call AsyncShardFetch.fetchData() for decision = NO is when the data from all nodes is already available in the cache. That way we know the fetchData() call won't trigger a new AsyncShardFetch.asyncFetch(), because every node in the cache will already have an entry:

public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {

    cache.fillShardCacheWithDataNodes(nodes);
    List<String> nodeIds = cache.findNodesToFetch();
    if (nodeIds.isEmpty() == false) {
        // mark all node as fetching and go ahead and async fetch them
        // use a unique round id to detect stale responses in processAsyncFetch
        final long fetchingRound = round.incrementAndGet();
        cache.markAsFetching(nodeIds, fetchingRound);
        DiscoveryNode[] discoNodesToFetch = nodeIds.stream().map(nodes::get).toArray(DiscoveryNode[]::new);
        asyncFetch(discoNodesToFetch, fetchingRound);
    }

Essentially, in the allocation explain or reroute flow for shards with decision = NO, we should only call AsyncShardFetch.fetchData() when we know the cache already has the data, so we can guarantee a new asyncFetch() will not be triggered for this ineligible shard. A hedged sketch of such a check follows.
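As a minimal standalone sketch of what a corrected check could look like, with hypothetical helpers (the Batch record and cacheCoversAllNodes() are stand-ins; this is not necessarily the actual fix in #14972):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a corrected check: having a batch id is
// not enough; the batch's fetcher cache must actually cover the data nodes,
// i.e. at least one fetch round has really run for this shard's batch.
public class HasInitiatedFetchingDemo {
    record Batch(Map<String, Boolean> nodeCache) { // nodeId -> data fetched?
        boolean cacheCoversAllNodes(Set<String> dataNodes) {
            return nodeCache.keySet().containsAll(dataNodes);
        }
    }

    static final Map<String, Batch> shardToBatch = new HashMap<>();

    static boolean hasInitiatedFetching(String shardId, Set<String> dataNodes) {
        Batch batch = shardToBatch.get(shardId);
        return batch != null && batch.cacheCoversAllNodes(dataNodes);
    }

    public static void main(String[] args) {
        // batch exists, but its fetcher cache is still empty: no fetch has run
        shardToBatch.put("[index][0]", new Batch(new HashMap<>()));
        // prints false -> explain mode can return the NO decision early
        System.out.println(hasInitiatedFetching("[index][0]", Set.of("node-1")));
    }
}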

The only point of using the cached node data in this case is that we can also populate shard store info for each node along with the NO decision.

When we call getAllocationDecision() with this non-empty cache, the function augments each node's result with the number of matching bytes present on that node for the shard. This gets appended to the NO decision here:

protected AllocateUnassignedDecision getAllocationDecision(

MatchingNodes matchingNodes = findMatchingNodes(
    unassignedShard,
    allocation,
    false,
    primaryNode,
    primaryStore,
    nodeShardStores,
    explain
);
assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";
List<NodeAllocationResult> nodeDecisions = augmentExplanationsWithStoreInfo(allocationDecision.v2(), matchingNodes.nodeDecisions);
if (allocationDecision.v1().type() != Decision.Type.YES) {
    return AllocateUnassignedDecision.no(
        UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.v1().type()),
        nodeDecisions
    );

See augmentExplanationsWithStoreInfo() for more details:

private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
    Map<String, NodeAllocationResult> nodeDecisions,
    Map<String, NodeAllocationResult> withShardStores
) {
    if (nodeDecisions == null || withShardStores == null) {
        return null;
    }
    List<NodeAllocationResult> augmented = new ArrayList<>();
    for (Map.Entry<String, NodeAllocationResult> entry : nodeDecisions.entrySet()) {
        if (withShardStores.containsKey(entry.getKey())) {
            augmented.add(withShardStores.get(entry.getKey()));
        } else {
            augmented.add(entry.getValue());
        }
    }
    return augmented;
}
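To illustrate the augmentation with concrete values, here is a standalone model (String stands in for NodeAllocationResult; the values are made up):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone model of augmentExplanationsWithStoreInfo(): where a store-aware
// result exists for a node, it replaces the plain decider-only result.
public class AugmentDemo {
    static List<String> augment(Map<String, String> nodeDecisions, Map<String, String> withShardStores) {
        if (nodeDecisions == null || withShardStores == null) {
            return null;
        }
        List<String> augmented = new ArrayList<>();
        for (Map.Entry<String, String> entry : nodeDecisions.entrySet()) {
            augmented.add(withShardStores.getOrDefault(entry.getKey(), entry.getValue()));
        }
        return augmented;
    }

    public static void main(String[] args) {
        List<String> result = augment(
            Map.of("node-1", "NO (deciders only)", "node-2", "NO (deciders only)"),
            Map.of("node-1", "NO (matching bytes: 123456)")
        );
        System.out.println(result); // node-1's entry now carries store info
    }
}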

Related component

Cluster Manager

To Reproduce

  1. Create a cluster with 1 cluster manager node and 5 data nodes, with batch mode enabled
  2. Create an index with 1 primary and 4 replicas
  3. Stop one data node; the cluster will go yellow
  4. Call the allocation explain API for the unassigned replica shard
  5. The allocation explain API will return AWAITING_INFO

Expected behavior

Allocation explain API should return NO and not get stuck in AWAITING_INFO

