[Batch Fetch] Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs #14972

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Fix `hasInitiatedFetching()` so that the allocation explain and manual reroute APIs work correctly in batch mode ([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))

### Dependencies
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
@@ -57,6 +57,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
@@ -797,11 +798,26 @@ public void testBatchModeEnabledWithoutTimeout() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
// Replica shard would be marked ineligible since there are no data nodes.
// It would then be removed from any batch, and the empty batch would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start one data node
logger.info("--> restarting the first stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
ensureStableCluster(2);
ensureYellow("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());

// calling reroute and asserting on reroute response
logger.info("--> calling reroute while cluster is yellow");
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

// Now start the last data node and ensure batch mode is working and the cluster goes green
logger.info("--> restarting the second stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
@@ -842,11 +858,26 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
// Replica shard would be marked ineligible since there are no data nodes.
// It would then be removed from any batch, and the empty batch would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start one data node and ensure batch mode is working
logger.info("--> restarting the first stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
ensureStableCluster(2);
ensureYellow("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());

// calling reroute and asserting on reroute response
logger.info("--> calling reroute while cluster is yellow");
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

// Now start the last data node and ensure batch mode is working and the cluster goes green
logger.info("--> restarting the second stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
@@ -907,7 +938,9 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {

assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
// All replica shards would be marked ineligible since there are no data nodes.
// They would then be removed from any batch, and the empty batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
@@ -1051,6 +1084,18 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
ensureGreen("test");
}

public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception {
// Non batch mode - This test validates that we don't return AWAITING_INFO in the allocation explain API when the deciders
// return NO
this.allocationExplainReturnsNoWhenExtraReplicaShard(false);
}

public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception {
// Batch mode - This test validates that we don't return AWAITING_INFO in the allocation explain API when the deciders
// return NO
this.allocationExplainReturnsNoWhenExtraReplicaShard(true);
}

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
@@ -1104,7 +1149,9 @@ public void testNBatchesCreationAndAssignment() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
// All replica shards would be marked ineligible since there are no data nodes.
// They would then be removed from any batch, and the empty batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
@@ -1193,7 +1240,9 @@ public void testCulpritShardInBatch() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
// Replica shard would be marked ineligible since there are no data nodes.
// It would then be removed from any batch, and the empty batch would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertTrue(clusterRerouteResponse.isAcknowledged());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
@@ -1511,4 +1560,97 @@ private List<String> findNodesWithShard(final boolean primary) {
Collections.shuffle(requiredStartedShards, random());
return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
}

private void allocationExplainReturnsNoWhenExtraReplicaShard(boolean batchModeEnabled) throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), batchModeEnabled).build()
);
internalCluster().startDataOnlyNodes(5);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build()
);
ensureGreen("test");
ensureStableCluster(6);

// Stop one of the nodes to make the cluster yellow
// We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to
// INDEX_CREATED
List<String> nodesWithReplicaShards = findNodesWithShard(false);
Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));

ensureStableCluster(5);
ensureYellow("test");

logger.info("--> calling allocation explain API");
// shard should have decision NO because there is no valid node for the extra replica to go to
AllocateUnassignedDecision aud = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision();

assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());

// Now creating a new index with too many replicas and trying again
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build()
);

ensureYellowAndNoInitializingShards("test2");

logger.info("--> calling allocation explain API again");
// shard should have decision NO because there are 6 shard copies (1 primary + 5 replicas) and only 4 data nodes
aud = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision();

assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());

logger.info("--> restarting the stopped node");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build()
);

ensureStableCluster(6);
ensureGreen("test");

logger.info("--> calling allocation explain API 3rd time");
// shard should still have decision NO because there are 6 shard copies and only 5 data nodes
aud = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision();

assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());

internalCluster().startDataOnlyNodes(1);

ensureStableCluster(7);
ensureGreen("test2");
}
}
server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java
@@ -80,6 +80,14 @@
this.cache.deleteShard(shardId);
}

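/**
 * Returns true if the per-node cache map has never been populated, i.e. no async
 * fetch has been initiated for this batch yet (see the usage in
 * ShardsBatchGatewayAllocator#hasInitiatedFetching below).
 */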
public boolean hasEmptyCache() {
return this.cache.getCache().isEmpty();
}

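/**
 * Exposes the underlying per-node cache so callers can run read-only checks such
 * as {@link AsyncShardFetchCache#findNodesToFetch()}.
 */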
public AsyncShardFetchCache<T> getCache() {
return this.cache;
}

/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
@@ -183,7 +183,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
logger.trace("{}: ignoring allocation, can't be allocated on any node. Decision: {}", shardRouting, allocationDecision.type());
return AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java
@@ -576,8 +576,37 @@

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
/**
* This function checks whether an asyncFetch has ever happened for this shard's batch, or is currently ongoing.
* It should return false if there has never been a fetch for this batch.
* This function is currently only used in the case of replica shards when all deciders returned NO/THROTTLE, and explain mode is ON.
* Allocation explain and manual reroute APIs try to append shard store information (matching bytes) to the allocation decision.
* However, these APIs do not want to trigger a new asyncFetch for these ineligible shards, unless the data from nodes is already there.
* This function is used to see if a fetch has happened to decide if it is possible to append shard store info without a new async fetch.
* In the case when shard has a batch but no fetch has happened before, it would be because it is a new batch.
* In the case when shard has a batch, and a fetch has happened before, and no fetch is ongoing, it would be because we have already completed fetch for all nodes.
*
* In order to check if a fetch has ever happened, we check the following:
* 1. If the shard batch cache is empty, we know that a fetch has never happened, so we return false.
* 2. If we see that the list of nodes to fetch from is empty, we know that all nodes either already have data or have an ongoing fetch, so we return true.
* 3. Otherwise we return false.
*
* see {@link AsyncShardFetchCache#findNodesToFetch()}
*/
String batchId = getBatchId(shard, shard.primary());
if (batchId == null) {
return false;
}
logger.trace("Checking if fetching done for batch id {}", batchId);

Check warning on line 600 in server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java#L600

Added line #L600 was not covered by tests
ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId);
// if fetchData has never been called, the per node cache will be empty and have no nodes
// this is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData
if (shardsBatch == null || shardsBatch.getAsyncFetcher().hasEmptyCache()) {
logger.trace("Batch cache is empty for batch {} ", batchId);
return false;
}
// this check below is to make sure we already have all the data and that we wouldn't create a new async fetchData call
return shardsBatch.getAsyncFetcher().getCache().findNodesToFetch().isEmpty();
}
}
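For reference, here is a condensed, self-contained sketch of how the early-return condition in getUnassignedShardAllocationDecision (shown earlier) combines with the fixed hasInitiatedFetching(). ExplainFlowSketch and its members are illustrative stand-ins, not the actual OpenSearch classes.

// Mirrors: allocationDecision.type() != YES && (!explain || !hasInitiatedFetching(shard))
class ExplainFlowSketch {
    enum Decision { YES, NO, THROTTLE }

    static boolean returnsEarlyWithNo(Decision decision, boolean explain, boolean hasInitiatedFetching) {
        return decision != Decision.YES && (!explain || !hasInitiatedFetching);
    }

    public static void main(String[] args) {
        // Explain mode on, deciders say NO, batch cache empty (fetch never ran):
        // hasInitiatedFetching() now returns false, so we return NO immediately
        // instead of triggering a new async fetch and reporting AWAITING_INFO.
        System.out.println(returnsEarlyWithNo(Decision.NO, true, false)); // true
        // Fetch already completed for all nodes: do not return early, so the
        // explanation can append shard store info from the cached fetch data.
        System.out.println(returnsEarlyWithNo(Decision.NO, true, true));  // false
    }
}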
