/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import reactor.util.annotation.NonNull;

/**
 * AsyncShardFetchCache operates on a node-level cache, which is a map from node id ({@code String}) to
 * {@link BaseNodeEntry}. {@code initData}, {@code putData} and {@code getData} need to be called for all the nodes;
 * this class is responsible for managing that flow across all of them.
 * It also gives useful insights, such as how many fetches are ongoing and how many nodes are left to fetch, and it
 * can mark a node as being in fetching mode. All of these operations inspect the cache and respond accordingly.
 * <p>
 * Subclasses define:
 * <ul>
 * <li>initData: how to initialize an entry of the shard cache for a node.</li>
 * <li>putData: how to store the response of the transport action in the cache.</li>
 * <li>getData: how to get the stored data for shard allocators such as {@link PrimaryShardAllocator} or
 * {@link ReplicaShardAllocator}.</li>
 * <li>deleteShard: how to clean up the stored data from the cache for a shard.</li>
 * </ul>
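 * <p>
 * As a rough sketch (not taken verbatim from any caller), one fetch round driven through this class might look like
 * the following; the transport request itself and the round bookkeeping are elided, and {@code cache} stands for any
 * concrete subclass:
 * <pre>{@code
 * cache.fillShardCacheWithDataNodes(nodes);         // add new data nodes, drop departed ones
 * List<String> nodeIds = cache.findNodesToFetch();  // nodes with no data that are not already fetching
 * if (nodeIds.isEmpty() == false) {
 *     cache.markAsFetching(nodeIds, fetchingRound);
 *     // ... issue the transport request, then on completion:
 *     cache.processResponses(responses, fetchingRound);
 *     cache.processFailures(failures, fetchingRound);
 * }
 * Map<DiscoveryNode, K> data = cache.getCacheData(nodes, failedNodes);
 * }</pre>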
 *
 * @param <K> response type of the transport action that carries the data to be stored in the cache.
 *
 * @opensearch.internal
 */
public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {
    private final Logger logger;
    private final String type;

    protected AsyncShardFetchCache(Logger logger, String type) {
        this.logger = logger;
        this.type = type;
    }

    /** Initializes the shard cache entry for the given node. */
    abstract void initData(DiscoveryNode node);

    /** Stores a node's transport response in the cache. */
    abstract void putData(DiscoveryNode node, K response);

    /** Returns the stored response for the given node, or null if nothing is stored. */
    abstract K getData(DiscoveryNode node);

    /** Returns the underlying node-level cache, keyed by node id. */
    @NonNull
    abstract Map<String, ? extends BaseNodeEntry> getCache();

    /**
     * Cleans up the cached data for a shard once it has started. Cleanup only happens at the shard level; node
     * entries are cleaned up automatically once shards are assigned.
     *
     * @param shardId shard for which we need to free up the cached data.
     */
    abstract void deleteShard(ShardId shardId);

    /**
     * Returns the number of fetches that are currently ongoing.
     */
    int getInflightFetches() {
        int count = 0;
        for (BaseNodeEntry nodeEntry : getCache().values()) {
            if (nodeEntry.isFetching()) {
                count++;
            }
        }
        return count;
    }

    /**
     * Fills the shard cache with new (data) nodes, each with a fresh {@link BaseNodeEntry}, and removes from it
     * nodes that are no longer part of the cluster state.
     */
    void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
        // verify that all current data nodes are there
        for (final DiscoveryNode node : nodes.getDataNodes().values()) {
            if (getCache().containsKey(node.getId()) == false) {
                initData(node);
            }
        }
        // remove nodes that are no longer part of the data nodes set
        getCache().keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
    }

    /**
     * Finds all the nodes that need to be fetched. Those are nodes that have no
     * data, and are not in fetch mode.
     */
    List<String> findNodesToFetch() {
        List<String> nodesToFetch = new ArrayList<>();
        for (BaseNodeEntry nodeEntry : getCache().values()) {
            if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
                nodesToFetch.add(nodeEntry.getNodeId());
            }
        }
        return nodesToFetch;
    }

    /**
     * Are there any nodes that are fetching data?
     */
    boolean hasAnyNodeFetching() {
        for (BaseNodeEntry nodeEntry : getCache().values()) {
            if (nodeEntry.isFetching()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Gets the data from the cache, ignoring failed entries. Uses the {@code getData} method to read each entry, as
     * different implementations may populate their data from the cache differently.
     *
     * @param nodes discovery nodes for which we need to return the cached data.
     * @param failedNodes populated with the ids of nodes whose fetch has failed.
     * @return map of cached data for every discovery node.
     */
    Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes) {
        Map<DiscoveryNode, K> fetchData = new HashMap<>();
        for (Iterator<? extends Map.Entry<String, ? extends BaseNodeEntry>> it = getCache().entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, BaseNodeEntry> entry = (Map.Entry<String, BaseNodeEntry>) it.next();
            String nodeId = entry.getKey();
            BaseNodeEntry nodeEntry = entry.getValue();
            DiscoveryNode node = nodes.get(nodeId);
            if (node != null) {
                if (nodeEntry.isFailed()) {
                    // if it failed, remove it from the list of nodes, so that if this run doesn't work
                    // we try to fetch it again in the next round
                    it.remove();
                    failedNodes.add(nodeEntry.getNodeId());
                } else {
                    K nodeResponse = getData(node);
                    if (nodeResponse != null) {
                        fetchData.put(node, nodeResponse);
                    }
                }
            }
        }
        return fetchData;
    }

    /**
     * Applies the responses of a fetch round to the cache, skipping entries that belong to an older round or that
     * are already marked as failed.
     */
    void processResponses(List<K> responses, long fetchingRound) {
        for (K response : responses) {
            BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
            if (nodeEntry != null) {
                if (validateNodeResponse(nodeEntry, fetchingRound)) {
                    // if the entry is there, for the right fetching round and not marked as failed already, process it
                    logger.trace("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response);
                    putData(response.getNode(), response);
                }
            }
        }
    }

    private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) {
        if (nodeEntry.getFetchingRound() != fetchingRound) {
            assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
            logger.trace(
                "received response from node {} for [{}] for an older fetching round (expected: {} but was: {})",
                nodeEntry.getNodeId(),
                type,
                nodeEntry.getFetchingRound(),
                fetchingRound
            );
            return false;
        } else if (nodeEntry.isFailed()) {
            logger.trace("node {} has failed for [{}] (failure [{}])", nodeEntry.getNodeId(), type, nodeEntry.getFailure());
            return false;
        }
        return true;
    }

    private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
        if (nodeEntry.getFetchingRound() != fetchingRound) {
            assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
            logger.trace(
                "received failure from node {} for [{}] for an older fetching round (expected: {} but was: {})",
                nodeEntry.getNodeId(),
                type,
                nodeEntry.getFetchingRound(),
                fetchingRound
            );
        } else if (nodeEntry.isFailed() == false) {
            // the entry is there, belongs to the current fetching round and is not marked as failed yet, so process it
            Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
            // if the request got rejected or timed out, we need to try it again next time...
            if (retryableException(unwrappedCause)) {
                nodeEntry.restartFetching();
            } else {
                logger.warn(() -> new ParameterizedMessage("failed to list shard for {} on node [{}]", type, failure.nodeId()), failure);
                nodeEntry.doneFetching(failure.getCause());
            }
        }
    }

    /**
     * Returns true if the failure is transient (a rejected execution or a timeout), in which case the fetch is
     * retried in a later round instead of being recorded as a failure.
     */
    boolean retryableException(Throwable unwrappedCause) {
        return unwrappedCause instanceof OpenSearchRejectedExecutionException
            || unwrappedCause instanceof ReceiveTimeoutTransportException
            || unwrappedCause instanceof OpenSearchTimeoutException;
    }

    /**
     * Applies the failures of a fetch round to the cache; stale rounds and retryable failures are handled by
     * {@link #handleNodeFailure}.
     */
    void processFailures(List<FailedNodeException> failures, long fetchingRound) {
        for (FailedNodeException failure : failures) {
            logger.trace("processing failure {} for [{}]", failure, type);
            BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
            if (nodeEntry != null) {
                handleNodeFailure(nodeEntry, failure, fetchingRound);
            }
        }
    }

    /**
     * Removes a node's entire entry from the cache.
     *
     * @param nodeId id of the node to be removed.
     */
    void remove(String nodeId) {
        this.getCache().remove(nodeId);
    }

    /**
     * Marks the given nodes as fetching for the given round.
     */
    void markAsFetching(List<String> nodeIds, long fetchingRound) {
        for (String nodeId : nodeIds) {
            getCache().get(nodeId).markAsFetching(fetchingRound);
        }
    }

    /**
     * A node entry, holding only node-level fetching related information.
     * The actual shard metadata is stored in child classes.
     */
    static class BaseNodeEntry {
        private final String nodeId;
        private boolean fetching;
        private boolean valueSet;
        private Throwable failure;
        private long fetchingRound;
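
        // An entry moves through these states (the assertions below enforce them):
        //   initial  --markAsFetching(round)-->  fetching
        //   fetching --doneFetching()------->    valueSet == true (data available)
        //   fetching --doneFetching(failure)-->  failure != null (recorded as failed)
        //   fetching --restartFetching()---->    back to initial, retried in a later round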

        BaseNodeEntry(String nodeId) {
            this.nodeId = nodeId;
        }

        String getNodeId() {
            return this.nodeId;
        }

        boolean isFetching() {
            return fetching;
        }

        void markAsFetching(long fetchingRound) {
            assert fetching == false : "double marking a node as fetching";
            this.fetching = true;
            this.fetchingRound = fetchingRound;
        }

        void doneFetching() {
            assert fetching : "setting value but not in fetching mode";
            assert failure == null : "setting value when failure already set";
            this.valueSet = true;
            this.fetching = false;
        }

        void doneFetching(Throwable failure) {
            assert fetching : "setting value but not in fetching mode";
            assert valueSet == false : "setting failure when already set value";
            assert failure != null : "setting failure can't be null";
            this.failure = failure;
            this.fetching = false;
        }

        void restartFetching() {
            assert fetching : "restarting fetching, but not in fetching mode";
            assert valueSet == false : "value can't be set when restarting fetching";
            assert failure == null : "failure can't be set when restarting fetching";
            this.fetching = false;
        }

        boolean isFailed() {
            return failure != null;
        }

        boolean hasData() {
            return valueSet || failure != null;
        }

        Throwable getFailure() {
            assert hasData() : "getting failure when data has not been fetched";
            return failure;
        }

        long getFetchingRound() {
            return fetchingRound;
        }
    }
}