
[BUG] Async shard fetches taking up GBs of memory causing ClusterManager JVM to spike with large no. of shards (> 50k) #5098

Open
shwetathareja opened this issue Nov 7, 2022 · 24 comments
@shwetathareja (Member)
Describe the bug
GatewayAllocator performs an async fetch for each unassigned shard to check whether any node has a copy of the shard on its local disk. These async fetches are done per shard. When nodes drop out of the cluster, shards become unassigned, and GatewayAllocator performs async fetches when the nodes try to join back. In a cluster with tens of thousands of shards, this results in too many async fetches and causes the cluster manager JVM to spike.

In one cluster with more than 50K shards, async fetches were taking 13 GB of memory.


Expected behavior
These async fetches should be batched per node instead of being executed per shard per node.

OpenSearch Version : 1.1

Additional context
Also, these async fetches were discussed earlier in the context of consuming a lot of CPU while sending the requests during reroute: elastic/elasticsearch#57498. It should also be evaluated whether these requests should be sent from a different threadpool instead of blocking masterService#updateTask, which is single-threaded and needed for processing cluster state updates quickly.

@Gaurav614 (Contributor)

I reproduced the issue and found that approximately 82-83% of the heap was being used by GatewayAllocator.

Repro steps:

  1. Created a cluster of 50 data nodes with around 130k shards.
  2. Restarted all 50 nodes in a rolling fashion.
  3. Ran a cron job that took heap dumps on the active master during that time.

Analysis showed that DiscoveryNode is part of the AsyncShardFetch response and consumes a considerable amount of memory.


@Gaurav614 (Contributor)

Another improvement area is the cache strategy used in AsyncShardFetch. The cache occupies a significant chunk of the master's memory (about 60%) during large cluster restarts.

The cache currently stores two types of data: NodeGatewayStartedShards and NodeStoreFilesMetadata.
We should also evaluate whether these stored objects can be slimmed down (the DiscoveryNode field discussed above is one definite candidate).


Step of repro were same as here: #5098 (comment)

@shwetathareja (Member, author)

shwetathareja commented Apr 19, 2023

Analysis showed that DiscoveryNode is part of the AsyncShardFetch response and consumes a considerable amount of memory.

Today, responses from all nodes are cached for each unassigned shard so that GatewayAllocator can build the complete picture. Empty responses are cached as well; these can be replaced by a shared dummy object. That would reduce the DiscoveryNode overhead significantly, from n (nodes) × m (unassigned shards) objects to roughly m.

return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null);
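The shared-dummy idea can be sketched as follows. This is a hypothetical, simplified illustration: `ShardFetchCache` and `ShardResponse` are stand-ins for the real per-shard cache and `NodeGatewayStartedShards`, not actual OpenSearch classes.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: replace every "empty" response (node has no copy of the shard)
// with one shared sentinel instead of caching a fresh response object,
// each carrying its own DiscoveryNode copy, per node per unassigned shard.
class ShardFetchCache {
    // Stand-in for NodeGatewayStartedShards.
    static final class ShardResponse {
        final String allocationId;   // null => node has no copy of the shard
        ShardResponse(String allocationId) { this.allocationId = allocationId; }
    }

    // One shared immutable instance reused for all empty responses.
    static final ShardResponse EMPTY = new ShardResponse(null);

    private final Map<String, ShardResponse> byNode = new HashMap<>();

    void put(String nodeId, ShardResponse response) {
        // Swap empty responses for the sentinel so the cache holds at most
        // one "empty" object regardless of cluster size.
        byNode.put(nodeId, response.allocationId == null ? EMPTY : response);
    }

    ShardResponse get(String nodeId) { return byNode.get(nodeId); }
}
```

With n nodes and m unassigned shards, the per-shard caches then reference one sentinel rather than n × m distinct empty responses.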

@amkhar (Contributor)

amkhar commented Jun 19, 2023

Overall Proposal

Context

During node-left and node-join we perform a reroute to assign unassigned shards. For this assignment the Cluster Manager fetches shard metadata asynchronously, on a per-shard basis. This metadata describes the shard and its allocation, and is needed to make a new allocation decision. reroute is the central method of AllocationService that drives this flow for the whole cluster.

public ClusterState reroute(ClusterState clusterState, String reason)

Once we receive the data from all nodes relevant for the shardId, we build the allocation decision and perform the reroute. [The code flow is explained in detail in the following sections.]

Problem

With 50K/100K shards, during cluster restart scenarios we end up making a transport call for each and every shard. All transport threads do the same work (the same code flow for different shardIds), which chokes the transport layer and makes the cluster unstable; with back-to-back calls, an OOM kill is one possible outcome.
The JVM spike is caused by the many DiscoveryNode objects that arrive in the responses of the transport calls made to the data nodes. Because DiscoveryNode lives in the base class of the transport response, it is present in every per-shard response.

Acronyms:
GA - GatewayAllocator
PSA - PrimaryShardAllocator
RSA - ReplicaShardAllocator
IPSA - InternalPrimaryShardAllocator

Current flow

Let’s understand the code flow a bit better.

Node left scenario

Coordinator.java has a method removeNode which is the handler for any node failure. It submits a node-left task with executor type NodeRemovalClusterStateTaskExecutor:

clusterManagerService.submitStateUpdateTask(
"node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),

The execute method of this executor triggers AllocationService:

return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));

The method above unassigns shards that are associated with a node that is no longer part of the cluster. The executor always passes the reroute flag as true.

disassociateDeadNodes first fails all shards present on the dead nodes, then triggers the reroute method of AllocationService:

if (reroute) {
return reroute(clusterState, reason);

reroute calls an internal private reroute method which triggers allocation of the existing unassigned shards:

allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first

private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
    allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        existingShardsAllocator.beforeAllocation(allocation);
    }
    final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
    while (primaryIterator.hasNext()) {
        final ShardRouting shardRouting = primaryIterator.next();
        if (shardRouting.primary()) {
            getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
        }
    }
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
    }
    final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
    while (replicaIterator.hasNext()) {
        final ShardRouting shardRouting = replicaIterator.next();
        if (shardRouting.primary() == false) {
            getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator);
        }
    }
}

Now, for each primary and replica shard, the allocateUnassigned method of the GatewayAllocator class is called, which triggers:

innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);

innerAllocate calls the allocateUnassigned method of BaseGatewayShardAllocator, which tries to make an allocation decision:

final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);

makeAllocationDecision is implemented separately for primary and replica shards in PSA and RSA respectively. PSA calls the fetchData method of IPSA (defined in the GA file) to fetch this shard's metadata from the data nodes via a transport call:

final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);

GA maintains a map keyed by shardId whose value is a fetcher object; this fetcher object actually makes the transport call and fetches the data.

AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch = asyncFetchStarted.computeIfAbsent(
shard.shardId(),
shardId -> new InternalAsyncFetch<>(
logger,
"shard_started",
shardId,
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
startedAction
)
);
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(
allocation.nodes(),
allocation.getIgnoreNodes(shard.shardId())
);

The fetchData method is defined in the AsyncShardFetch class. Since the fetch is asynchronous, a reroute is triggered from its async response handler. Once the data is returned, the reroute runs the flow again but does not call the data nodes again, because the required data is already in the cache. Within fetchData, the following call checks the cache for which nodes still need fetching:

List<NodeEntry<T>> nodesToFetch = findNodesToFetch(cache);

Based on this result (AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState),
we determine which nodes the shard can be allocated to and mark the allocation decision as YES.
BaseGatewayShardAllocator's allocateUnassigned method then initializes the shard:

if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
    unassignedAllocationHandler.initialize(
        allocateUnassignedDecision.getTargetNode().getId(),
        allocateUnassignedDecision.getAllocationId(),
        getExpectedShardSize(shardRouting, allocation),
        allocation.changes()
    );
} else {
    unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());

Node join scenario

JoinTaskExecutor contains the task executed on node-join; it triggers a reroute as part of its flow when the cluster manager changes or a new node joins. When the cluster state is updated with existing nodes, applying the cluster state causes IndicesClusterStateService to trigger a shard action: shard/started or shard/failed. Both the started and failed executors (ShardStateAction class) have a handler that runs once the cluster state is published from the cluster manager, and it triggers a reroute:

rerouteService.reroute(
    "post-join reroute",
    Priority.HIGH,
    ActionListener.wrap(r -> logger.trace("post-join reroute completed"), e -> logger.debug("post-join reroute failed", e))

Shard Started

public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    rerouteService.reroute(
        "reroute after starting shards",
        prioritySupplier.get(),
        ActionListener.wrap(
            r -> logger.trace("reroute after starting shards succeeded"),
            e -> logger.debug("reroute after starting shards failed", e)
        )

Shard Failed

public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
    int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size();
    if (numberOfUnassignedShards > 0) {
        // The reroute called after failing some shards will not assign any shard back to the node on which it failed. If there were
        // no other options for a failed shard then it is left unassigned. However, absent other options it's better to try and
        // assign it again, even if that means putting it back on the node on which it previously failed:
        final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
        logger.trace("{}, scheduling a reroute", reason);
        rerouteService.reroute(
            reason,
            prioritySupplier.get(),
            ActionListener.wrap(
                r -> logger.trace("{}, reroute completed", reason),
                e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)

So we know ultimately it’ll trigger the whole flow of assigning unassigned shards.

Solution

The key bottlenecks found by profiling are the sheer number of fetchData transport calls (too much serialization/deserialization) and the in-memory load of the many DiscoveryNode objects that live from the time a response is received until it is garbage collected. (More heap dump analysis is needed to gather further data points on the memory side.)

Phase 1: Batch calls for multiple shards

The first solution is to batch the calls for multiple shards, which today are made one per shard. The request type will change to carry multiple of the data structures we send today for a single shard, and the response structure will change accordingly. Batching can be done at the node level or with a fixed batch count, depending on performance and failure scenarios.
A draft PR has already been opened (#7269) with rough code for making the call at the node level. Since 1K shards can be present on a node today, we can discuss whether that number is acceptable or whether a configurable batch size is needed for more resiliency. Based on that discussion the solution will change slightly, as I am currently only proposing per-node-level calls.
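The shape of the batching can be sketched as below. This is a hypothetical illustration, not the draft PR's code: `NodeBatchRequest` and the helper are illustrative names, but the request-count arithmetic is exactly the point of the proposal.

```java
import java.util.Map;

// Sketch of the batched shapes: one request per node carries many shards,
// instead of one request per shard per node.
class BatchShapes {
    // Illustrative stand-in for a per-node batched request.
    static final class NodeBatchRequest {
        final String nodeId;
        final Map<String, String> shardIdToCustomDataPath;   // many shards, one request
        NodeBatchRequest(String nodeId, Map<String, String> shards) {
            this.nodeId = nodeId;
            this.shardIdToCustomDataPath = shards;
        }
    }

    // Per-shard fetch issues (nodes * shards) requests; per-node batching
    // issues one request per node regardless of the shard count.
    static int requestCount(int nodes, int shards, boolean batched) {
        return batched ? nodes : nodes * shards;
    }
}
```

For the cluster from the repro above (50 nodes, 130k unassigned shards after a full restart), per-shard fetching means 6.5 million requests, while per-node batching means 50.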

Phase 2 : Reduce Discovery Node objects

We should reduce the number of DiscoveryNode objects, which are created via new when we receive a response:

public NodeGatewayStartedShards(StreamInput in) throws IOException {
    super(in);
}

Code inside super

protected BaseNodeResponse(StreamInput in) throws IOException {
    super(in);
    node = new DiscoveryNode(in);   // a fresh DiscoveryNode per response
}

Part 1:

We reduce the number of these objects to the number of calls we make. For example, if we make batches of 100 and send 1000 calls, only 1000 DiscoveryNode objects will be present, one per call per destination node. This reduces DiscoveryNode objects and also saves serialization/deserialization work. We also need a version check (> 3.0) on the destination node to maintain BWC, since older-version nodes won't have the new handler.

Changes required

We need to change the flow starting from AllocationService to all the way down to the transport call (fetchData).

Surrounding changes :

  • Introduce a new method like allocateUnassignedBatch or allocateUnassignedPerNode in the ExistingShardsAllocator interface; today it only has allocateUnassigned, which is tightly coupled to per-shard allocation.
  • Use the new method in allocateExistingUnassignedShards, gated on version to avoid BWC issues.
  • Construct a map like <Node, List>, iterate over it, and call allocateUnassignedPerNode with one entry at a time.
  • Implement allocateUnassignedPerNode in GatewayAllocator so that it calls the allocateUnassignedPerNode of BaseGatewayShardAllocator.
  • Add a new method analogous to makeAllocationDecision, which is oriented toward allocations on specific nodes. Since the actual implementation lives in PSA/RSA, they need a new implementation returning a List. [To discuss later: would it make sense to abstract makeAllocationDecision all the way up to ExistingShardsAllocator?]
  • BaseGatewayShardAllocator then uses this list of decisions to perform the actual allocations.
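The per-node grouping step above can be sketched as follows. This is a hypothetical simplification: every node is probed for every unassigned shard (the leader doesn't know which node holds a copy), so the per-node work list is the full unassigned set, optionally split into fixed-size batches; `PerNodeBatches` is an illustrative name, not a real class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: build the <node, list-of-shard-batches> map that the batched
// allocation loop would iterate over, one batched call per entry.
class PerNodeBatches {
    static Map<String, List<List<String>>> build(List<String> nodes, List<String> shards, int batchSize) {
        // Split the unassigned shards into fixed-size batches (the
        // configurable batch size discussed in this thread).
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < shards.size(); i += batchSize) {
            batches.add(new ArrayList<>(shards.subList(i, Math.min(i + batchSize, shards.size()))));
        }
        // Every node gets the same batches, since any node may hold a copy.
        Map<String, List<List<String>>> perNode = new HashMap<>();
        for (String node : nodes) {
            perNode.put(node, batches);
        }
        return perNode;
    }
}
```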

Core changes required for new transport call :

AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores = fetchData(unassignedShard, allocation);

  • fetchData needs to accept a list of shards rather than a single shard, so a new method with a new implementation is required in both PSA and RSA.
  • Since the GA fetcher objects need to work on a per-node basis, the core per-shard fetcher map

private final ConcurrentMap<
        ShardId,
        AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
            .newConcurrentMap();

will change to a different type like

private final ConcurrentMap<
        NodeId,
        AsyncShardBatchFetch<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>> nodeLevelShardFetchers = ConcurrentCollections
            .newConcurrentMap();
  • The current request contains a shardId and customDataPath; the new request will carry a List of <ShardId, CustomDataPath> (or a map of the same, if a .get method is needed in the implementation).
  • Similarly, the current response contains an allocationId in NodeGatewayStartedShards; the new response will carry a mapping of <shardId, NodeGatewayStartedShards>.
  • Crucially, the overall response from one node carries a single DiscoveryNode object in the base class BaseNodeResponse, so for many shards (or all shards on a node) the number of DiscoveryNode objects in responses drops automatically.
  • The transport handler implementation on the node will be similar to the current nodeOperation code, but it will operate on a number of shards rather than just one.
  • The cache object, which tracks whether each node's fetch is in progress, done, or failed, will be flipped to track the same information per shardId (in our node-level fetcher class's cache).

Changes for failure handling or correctness

  • As more nodes go down we will keep getting new sets of unassigned shards. We therefore need to maintain an atomic set of shards for which a fetch is already in flight; the async success callback clears entries from the set. This ensures we can cleanly send a new request for a different set of shards when more nodes fail. Before sending a call, we must compute the diff using this atomic/synchronized set.
  • If the result failed for a particular shard (since the request targets a single node, partial failures are unlikely, but still possible), we will either restart the fetch for retryable exceptions or permanently fail it, as is done today:

logger.warn(
    () -> new ParameterizedMessage(
        "{}: failed to list shard for {} on node [{}]",
        shardId,
        type,
        failure.nodeId()
    ),
    failure
);
nodeEntry.doneFetching(failure.getCause());
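The in-flight bookkeeping described in the first bullet could look like this sketch. It is a hypothetical simplification (`InFlightShards` is an illustrative name; synchronization is shown with `synchronized` rather than whatever concurrent structure the real code would use):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: a synchronized set of shardIds with a fetch in progress. A new
// fetch round claims only the diff; a completed (or failed) fetch releases
// its shards so they can be re-fetched later.
class InFlightShards {
    private final Set<String> inFlight = new HashSet<>();

    // Returns the shards from `unassigned` that are not already being
    // fetched, and marks them as in flight.
    synchronized List<String> claim(List<String> unassigned) {
        List<String> toFetch = unassigned.stream()
            .filter(s -> !inFlight.contains(s))
            .collect(Collectors.toList());
        inFlight.addAll(toFetch);
        return toFetch;
    }

    // Called from the async callback; success and failure both clear the
    // entries so a subsequent round may retry.
    synchronized void release(List<String> fetched) {
        inFlight.removeAll(fetched);
    }
}
```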

Part 2:

We limit DiscoveryNode objects to exactly the number of nodes. Since only a bounded set of nodes exists (or joins/leaves the cluster), we can keep a common cache of this information and refer to it (instead of calling new) whenever needed, keyed by ephemeralId or another relevant id. This saves memory. Since the first solution already reduces the number of DiscoveryNode objects, we can debate whether this is truly required once we see the gains of part 1 versus part 2.
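The node-object cache could be a simple interner keyed by ephemeralId, as in this hypothetical sketch (`NodeInterner` and its inner `Node` type are stand-ins; the real DiscoveryNode carries much more state):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of Part 2: intern node objects by ephemeralId so every cached
// response refers to one shared instance per live node instead of keeping
// a freshly deserialized copy per response.
class NodeInterner {
    // Minimal stand-in for DiscoveryNode.
    static final class Node {
        final String ephemeralId;
        Node(String ephemeralId) { this.ephemeralId = ephemeralId; }
    }

    private final Map<String, Node> byEphemeralId = new ConcurrentHashMap<>();

    // Return the canonical instance for this ephemeralId, registering the
    // fresh copy only if none exists yet (atomic via computeIfAbsent).
    Node intern(Node fresh) {
        return byEphemeralId.computeIfAbsent(fresh.ephemeralId, id -> fresh);
    }

    // Evict on node-left so a restarted node (new ephemeralId) is re-interned.
    void onNodeLeft(String ephemeralId) {
        byEphemeralId.remove(ephemeralId);
    }
}
```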

Impact

Benchmarking was run to confirm that we scale well with the batching approach.

The cluster can scale up to 100-120K shards without any OOM kill, whereas the current code hits an OOM kill at 45K shards. [More benchmarking details will be attached below.]

Note: The current POC code also contains changes that drop the reliance on the BaseNodeResponse class, so the number of DiscoveryNode objects is reduced to the number of calls, which right now is one per node; part 2 of the Phase 2 solution is therefore automatically covered. I'll write the final code based on the discussion of these approaches.

Pending Items:

Note 2: Integration tests are failing and need investigation.

AI: Also need to explore whether the GA async fetch call should move to the generic threadpool instead of using the clusterManager updateTask thread.

@amkhar (Contributor)

amkhar commented Jun 19, 2023

@sachinpkale @Bukhtawar @andrross @shwetathareja looking for feedback on proposal.

@amkhar (Contributor)

amkhar commented Jun 26, 2023

@sachinpkale @Bukhtawar @andrross @shwetathareja gentle reminder.

@amkhar (Contributor)

amkhar commented Jun 27, 2023

@reta - would love some feedback.

@shwetathareja (Member, author)

Thanks @amkhar for explaining the problem in detail.

A couple of points on the Phase 1 batching solution (mostly around implementation):

  1. The batch size for the number of unassigned shards queried per node can be configurable.
  2. Say that during the first round of reroute 10 shards were unassigned and a request was sent to all nodes for those 10 shards, and then one more shard became unassigned. Would it send a new request with just that one shard, send a request with all 11 shards, or wait for the first request to return before sending a new one? I'd like to understand how you handle this with the atomic set called out above. Is this set maintained across fetch rounds?
  3. While fetching a particular shard's information on a node, there should be a timeout so that if information for one shard cannot be collected, it doesn't keep the other shards waiting.
  4. What is the behavior during mixed-mode upgrades: would it send batch requests to new nodes and per-shard requests to old nodes?
  5. Are there enough integration tests for the current flow? If not, please add new tests upfront so we can establish correctness via those tests after the change.
  6. Say the leader sent request r1 to node n1, assumed it timed out, and sent another request r2; but the response for r1 arrived after r2. In this case the old fetch round's responses should be discarded. Now consider the case from point 2, where two fetch requests with different shardId lists were sent to the same node, and both responses should be processed. How are these two cases differentiated and handled?

Regarding Phase 2, reducing the number of DiscoveryNode objects:
These async fetches are done per unassigned shard per node because the leader doesn't know which node has the shard. The major overhead comes from caching the response as-is: even when a shard is not present on a node (the majority of cases), the response is stored as-is, complete with its DiscoveryNode object. Since you are making calls per node and the response structure is already changing, do you still need explicit changes for DiscoveryNode, or is that addressed automatically?

Be careful when changing the ExistingShardsAllocator interface: it is exposed via ClusterPlugin, so it must not be a breaking change.

default Map<String, ExistingShardsAllocator> getExistingShardsAllocators() {

This also means you will need to keep the per-shard fetching path marked as deprecated in 2.x; it can only be removed in 3.0.

If the result failed for a particular shard (since the request targets a single node, partial failures are unlikely, but still possible), we will either restart the fetch for retryable exceptions or permanently fail it, as is done today.

Will it refetch only the failed shards or all the shards in that batch? What is the current retry mechanism?

The cache object, which tracks whether each node's fetch is in progress, done, or failed, will be flipped to track the same information per shardId (in our node-level fetcher class's cache).

Today the cache can produce a per-shard view easily. Now it would have to iterate over all entries in the top-level map keyed by NodeId to check a shard's status, right?

Add a new method analogous to makeAllocationDecision, which is oriented toward allocations on specific nodes. Since the actual implementation lives in PSA/RSA, they need a new implementation returning a List. [To discuss later: would it make sense to abstract makeAllocationDecision all the way up to ExistingShardsAllocator?]

How would this impact the _cluster/allocation/explain API, which can be triggered per shard?

@Bukhtawar (Collaborator)

Thanks Aman, per-node calls make sense. I am assuming this is done for assigning primary shards first and then replica shards subsequently. It will be easier to review the code once you have the draft ready.

@reta (Collaborator)

reta commented Jun 27, 2023

@reta - would love some feedback.

Thanks @amkhar, the details go way beyond my knowledge, but batching per node looks very logical.

@amkhar (Contributor)

amkhar commented Jun 29, 2023

@shwetathareja

The batch size for no. of unassigned shards queried per node can be configurable.

Yes.

Lets say during first round of reroute, 10 shards were unassigned and a request was sent to all nodes for those 10 unassigned shards, now 1 more shard got unassigned,

We have this cache object today; it will be used to know which shards are already being fetched for a particular node:
private final Map<String, NodeEntry<T>> cache = new HashMap<>();
will be changed to
private final Map<ShardId, ShardEntry<T>> cache = new HashMap<>();
Similarly, the current findNodesToFetch method will change to findShardsToFetch. That way we can identify from the cache which shards should be fetched and skip the ones already in flight.
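The flipped cache could behave like this sketch (hypothetical, simplified: `ShardLevelCache` and `ShardEntry` are illustrative stand-ins for the real cache types, and the fetched state is an opaque `Object` here):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: per-shard cache entries with a findShardsToFetch that mirrors
// today's findNodesToFetch, skipping shards already in flight or already
// fetched.
class ShardLevelCache {
    static final class ShardEntry {
        boolean fetching;   // a request for this shard is in flight
        Object value;       // fetched shard state, once available
    }

    private final Map<String, ShardEntry> cache = new HashMap<>();

    // Returns the shards that still need a fetch and marks them in flight.
    List<String> findShardsToFetch(List<String> shardIds) {
        List<String> toFetch = new ArrayList<>();
        for (String id : shardIds) {
            ShardEntry e = cache.computeIfAbsent(id, k -> new ShardEntry());
            if (!e.fetching && e.value == null) {
                e.fetching = true;
                toFetch.add(id);
            }
        }
        return toFetch;
    }

    // Called from the async response handler for each shard in the batch.
    void markDone(String shardId, Object value) {
        ShardEntry e = cache.get(shardId);
        e.fetching = false;
        e.value = value;
    }
}
```

A timed-out or failed shard would be reset (fetching = false, value still null) so the next round's findShardsToFetch picks it up again.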

would it send a new request with that one shard

Yes

or send with 11 shards or wait for first request to return before sending a new request.

No and No

While fetching a particular shard's information on a node, there should be a timeout so that if information for one shard cannot be collected, it doesn't keep the other shards waiting.

We have this piece of per-shard code which executes on the data node:

final ShardId shardId = request.getShardId();
logger.trace("{} loading local shard state info", shardId);
ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(
    logger,
    namedXContentRegistry,
    nodeEnv.availableShardPaths(request.shardId)
);
if (shardStateMetadata != null) {
    if (indicesService.getShardOrNull(shardId) == null
        && shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) {
        final String customDataPath;
        if (request.getCustomDataPath() != null) {
            customDataPath = request.getCustomDataPath();
        } else {
            // TODO: Fallback for BWC with older OpenSearch versions.
            // Remove once request.getCustomDataPath() always returns non-null
            final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());
            if (metadata != null) {
                customDataPath = new IndexSettings(metadata, settings).customDataPath();
            } else {
                logger.trace("{} node doesn't have meta data for the requests index", shardId);
                throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex());
            }
        }
        // we don't have an open shard on the store, validate the files on disk are openable
        ShardPath shardPath = null;
        try {
            shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
            if (shardPath == null) {
                throw new IllegalStateException(shardId + " no shard path found");
            }
            Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
        } catch (Exception exception) {
            final ShardPath finalShardPath = shardPath;
            logger.trace(
                () -> new ParameterizedMessage(
                    "{} can't open index for shard [{}] in path [{}]",
                    shardId,
                    shardStateMetadata,
                    (finalShardPath != null) ? finalShardPath.resolveIndex() : ""
                ),
                exception
            );
            String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
            return new NodeGatewayStartedShards(
                clusterService.localNode(),
                allocationId,
                shardStateMetadata.primary,
                null,
                exception
            );
        }
    }
    logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
    String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
    final IndexShard shard = indicesService.getShardOrNull(shardId);
    return new NodeGatewayStartedShards(
        clusterService.localNode(),
        allocationId,
        shardStateMetadata.primary,
        shard != null ? shard.getLatestReplicationCheckpoint() : null
    );
}
logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null);

A for loop is needed to run this for multiple shards; I don't think running it in parallel on new threads is a good idea. So yes, we can add a per-shard execution timeout: if the fetch code for one shard (one loop iteration) takes too long, we mark that shard as failed with an OpenSearchTimeout-type exception and build the response accordingly.
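The loop could be structured like this sketch. Note this is a hypothetical simplification: since the loads stay single-threaded (as argued above), the "timeout" is a post-hoc deadline check after each load rather than an interrupt, and `NodeSideBatchLoop` and the string results are illustrative stand-ins for the real response types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of the node-side batch loop: each shard's state is loaded in
// turn, and a shard whose load exceeds the per-shard budget is recorded
// as a timeout failure without blocking the rest of the batch.
class NodeSideBatchLoop {
    static Map<String, String> loadAll(List<String> shardIds,
                                       Function<String, String> loadShardState,
                                       long perShardTimeoutMillis) {
        Map<String, String> results = new HashMap<>();
        for (String shardId : shardIds) {
            long start = System.currentTimeMillis();
            // Real code would load shard state metadata from disk here; any
            // thrown exception would likewise be recorded per shard.
            String state = loadShardState.apply(shardId);
            if (System.currentTimeMillis() - start > perShardTimeoutMillis) {
                // Stand-in for failing the shard with OpenSearchTimeoutException.
                results.put(shardId, "FAILED: timeout");
            } else {
                results.put(shardId, state);
            }
        }
        return results;
    }
}
```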

Let's say the leader sent a request r1 to node n1, assumed it timed out, and sent another one, r2, as well.

Even today we don't assume any timeouts. The listener gets a response in onFailure: if there is a total failure of a node, we get that call in the listener's onFailure. Based on it we reset the nodeEntry in the cache, so new calls can then go through for that node.

In the upcoming change: since we'll have shardEntry objects in the cache (most likely a concurrent one), when a request times out or we receive a full failure from a node, we reset the shardEntry on the leader side so those shards can be re-fetched by the next set of calls. Until then, no new calls are made for those shards; they resume only after we update this cache (our source of truth).

But the response for first request r1 arrived later after r2

Not possible, as discussed above. For the same shardId, we send a new request only after marking the previous one as failed or updating the common cache object to restart fetching.

In this case, old fetch round responses should be discarded.

They will be discarded. The current logic does this for nodeEntries and we'll do the same for shardEntries. The only change: today in the per-shard cache we mark one hash map entry as fetching=false; now we'll do it for a batch of entries, one per shard, in the newly created per-node cache.

But now consider the case from Point 2 where 2 fetch requests were sent with different shardId list to same node and both responses should be processed. How are both the cases differentiated and handled?

Since the cache contains a separate entry for every shard, two requests for different sets of shards can each update their respective entries in the map, and we update the result only after all shards have been fetched from that particular node.

Since you are making calls per node and the response structure is already changing, do you still need explicit changes for DiscoveryNode, or is that addressed automatically?

Yes, partially: the overall count of DiscoveryNode objects in memory is automatically reduced because the number of responses is reduced. Essentially, if the responses come from a single node, we don't need BaseNodeResponse as the parent class of the response object, because we know it's the same DiscoveryNode for all shards sent in one request.

As I mentioned previously #5098 (comment)

Since the first solution already reduces the number of DiscoveryNode objects, we can debate whether this is truly required once we see the gains of part 1 versus part 2.

Phase 2 only applies when, say, the batch size is 100 and there are 1000 shards on a node: we'd end up calling the new transport 10 times and getting 10 DiscoveryNode objects for the same node. To optimize even that, we could keep just one DiscoveryNode object per node and avoid storing every response's copy in memory when the node's ephemeralId (or another unique id) is unchanged; the object would then be referenced from, or updated in, a cache maintained on the cluster manager side.

Be careful when changing the ExistingShardsAllocator interface: it is exposed via ClusterPlugin, so it must not be a breaking change.

For now I see only one implementation, and this method is not overridden there. Any new ExistingShardsAllocator implementation provided by a plugin would need to add the extra methods we're planning to add now (batched allocation of shards per node).

What is the behavior during mixed-mode upgrades: would it send batch requests to new nodes and per-shard requests to old nodes?

Handling that would be difficult and would complicate the code (imagine the bookkeeping needed for both request types). So it's easier to enable batched requests only when all nodes have the batch-request handler. Otherwise we'd have to write extra code just to handle the two response types and merge them into one final view :)

> Will it fetch only for the failed shards or all the shards in that batch? What is the current retry mechanism?

The current retry mechanism marks which node returned a failure; based on that, only those nodes are picked up in the next round. For failures, we'll mark only the specific shards as failed in the cache if only some of them fail while fetching from a node. We won't fail the whole batch; the whole batch will be failed only if the node itself returns a failure or gets disconnected.
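A minimal sketch of that bookkeeping, with hypothetical names (the real cache keeps NodeEntry/ShardEntry objects rather than a status enum):

```java
import java.util.HashMap;
import java.util.Map;

class BatchFetchResult {
    enum ShardFetchStatus { FETCHED, FAILED }

    // Per-shard status for one node's batched response.
    final Map<String, ShardFetchStatus> shardStatus = new HashMap<>();

    // Partial failure: only the listed shards are marked failed for retry;
    // the rest of the batch keeps its successfully fetched data.
    void markShardFailures(Iterable<String> fetched, Iterable<String> failed) {
        for (String s : fetched) shardStatus.put(s, ShardFetchStatus.FETCHED);
        for (String s : failed) shardStatus.put(s, ShardFetchStatus.FAILED);
    }

    // Node-level failure (disconnect, FailedNodeException): the whole batch fails.
    void markNodeFailure(Iterable<String> allShardsInBatch) {
        for (String s : allShardsInBatch) shardStatus.put(s, ShardFetchStatus.FAILED);
    }
}
```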

> Today, the cache can get a per-shard view easily. Now it has to iterate over all entries in the top-level map with NodeId as the key to check the shard status, right?

The cache is only used for bookkeeping today, to know which nodes are fetching, or whether fetching is done or failed, etc. We also store the result in the cache: a map where the key is the nodeId and the value is a NodeEntry, so for each node to which we send a request we keep the result in the cache. The overall decision is then made based on this map.
So, yes, it's true that when we batch the request for a single node, one response won't have all the data necessary for making an allocation decision. For that we'll need to read the full data, i.e. the responses of every per-node batched request, to build the view of an unassigned shard across all the nodes it appeared on.
For example:

```
response-from-n1 = {S1: s1-res-n1, S2: s2-res-n1}
response-from-n2 = {S1: s1-res-n2, S2: s2-res-n2}
```

Now for making an actual allocation decision, we'll need a view like

```
S1 -> [s1-res-n1, s1-res-n2]
```

So we do need to traverse the outer node-level map and build an internal shard-level view to make the final decision (as is done today in PSA).
Ref:

```java
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(
    unassignedShard,
    snapshotRestore,
    allocation.getIgnoreNodes(unassignedShard.shardId()),
    inSyncAllocationIds,
    shardState,
    logger
);
final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
logger.debug(
    "[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]",
    unassignedShard.index(),
    unassignedShard.id(),
    nodeShardsResult.orderedAllocationCandidates.size(),
    unassignedShard,
    inSyncAllocationIds
);
if (enoughAllocationsFound == false) {
    if (snapshotRestore) {
        // let BalancedShardsAllocator take care of allocating this shard
        logger.debug(
            "[{}][{}]: missing local data, will restore from [{}]",
            unassignedShard.index(),
            unassignedShard.id(),
            unassignedShard.recoverySource()
        );
        return AllocateUnassignedDecision.NOT_TAKEN;
    } else {
        // We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
        // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
        // this shard will be picked up when the node joins and we do another allocation reroute
        logger.debug(
            "[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
            unassignedShard.index(),
            unassignedShard.id(),
            nodeShardsResult.allocationsFound
        );
        return AllocateUnassignedDecision.no(
            AllocationStatus.NO_VALID_SHARD_COPY,
            explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null
        );
    }
}
NodesToAllocate nodesToAllocate = buildNodesToAllocate(
    allocation,
    nodeShardsResult.orderedAllocationCandidates,
    unassignedShard,
    false
);
```
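The shard-level view that PSA/RSA need can be built from the per-node batched responses with a plain map transpose; this is a minimal sketch where String keys stand in for DiscoveryNode, ShardId, and the response objects:

```java
import java.util.HashMap;
import java.util.Map;

class ResponseTranspose {
    // Flip {node -> {shard -> result}} into {shard -> {node -> result}} so the
    // allocator can still see all copies of one shard when making a decision.
    static Map<String, Map<String, String>> byShard(Map<String, Map<String, String>> byNode) {
        Map<String, Map<String, String>> result = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> nodeEntry : byNode.entrySet()) {
            String nodeId = nodeEntry.getKey();
            for (Map.Entry<String, String> shardEntry : nodeEntry.getValue().entrySet()) {
                result.computeIfAbsent(shardEntry.getKey(), k -> new HashMap<>())
                      .put(nodeId, shardEntry.getValue());
            }
        }
        return result;
    }
}
```

The cost is one pass over all per-node responses, which is bounded by (nodes × shards per batch) rather than by the total shard count.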

@Gaurav614 (Contributor) commented Jun 30, 2023

Thanks @amkhar for collating the knowledge and attaching my draft PR for it. Here are a few points that I would like to clarify:

> We'll keep getting new sets of shards which are unassigned as more nodes go down. For that we need to maintain an atomic set of shards for which a fetch is already happening; the async success callback will clear them from the set. This way we always make sure that we can cleanly send a new request for a different set of shards if new nodes fail. Before sending a call, we must check and get the diff using that atomic/synchronized set.

I think you mean the diff should be between the atomic set you maintain, where a fetch is in progress, and the unassigned shards list maintained by RoutingNodes. So once the fetch completes for the current batch, we clear it from our atomic set. If the allocators (PSA/RSA) are unable to make a decision with the currently available data, the shards will remain unassigned, and the next batch round (reroute call) will pick them up as the diff.

> Let's say during the first round of reroute, 10 shards were unassigned and a request was sent to all nodes for those 10 unassigned shards; now 1 more shard got unassigned.
>
> We have this cache object today; it'll be used to know which shards are already being fetched for a particular node.
> `private final Map<String, NodeEntry> cache = new HashMap<>();`
> will be changed to
> `private final Map<ShardId, ShardEntry> cache = new HashMap<>();`
> Similarly, our current findNodesToFetch method will change to findShardsToFetch. That way we identify from the cache which shards should be sent for fetching, and skip the ones which are already being fetched.

I think you are confusing the use of the cache here; the cache stores intermediate responses from nodes as and when they are received over transport, and the key will still be the NodeId (String). What you can do to maintain the diff is use the current batchOfShards request object that is being sent to the AsyncShardFetch class to get the currently fetching shards; that can be the atomic set.
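Such an atomic in-flight set could look roughly like this (illustrative sketch; shard IDs are plain strings here, and the real code would key on ShardId):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class FetchBookkeeping {
    // Shards for which a batched fetch request is currently in flight.
    private final Set<String> fetching = ConcurrentHashMap.newKeySet();

    // Pick only the unassigned shards not already being fetched, and mark them.
    Set<String> startFetchRound(Set<String> unassignedShards) {
        Set<String> toFetch = new HashSet<>();
        for (String shard : unassignedShards) {
            if (fetching.add(shard)) { // add() is atomic: false if already in flight
                toFetch.add(shard);
            }
        }
        return toFetch;
    }

    // Called from the async response handler once a batch completes (or fails).
    void finishFetch(Set<String> completedShards) {
        fetching.removeAll(completedShards);
    }
}
```

A shard that goes unassigned while a round is in flight is simply picked up as the diff in the next reroute round.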


> In case the result was failed for a particular shard (as the request is executed for a single node it's hard to see partial failures, but still...), we'll either restart fetching based on retryable exceptions or permanently fail the fetch, as is done today.

In the current scenario, there are two categories of failures:

1. Physical failures (FailedNodeException)
2. Failures while fetching the metadata from shards

The first type of failure happens when some catastrophic event occurs during transport. These fetches are restarted by doing another round of reroute:

```java
// if at least one node failed, make sure to have a protective reroute
// here, just case this round won't find anything, and we need to retry fetching data
if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) {
    reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]");
}
return new FetchResult<>(shardId, fetchData, allIgnoreNodes);
```

For the second type of failure, the response NodeGatewayStartedShards itself stores the exception:

```java
public static class NodeGatewayStartedShards extends BaseNodeResponse {
    private final String allocationId;
    private final boolean primary;
    private final Exception storeException;
    private final ReplicationCheckpoint replicationCheckpoint;
```

These exceptions are handled by PSA/RSA accordingly:

```java
if (nodeShardState.storeException() == null) {
    if (allocationId == null) {
        logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
    } else {
        logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
    }
} else {
    final String finalAllocationId = allocationId;
    if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
        logger.trace(
            () -> new ParameterizedMessage(
                "[{}] on node [{}] has allocation id [{}] but the store can not be "
                    + "opened as it's locked, treating as valid shard",
                shard,
                nodeShardState.getNode(),
                finalAllocationId
            ),
            nodeShardState.storeException()
```
We should not change this existing logic: even with batching, in case of catastrophic physical failures (category 1) we need to restart the entire batch, because that node falls under the FailedNodeException hierarchy.

Category 2 will be handled by PSA/RSA as it is done now.

@Gaurav614 (Contributor) commented Jun 30, 2023

@shwetathareja @amkhar I still think that the main culprit here is the GatewayAllocator fetching part, which eats memory and has the overhead of a larger number of transport calls. @amkhar's current solution is trying to revamp the assignment part, which is not our culprit.

We could separate concerns here by splitting out the fetching part and abstracting it into a new interface.

We build the view of the data by fetching (with @amkhar's currently proposed way that handles failures and has retries, i.e. a better version of my draft PR) and send the data to PSA/RSA per shard as is done today. The downside is that we have to loop over the data from all nodes (which will be finite in number, and in general fewer than the number of shards) to get the data for the shard that PSA/RSA wants, as I have demonstrated in a crude way in my POC code:

```java
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
    ShardId shardId = shard.shardId();
    Map<DiscoveryNode, TransportNodesCollectGatewayStartedShard.ListOfNodeGatewayStartedShards> discoveryNodeListOfNodeGatewayStartedShardsMap = shardsPerNode;
    if (shardsPerNode.isEmpty()) {
        return new AsyncShardFetch.FetchResult<>(shardId, null, Collections.emptySet());
    }
    HashMap<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> dataToAdapt = new HashMap<>();
    for (DiscoveryNode node : discoveryNodeListOfNodeGatewayStartedShardsMap.keySet()) {
        TransportNodesCollectGatewayStartedShard.ListOfNodeGatewayStartedShards shardsOnThatNode = discoveryNodeListOfNodeGatewayStartedShardsMap.get(node);
        if (shardsOnThatNode.getListOfNodeGatewayStartedShards().containsKey(shardId)) {
            TransportNodesCollectGatewayStartedShard.NodeGatewayStartedShards nodeGatewayStartedShardsFromAdapt = shardsOnThatNode.getListOfNodeGatewayStartedShards().get(shardId);
            // construct an object to adapt
            TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeGatewayStartedShardsToAdapt =
                new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, nodeGatewayStartedShardsFromAdapt.allocationId(),
                    nodeGatewayStartedShardsFromAdapt.primary(), nodeGatewayStartedShardsFromAdapt.replicationCheckpoint(), nodeGatewayStartedShardsFromAdapt.storeException());
            dataToAdapt.put(node, nodeGatewayStartedShardsToAdapt);
        }
    }
    return new AsyncShardFetch.FetchResult<>(shardId, dataToAdapt, Collections.emptySet());
}
```

At a high level, the data collection part is still abstracted away from PSA/RSA. With my idea, we change the abstraction layer of fetching from a single shard to a batch of shards.

This way we avoid making any change to the current interfaces and abstract classes, and avoid any breaking changes.

@shwetathareja (Member Author)

> For now I only see one implementation and this method is not overridden there. I think if a new ExistingShardsAllocator implementation is provided by new plugins, they need to add the extra methods which we're planning to add now (allocation of shards in a batch per node).

@amkhar There could be custom plugin implementations that customers might be using in their private repos. And you can't introduce a breaking change in 2.x that requires them to implement a new method in the interface.

@shwetathareja (Member Author)

> A for loop is required to run it for multiple shards; I don't think running this in parallel new threads is a good idea.

Today, it runs in parallel for all unassigned shards by virtue of per-shard calls. What is the latency overhead of fetching this detail per shard on the data node?

@shwetathareja (Member Author)

> For now I only see one implementation and this method is not overridden there. I think if a new ExistingShardsAllocator implementation is provided by new plugins, they need to add the extra methods which we're planning to add now (allocation of shards in a batch per node).

@amkhar There could be custom plugin implementations that customers might be using in their private repos. And you can't introduce a breaking change in 2.x that requires them to implement a new method in the interface.

One suggestion: you can run a loop over the existing allocateUnassigned method from the current interface as the default implementation for the new batch method in ExistingShardsAllocator. That way you can move to the batch method without breaking any clients.
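That backward-compatible default could be sketched like this (simplified signatures; the real ExistingShardsAllocator methods take ShardRouting, RoutingAllocation, and an unassigned-iterator handler rather than plain strings):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.
interface ExistingShardsAllocator {
    // Existing per-shard method that all current implementations already have.
    String allocateUnassigned(String shardId);

    // New batch method: the default just loops over the existing per-shard
    // method, so old plugin implementations keep compiling and working.
    default List<String> allocateUnassignedBatch(List<String> shardIds) {
        List<String> decisions = new ArrayList<>();
        for (String shardId : shardIds) {
            decisions.add(allocateUnassigned(shardId));
        }
        return decisions;
    }
}
```

The built-in GatewayAllocator would then override allocateUnassignedBatch with the real batched-fetch logic, while third-party implementations silently fall back to the per-shard path.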

@Gaurav614 (Contributor) commented Jul 5, 2023

Benchmarking results of the POC code [#7269]:

Version: Opensearch 2.3
Number of data nodes: 50
Number of master nodes: 3
Heap per node (committed): 5GB
Java version : Java 11
Number of shards: 58000
Scenario: Restarted all data nodes

  1. We are reducing the number of response objects from the transport calls, since we are now making fewer transport calls altogether.

  2. Overall heap decreased due to the reduction in DiscoveryNode objects, since batching brings their count down to the number of data nodes.
    Older code:
    Screenshot 2023-06-13 at 1 26 06 PM
    Poc Code:
    Screenshot 2023-06-13 at 1 26 52 PM

  3. The number of cache objects is reduced to the number of data nodes.
    Older Code:
    Screenshot 2023-06-13 at 2 23 27 PM
    POC code:
    Screenshot 2023-06-13 at 2 23 45 PM

@amkhar (Contributor) commented Jul 26, 2023

> Thanks Aman, per-node calls make sense. I am assuming this is only being done for assigning primary shards first and then replica shards subsequently. It would be easier to review the code once you have the draft ready.

@Bukhtawar thanks for your response. As this is a bigger change, we're raising small sub-PRs to make the reviews easier.
2 PRs and multiple draft PRs are out for review. Feel free to provide feedback.

This is the order of PRs which I think we'll follow for merging into main.

  1. Transport action request response class for primary Added transport action for bulk async shard fetch for primary shards #8218
  2. Transport action request response class for replica Add batch async shard fetch transport action for replica #8218 #8356
  3. AsyncBatchShardFetch class Batch Async Fetcher class changes #8742
  4. BaseGatewayShardsAllocator class - draft PR (BaseGatewayShardAllocator changes for Assigning the batch of shards #8776)
  5. PrimaryShardBatchAllocator class - draft PR Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards #8916
  6. ReplicaShardBatchAllocator class - needs to be raised
  7. GatewayAllocator class - part 1 draft PR (Async Batch shards changes for GatewayAllocator #8746), part 2 draft PR (FetchData changes for primaries and replicas #8865)
  8. AllocationService - the actual invocation happens here; partial draft PR (Allocation service changes for batch assignment #8888)

@amkhar (Contributor) commented Jul 26, 2023

@shwetathareja
We need to decide how to merge the PRs of this bigger project. In my understanding, the order of PRs is going to be like this: #5098 (comment)

@andrross (Member) commented Jul 27, 2023

> This is the order of PRs which I think we'll follow for merging in main.

@amkhar What is the backport plan? Will each of these PRs be safe to backport to 2.x immediately upon merging to main? Also, can you tag these issues/PRs with the intended version (like v2.10.0), or let me know if you don't have permissions to add labels.

@amkhar (Contributor) commented Jul 28, 2023

> @amkhar What is the backport plan? Will each of these PRs be safe to backport to 2.x immediately upon merging to main?

@andrross
Yes, as most of the PRs add new classes, it should be safe to merge them to 2.x. Even the last PR, which will contain the triggering code, should be safe to backport.
If I run into any hiccups, I'll bring them to attention.

> Also, can you tag these issues/PRs with the intended version (like v2.10.0), or let me know if you don't have permissions to add labels.

I've added all subtasks of this bigger project here:

I don't have permissions to add labels; yes, the intended version is 2.10.0. Please attach that label, or grant me permissions to attach labels.

@amkhar (Contributor) commented Jul 28, 2023

@vikasvb90 @dhwanilpatel - requesting feedback on the PRs.
