Skip to content

Commit

Permalink
[Feature/extensions] Adding support for Extension Create Components […
Browse files Browse the repository at this point in the history
…environment settings and add setting update consumers] (opensearch-project#138)

* Added support for enviornment settings requests and unit tests

Signed-off-by: Joshua Palis <[email protected]>

* updated environment settings support for create components

Signed-off-by: Joshua Palis <[email protected]>

* Updating javadocs for environmentsetting request/ response handlers

Signed-off-by: Joshua Palis <[email protected]>

* Adding inital support for AddSettingsUpdateConsumer

Signed-off-by: Joshua Palis <[email protected]>

* Updated implementation in preparation for getSettings support integration

Signed-off-by: Joshua Palis <[email protected]>

* Updated sendAddSettingsUpdateConsumerRequest to pull settings list directly from setting update consumer registry

Signed-off-by: Joshua Palis <[email protected]>

* Integrated writeable settings support

Signed-off-by: Joshua Palis <[email protected]>

* Updating writeable settings integration for setting type

Signed-off-by: Joshua Palis <[email protected]>

* Replacing AddSettingsUpdateConsumerResponseHandler with ExtensionBooleanResponseHandler, moving handleUpdateSettings method to seperate class

Signed-off-by: Joshua Palis <[email protected]>

* refactoring sendAddSettingsUpdateConsumerRequest. Now each component will only need to provide a map of setting and consumer, and upon sending the addsettingupdateconsumer request, this map will be registered within the updateSettingsRequestHandler

Signed-off-by: Joshua Palis <[email protected]>

* simplifying settingUpdateConsumer registration

Signed-off-by: Joshua Palis <[email protected]>

* Fixing javadocs

Signed-off-by: Joshua Palis <[email protected]>

* fixing javadocs

Signed-off-by: Joshua Palis <[email protected]>

* adding missing javadocs

Signed-off-by: Joshua Palis <[email protected]>

* fixing javadocs

Signed-off-by: Joshua Palis <[email protected]>

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored and kokibas committed Mar 17, 2023
1 parent 22ca0ef commit ba3e70f
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 1 deletion.
89 changes: 89 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.EnvironmentSettingsRequest;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
Expand All @@ -46,7 +50,10 @@
import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionBooleanResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -59,12 +66,15 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.function.Consumer;

import static java.util.Collections.emptySet;
import static org.opensearch.common.UUIDs.randomBase64UUID;
Expand All @@ -81,6 +91,7 @@ public class ExtensionsRunner {

private String uniqueId;
private DiscoveryNode opensearchNode;
private DiscoveryExtension extensionNode;
private TransportService extensionTransportService = null;
// The routes and classes which handle the REST requests
private final ExtensionRestPathRegistry extensionRestPathRegistry = new ExtensionRestPathRegistry();
Expand All @@ -96,6 +107,10 @@ public class ExtensionsRunner {
* https://github.com/opensearch-project/opensearch-sdk-java/issues/119
*/
private TransportActions transportActions = new TransportActions(new HashMap<>());
/**
* Instantiates a new update settings request handler
*/
UpdateSettingsRequestHandler updateSettingsRequestHandler = new UpdateSettingsRequestHandler();

/**
* Instantiates a new Extensions Runner using test settings.
Expand Down Expand Up @@ -161,6 +176,10 @@ private void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

private void setExtensionNode(DiscoveryExtension extensionNode) {
this.extensionNode = extensionNode;
}

DiscoveryNode getOpensearchNode() {
return opensearchNode;
}
Expand All @@ -181,6 +200,7 @@ InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequ
} finally {
// After sending successful response to initialization, send the REST API and Settings
setOpensearchNode(opensearchNode);
setExtensionNode(extensionInitRequest.getExtension());
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
sendRegisterCustomSettingsRequest(extensionTransportService);
Expand Down Expand Up @@ -409,6 +429,15 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(handleRestExecuteOnExtensionRequest(request)))
);

transportService.registerRequestHandler(
ExtensionsOrchestrator.REQUEST_EXTENSION_UPDATE_SETTINGS,
ThreadPool.Names.GENERIC,
false,
false,
UpdateSettingsRequest::new,
((request, channel, task) -> channel.sendResponse(updateSettingsRequestHandler.handleUpdateSettingsRequest(request)))
);

}

/**
Expand Down Expand Up @@ -536,6 +565,66 @@ public void sendActionListenerOnFailureRequest(TransportService transportService
}
}

/**
* Requests the environment setting values from OpenSearch for the corresponding component settings. The result will be handled by a {@link EnvironmentSettingsResponseHandler}.
*
* @param componentSettings The component setting that correspond to the values provided by the environment settings
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendEnvironmentSettingsRequest(TransportService transportService, List<Setting<?>> componentSettings) {
logger.info("Sending Environment Settings request to OpenSearch");
EnvironmentSettingsResponseHandler environmentSettingsResponseHandler = new EnvironmentSettingsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS,
new EnvironmentSettingsRequest(componentSettings),
environmentSettingsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Environment Settings request to OpenSearch", e);
}
}

/**
* Registers settings and setting consumers with the {@link UpdateSettingsRequestHandler} and then sends a request to OpenSearch to register these Setting objects with a callback to this extension.
* The result will be handled by a {@link ExtensionBooleanResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
* @param settingUpdateConsumers A map of setting objects and their corresponding consumers
* @throws Exception if there are no setting update consumers within the settingUpdateConsumers map
*/
public void sendAddSettingsUpdateConsumerRequest(TransportService transportService, Map<Setting<?>, Consumer<?>> settingUpdateConsumers)
throws Exception {
logger.info("Sending Add Settings Update Consumer request to OpenSearch");

// Determine if there are setting update consumers to be registered
if (settingUpdateConsumers.isEmpty()) {
throw new Exception("There are no setting update consumers to be registered");
} else {

// Register setting update consumers to UpdateSettingsRequestHandler
this.updateSettingsRequestHandler.registerSettingUpdateConsumer(settingUpdateConsumers);

// Extract registered settings from setting update consumer map
List<Setting<?>> componentSettings = new ArrayList<>(settingUpdateConsumers.size());
componentSettings.addAll(settingUpdateConsumers.keySet());

ExtensionBooleanResponseHandler extensionBooleanResponseHandler = new ExtensionBooleanResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_ADD_SETTINGS_UPDATE_CONSUMER,
new AddSettingsUpdateConsumerRequest(this.extensionNode, componentSettings),
extensionBooleanResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Add Settings Update Consumer request to OpenSearch", e);
}
}

}

private Settings getSettings() {
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendEnvironmentSettingsRequest} call.
*/
public class EnvironmentSettingsResponseHandler implements TransportResponseHandler<EnvironmentSettingsResponse> {
private static final Logger logger = LogManager.getLogger(EnvironmentSettingsResponseHandler.class);

@Override
public void handleResponse(EnvironmentSettingsResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("EnvironmentSettingsRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public EnvironmentSettingsResponse read(StreamInput in) throws IOException {
return new EnvironmentSettingsResponse(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.WriteableSetting;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.extensions.UpdateSettingsRequest;

/**
* Handles requests to update settings
*/
public class UpdateSettingsRequestHandler {

private static final Logger logger = LogManager.getLogger(UpdateSettingsRequestHandler.class);

private Map<Setting<?>, Consumer<?>> settingUpdateConsumers;

/**
* Instantiates a new Update Setting Request Handler
*/
public UpdateSettingsRequestHandler() {
this.settingUpdateConsumers = new HashMap<>();
}

/**
* Registers the component {@link Setting} and the corresponding consumer to the settingsUpdateConsumer map.
* This map is used only when handling {@link UpdateSettingsRequest}
*
* @param settingUpdateConsumers The settings and their corresponding update consumers to register
*/
public void registerSettingUpdateConsumer(Map<Setting<?>, Consumer<?>> settingUpdateConsumers) {
this.settingUpdateConsumers.putAll(settingUpdateConsumers);
}

/**
* Handles a request to update a setting from OpenSearch. Extensions must register their setting keys and consumers within the settingUpdateConsumer map
*
* @param updateSettingsRequest The request to handle.
* @return A response acknowledging the request.
*/
@SuppressWarnings("unchecked")
public ExtensionBooleanResponse handleUpdateSettingsRequest(UpdateSettingsRequest updateSettingsRequest) {

logger.info("Registering UpdateSettingsRequest received from OpenSearch");

boolean settingUpdateStatus = true;

WriteableSetting.SettingType settingType = updateSettingsRequest.getSettingType();
Setting<?> componentSetting = updateSettingsRequest.getComponentSetting();
Object data = updateSettingsRequest.getData();

// Setting updater in OpenSearch performs setting change validation, only need to cast the consumer to the corresponding type and
// invoke the consumer
try {
switch (settingType) {
case Boolean:
Consumer<Boolean> boolConsumer = (Consumer<Boolean>) this.settingUpdateConsumers.get(componentSetting);
boolConsumer.accept(Boolean.parseBoolean(data.toString()));
case Integer:
Consumer<Integer> intConsumer = (Consumer<Integer>) this.settingUpdateConsumers.get(componentSetting);
intConsumer.accept(Integer.parseInt(data.toString()));
case Long:
Consumer<Long> longConsumer = (Consumer<Long>) this.settingUpdateConsumers.get(componentSetting);
longConsumer.accept(Long.parseLong(data.toString()));
case Float:
Consumer<Float> floatConsumer = (Consumer<Float>) this.settingUpdateConsumers.get(componentSetting);
floatConsumer.accept(Float.parseFloat(data.toString()));
case Double:
Consumer<Double> doubleConsumer = (Consumer<Double>) this.settingUpdateConsumers.get(componentSetting);
doubleConsumer.accept(Double.parseDouble(data.toString()));
case String:
Consumer<String> stringConsumer = (Consumer<String>) this.settingUpdateConsumers.get(componentSetting);
stringConsumer.accept(data.toString());
case TimeValue:
Consumer<TimeValue> timeValueConsumer = (Consumer<TimeValue>) this.settingUpdateConsumers.get(componentSetting);
timeValueConsumer.accept(TimeValue.parseTimeValue(data.toString(), componentSetting.getKey()));
case ByteSizeValue:
Consumer<ByteSizeValue> byteSizeValueConsumer = (Consumer<ByteSizeValue>) this.settingUpdateConsumers.get(
componentSetting
);
byteSizeValueConsumer.accept(ByteSizeValue.parseBytesSizeValue(data.toString(), componentSetting.getKey()));
case Version:
Consumer<Version> versionConsumer = (Consumer<Version>) this.settingUpdateConsumers.get(componentSetting);
versionConsumer.accept((Version) data);
default:
throw new UnsupportedOperationException("Setting Update Consumer type does not exist and is not handled here");
}
} catch (Exception e) {
logger.info(e.getMessage());
settingUpdateStatus = false;
}

return new ExtensionBooleanResponse(settingUpdateStatus);
}
}
Loading

0 comments on commit ba3e70f

Please sign in to comment.