-
Notifications
You must be signed in to change notification settings - Fork 59
/
ExtensionsRunner.java
342 lines (302 loc) · 14.2 KB
/
ExtensionsRunner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.discovery.PluginRequest;
import org.opensearch.discovery.PluginResponse;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleNameResponse;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.sdk.netty4.Netty4Transport;
import org.opensearch.sdk.netty4.SharedGroupFactory;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptySet;
import static org.opensearch.common.UUIDs.randomBase64UUID;
/**
 * The primary class to run an extension.
 * <p>
 * An instance reads its {@link ExtensionSettings} from the extension descriptor file, builds the transport
 * {@link Settings} from them, and exposes helpers to create and start a {@link TransportService} whose request
 * handlers answer the initialization requests sent by OpenSearch.
 * <p>
 * This class Javadoc will eventually be expanded with a full description/tutorial for users.
 */
public class ExtensionsRunner {

    private static final Logger logger = LogManager.getLogger(ExtensionsRunner.class);

    /** Interceptor that relies entirely on the default (pass-through) methods of {@link TransportInterceptor}. */
    private static final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
    };

    // NOTE: declaration order matters. extensionSettings must be initialized before settings, which reads from it.
    private final ExtensionSettings extensionSettings = readExtensionSettings();

    private final Settings settings = Settings.builder()
        .put("node.name", extensionSettings.getExtensionName())
        .put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostAddress())
        .put(TransportSettings.PORT.getKey(), extensionSettings.getHostPort())
        .build();

    /** The OpenSearch node that sent the plugin request; {@code null} until {@link #handlePluginsRequest} has run. */
    private DiscoveryNode opensearchNode;

    /**
     * Instantiates a new Extensions Runner.
     *
     * @throws IOException if the extension descriptor file could not be read or parsed.
     */
    public ExtensionsRunner() throws IOException {}

    /**
     * Reads the extension settings from the YAML file located at {@link ExtensionSettings#EXTENSION_DESCRIPTOR}.
     *
     * @return The settings parsed from the descriptor file.
     * @throws IOException if the file could not be read or mapped to {@link ExtensionSettings}.
     */
    private ExtensionSettings readExtensionSettings() throws IOException {
        File descriptor = new File(ExtensionSettings.EXTENSION_DESCRIPTOR);
        ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
        return yamlMapper.readValue(descriptor, ExtensionSettings.class);
    }

    private void setOpensearchNode(DiscoveryNode opensearchNode) {
        this.opensearchNode = opensearchNode;
    }

    /**
     * Gets the OpenSearch node recorded by {@link #handlePluginsRequest(PluginRequest)}.
     *
     * @return The OpenSearch node, or {@code null} if no plugin request has been handled yet.
     */
    public DiscoveryNode getOpensearchNode() {
        return opensearchNode;
    }

    /**
     * Handles a plugin request from OpenSearch. This is the first request for the transport communication and will initialize the extension and will be a part of OpenSearch bootstrap.
     * <p>
     * The source node of the request is stored so later requests can be sent back to it.
     *
     * @param pluginRequest  The request to handle.
     * @return A response to OpenSearch validating that this is an extension.
     */
    PluginResponse handlePluginsRequest(PluginRequest pluginRequest) {
        logger.info("Registering Plugin Request received from OpenSearch");
        PluginResponse pluginResponse = new PluginResponse(extensionSettings.getExtensionName());
        setOpensearchNode(pluginRequest.getSourceNode());
        return pluginResponse;
    }

    /**
     * Handles a request for extension point indices from OpenSearch.  The {@link #handlePluginsRequest(PluginRequest)} method must have been called first to initialize the extension.
     * <p>
     * As part of handling, the transport service is connected to the stored OpenSearch node.
     *
     * @param indicesModuleRequest  The request to handle.
     * @param transportService  The transport service communicating with OpenSearch.
     * @return A response to OpenSearch with this extension's index and search listeners.
     */
    IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
        logger.info("Registering Indices Module Request received from OpenSearch");
        IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);
        // handlePluginsRequest sets the OpenSearch node during bootstrapping of OpenSearch
        DiscoveryNode targetNode = getOpensearchNode();
        transportService.connectToNode(targetNode);
        return indicesModuleResponse;
    }

    /**
     * Handles a request for extension name from OpenSearch.  The {@link #handlePluginsRequest(PluginRequest)} method must have been called first to initialize the extension.
     *
     * @param indicesModuleRequest  The request to handle.
     * @return A response acknowledging the request.
     */
    IndicesModuleNameResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) {
        // Works as beforeIndexRemoved
        logger.info("Registering Indices Module Name Request received from OpenSearch");
        return new IndicesModuleNameResponse(true);
    }

    /**
     * Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
     * <p>
     * The transport's {@link NamedWriteableRegistry} is assembled from the named writeables of the network,
     * indices, search and cluster modules.
     *
     * @param settings  The transport settings to configure.
     * @param threadPool  A thread pool to use.
     * @return The configured Netty4Transport object.
     */
    public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) {
        NetworkService networkService = new NetworkService(Collections.emptyList());
        PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
        IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
        SearchModule searchModule = new SearchModule(settings, Collections.emptyList());

        // Concatenate the per-module entries in a fixed order: network, indices, search, cluster.
        List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
            NetworkModule.getNamedWriteables().stream(),
            indicesModule.getNamedWriteables().stream(),
            searchModule.getNamedWriteables().stream(),
            ClusterModule.getNamedWriteables().stream()
        ).flatMap(Function.identity()).collect(Collectors.toList());

        final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
        final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();

        return new Netty4Transport(
            settings,
            Version.CURRENT,
            threadPool,
            networkService,
            pageCacheRecycler,
            namedWriteableRegistry,
            circuitBreakerService,
            new SharedGroupFactory(settings)
        );
    }

    /**
     * Creates a TransportService object. This object will control communication between the extension and OpenSearch.
     * <p>
     * A new {@link ThreadPool} and {@link Netty4Transport} are created for the service. The local node is named
     * after the extension and given a random id.
     *
     * @param settings  The transport settings to configure.
     * @return The configured TransportService object.
     */
    public TransportService createTransportService(Settings settings) {
        ThreadPool threadPool = new ThreadPool(settings);
        Netty4Transport transport = getNetty4Transport(settings, threadPool);
        final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport);

        // create transport service
        return new TransportService(
            settings,
            transport,
            threadPool,
            NOOP_TRANSPORT_INTERCEPTOR,
            boundAddress -> DiscoveryNode.createLocal(
                Settings.builder().put("node.name", extensionSettings.getExtensionName()).build(),
                boundAddress.publishAddress(),
                randomBase64UUID()
            ),
            null,
            emptySet(),
            connectionManager
        );
    }

    /**
     * Starts a TransportService and registers the request handlers for the plugin request, the indices extension
     * point request and the indices extension name request.
     *
     * @param transportService  The TransportService to start.
     */
    public void startTransportService(TransportService transportService) {
        // start transport service and accept incoming requests
        transportService.start();
        transportService.acceptIncomingRequests();

        // Extension Request is the first request for the transport communication.
        // This request will initialize the extension and will be a part of OpenSearch bootstrap
        transportService.registerRequestHandler(
            ExtensionsOrchestrator.REQUEST_EXTENSION_ACTION_NAME,
            ThreadPool.Names.GENERIC,
            false,
            false,
            PluginRequest::new,
            (request, channel, task) -> channel.sendResponse(handlePluginsRequest(request))
        );

        transportService.registerRequestHandler(
            ExtensionsOrchestrator.INDICES_EXTENSION_POINT_ACTION_NAME,
            ThreadPool.Names.GENERIC,
            false,
            false,
            IndicesModuleRequest::new,
            (request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService))
        );

        transportService.registerRequestHandler(
            ExtensionsOrchestrator.INDICES_EXTENSION_NAME_ACTION_NAME,
            ThreadPool.Names.GENERIC,
            false,
            false,
            IndicesModuleRequest::new,
            (request, channel, task) -> channel.sendResponse(handleIndicesModuleNameRequest(request))
        );
    }

    /**
     * Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
     * <p>
     * Any exception thrown while sending is logged and not propagated.
     *
     * @param transportService  The TransportService defining the connection to OpenSearch.
     */
    public void sendClusterStateRequest(TransportService transportService) {
        logger.info("Sending Cluster State request to OpenSearch");
        ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
        try {
            transportService.sendRequest(
                opensearchNode,
                ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
                new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
                clusterStateResponseHandler
            );
        } catch (Exception e) {
            // Best effort: report the failure at error level rather than crash the caller.
            logger.error("Failed to send Cluster State request to OpenSearch", e);
        }
    }

    /**
     * Requests the cluster settings from OpenSearch. The result will be handled by a {@link ClusterSettingsResponseHandler}.
     * <p>
     * Any exception thrown while sending is logged and not propagated.
     *
     * @param transportService  The TransportService defining the connection to OpenSearch.
     */
    public void sendClusterSettingsRequest(TransportService transportService) {
        logger.info("Sending Cluster Settings request to OpenSearch");
        ClusterSettingsResponseHandler clusterSettingsResponseHandler = new ClusterSettingsResponseHandler();
        try {
            transportService.sendRequest(
                opensearchNode,
                ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS,
                new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS),
                clusterSettingsResponseHandler
            );
        } catch (Exception e) {
            // Best effort: report the failure at error level rather than crash the caller.
            logger.error("Failed to send Cluster Settings request to OpenSearch", e);
        }
    }

    /**
     * Requests the local node from OpenSearch. The result will be handled by a {@link LocalNodeResponseHandler}.
     * <p>
     * Any exception thrown while sending is logged and not propagated.
     *
     * @param transportService  The TransportService defining the connection to OpenSearch.
     */
    public void sendLocalNodeRequest(TransportService transportService) {
        logger.info("Sending Local Node request to OpenSearch");
        LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler();
        try {
            transportService.sendRequest(
                opensearchNode,
                ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
                new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE),
                localNodeResponseHandler
            );
        } catch (Exception e) {
            // Best effort: report the failure at error level rather than crash the caller.
            logger.error("Failed to send Local Node request to OpenSearch", e);
        }
    }

    /**
     * Gets the transport settings built from the extension settings (node name, bind host and port).
     *
     * @return The transport settings of this runner.
     */
    private Settings getSettings() {
        return settings;
    }

    /**
     * Starts an ActionListener.
     *
     * @param timeout  The timeout for the listener in milliseconds. A timeout of 0 means no timeout.
     */
    public void startActionListener(int timeout) {
        final ActionListener actionListener = new ActionListener();
        actionListener.runActionListener(true, timeout);
    }

    /**
     * Run the Extension. Imports settings, establishes a connection with an OpenSearch cluster via a Transport Service, and sets up a listener for responses.
     *
     * @param args  Unused
     * @throws IOException if the extension descriptor file could not be read or parsed.
     */
    public static void main(String[] args) throws IOException {
        ExtensionsRunner extensionsRunner = new ExtensionsRunner();

        // configure and retrieve transport service with settings
        Settings settings = extensionsRunner.getSettings();
        TransportService transportService = extensionsRunner.createTransportService(settings);

        // start transport service and action listener
        extensionsRunner.startTransportService(transportService);
        extensionsRunner.startActionListener(0);
    }
}