-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathBaseRestHandler.java
339 lines (303 loc) · 13.2 KB
/
BaseRestHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.rest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.spell.LevenshteinDistance;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.rest.action.admin.cluster.RestNodesUsageAction;
import org.opensearch.tasks.Task;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
/**
* Base handler for REST requests.
* <p>
* This handler makes sure that the headers & context of the handled {@link RestRequest requests} are copied over to
* the transport requests executed by the associated client. While the context is fully copied over, not all the headers
* are copied, but a selected few. It is possible to control what headers are copied over by returning them in
* {@link ActionPlugin#getRestHeaders()}.
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public abstract class BaseRestHandler implements RestHandler {
public static final Setting<Boolean> MULTI_ALLOW_EXPLICIT_INDEX = Setting.boolSetting(
"rest.action.multi.allow_explicit_index",
true,
Property.NodeScope
);
private final LongAdder usageCount = new LongAdder();
/**
* @deprecated declare your own logger.
*/
@Deprecated
protected Logger logger = LogManager.getLogger(getClass());
public final long getUsageCount() {
return usageCount.sum();
}
/**
* @return the name of this handler. The name should be human readable and
* should describe the action that will performed when this API is
* called. This name is used in the response to the
* {@link RestNodesUsageAction}.
*/
public abstract String getName();
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
final RestChannelConsumer action = prepareRequest(request, client);
// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams = request.unconsumedParams()
.stream()
.filter(p -> !responseParams().contains(p))
.collect(Collectors.toCollection(TreeSet::new));
// validate the non-response params
if (!unconsumedParams.isEmpty()) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}
if (request.hasContent() && request.isContentConsumed() == false) {
throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
}
usageCount.increment();
// execute the action
action.accept(channel);
}
public static String unrecognizedStrings(
final RestRequest request,
final Set<String> invalids,
final Set<String> candidates,
final String detail
) {
StringBuilder message = new StringBuilder(
String.format(Locale.ROOT, "request [%s] contains unrecognized %s%s: ", request.path(), detail, invalids.size() > 1 ? "s" : "")
);
boolean first = true;
for (final String invalid : invalids) {
final LevenshteinDistance ld = new LevenshteinDistance();
final List<Tuple<Float, String>> scoredParams = new ArrayList<>();
for (final String candidate : candidates) {
final float distance = ld.getDistance(invalid, candidate);
if (distance > 0.5f) {
scoredParams.add(new Tuple<>(distance, candidate));
}
}
CollectionUtil.timSort(scoredParams, (a, b) -> {
// sort by distance in reverse order, then parameter name for equal distances
int compare = a.v1().compareTo(b.v1());
if (compare != 0) return -compare;
else return a.v2().compareTo(b.v2());
});
if (first == false) {
message.append(", ");
}
message.append("[").append(invalid).append("]");
final List<String> keys = scoredParams.stream().map(Tuple::v2).collect(Collectors.toList());
if (keys.isEmpty() == false) {
message.append(" -> did you mean ");
if (keys.size() == 1) {
message.append("[").append(keys.get(0)).append("]");
} else {
message.append("any of ").append(keys.toString());
}
message.append("?");
}
first = false;
}
return message.toString();
}
/**
* Returns a String message of the detail of any unrecognized error occurred. The string is intended for use in error messages to be returned to the user.
*
* @param request The request that caused the exception
* @param invalids Strings from the request which were unable to be understood.
* @param candidates A set of words that are most likely to be the valid strings determined invalid, to be suggested to the user.
* @param detail The parameter contains the details of the exception.
* @return a String that contains the message.
*/
protected final String unrecognized(
final RestRequest request,
final Set<String> invalids,
final Set<String> candidates,
final String detail
) {
return unrecognizedStrings(request, invalids, candidates, detail);
}
/**
* REST requests are handled by preparing a channel consumer that represents the execution of
* the request against a channel.
*
* @opensearch.api
*/
@FunctionalInterface
@PublicApi(since = "1.0.0")
protected interface RestChannelConsumer extends CheckedConsumer<RestChannel, Exception> {}
/**
* Prepare the request for execution. Implementations should consume all request params before
* returning the runnable for actual execution. Unconsumed params will immediately terminate
* execution of the request. However, some params are only used in processing the response;
* implementations can override {@link BaseRestHandler#responseParams()} to indicate such
* params.
*
* @param request the request to execute
* @param client client for executing actions on the local node
* @return the action to execute
* @throws IOException if an I/O exception occurred parsing the request and preparing for
* execution
*/
protected abstract RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException;
/**
* Parameters used for controlling the response and thus might not be consumed during
* preparation of the request execution in
* {@link BaseRestHandler#prepareRequest(RestRequest, NodeClient)}.
*
* @return a set of parameters used to control the response and thus should not trip strict
* URL parameter checks.
*/
protected Set<String> responseParams() {
return Collections.emptySet();
}
/**
* Parse the deprecated request parameter 'master_timeout', and add deprecated log if the parameter is used.
* It also validates whether the two parameters 'master_timeout' and 'cluster_manager_timeout' are not assigned together.
* The method is temporarily added in 2.0 duing applying inclusive language. Remove the method along with MASTER_ROLE.
* @param mnr the action request
* @param request the REST request to handle
* @param logger the logger that logs deprecation notices
* @param logMsgKeyPrefix the key prefix of a deprecation message to avoid duplicate messages.
*/
public static void parseDeprecatedMasterTimeoutParameter(
ClusterManagerNodeRequest mnr,
RestRequest request,
DeprecationLogger logger,
String logMsgKeyPrefix
) {
final String MASTER_TIMEOUT_DEPRECATED_MESSAGE =
"Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead.";
final String DUPLICATE_PARAMETER_ERROR_MESSAGE =
"Please only use one of the request parameters [master_timeout, cluster_manager_timeout].";
if (request.hasParam("master_timeout")) {
logger.deprecate(logMsgKeyPrefix + "_master_timeout_parameter", MASTER_TIMEOUT_DEPRECATED_MESSAGE);
if (request.hasParam("cluster_manager_timeout")) {
throw new OpenSearchParseException(DUPLICATE_PARAMETER_ERROR_MESSAGE);
}
mnr.clusterManagerNodeTimeout(request.paramAsTime("master_timeout", mnr.clusterManagerNodeTimeout()));
}
}
/**
* A wrapper for the base handler.
*
* @opensearch.internal
*/
public static class Wrapper extends BaseRestHandler {
protected final BaseRestHandler delegate;
public Wrapper(BaseRestHandler delegate) {
this.delegate = Objects.requireNonNull(delegate, "BaseRestHandler delegate can not be null");
}
@Override
public String getName() {
return delegate.getName();
}
@Override
public List<Route> routes() {
return delegate.routes();
}
@Override
public List<DeprecatedRoute> deprecatedRoutes() {
return delegate.deprecatedRoutes();
}
@Override
public List<ReplacedRoute> replacedRoutes() {
return delegate.replacedRoutes();
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return delegate.prepareRequest(request, client);
}
@Override
protected Set<String> responseParams() {
return delegate.responseParams();
}
@Override
public boolean canTripCircuitBreaker() {
return delegate.canTripCircuitBreaker();
}
@Override
public boolean supportsContentStream() {
return delegate.supportsContentStream();
}
@Override
public boolean allowsUnsafeBuffers() {
return delegate.allowsUnsafeBuffers();
}
@Override
public boolean allowSystemIndexAccessByDefault() {
return delegate.allowSystemIndexAccessByDefault();
}
}
/**
* Return a task immediately when executing some long-running operations asynchronously, like reindex, resize, open, force merge
*/
public RestChannelConsumer sendTask(String nodeId, Task task) {
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", nodeId + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
};
}
}