Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Adding support for Extension Create Components [environment settings and add setting update consumers] #138

Merged
merged 20 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
953d5f7
Added support for enviornment settings requests and unit tests
joshpalis Sep 8, 2022
13e7358
Merge branch 'main' into create-components
joshpalis Sep 8, 2022
0ce5b38
updated environment settings support for create components
joshpalis Sep 12, 2022
7f5e945
Updating javadocs for environmentsetting request/ response handlers
joshpalis Sep 12, 2022
0915c5e
Merge branch 'main' into create-components
joshpalis Sep 13, 2022
71520fb
Adding inital support for AddSettingsUpdateConsumer
joshpalis Sep 14, 2022
8fc8992
Updated implementation in preparation for getSettings support integra…
joshpalis Sep 15, 2022
0eddec4
Updated sendAddSettingsUpdateConsumerRequest to pull settings list di…
joshpalis Sep 16, 2022
6700813
Integrated writeable settings support
joshpalis Sep 16, 2022
b084c20
Updating writeable settings integration for setting type
joshpalis Sep 19, 2022
349eeb2
rebase with main
joshpalis Sep 19, 2022
4119194
Merge branch 'main' into create-components
joshpalis Sep 20, 2022
0b24a01
resolving import merge conflict
joshpalis Sep 23, 2022
3f25da7
Replacing AddSettingsUpdateConsumerResponseHandler with ExtensionBool…
joshpalis Sep 23, 2022
9c779f0
refactoring sendAddSettingsUpdateConsumerRequest. Now each component …
joshpalis Sep 23, 2022
4d94bd5
simplifying settingUpdateConsumer registration
joshpalis Sep 23, 2022
9f46643
Fixing javadocs
joshpalis Sep 23, 2022
e7624bc
fixing javadocs
joshpalis Sep 23, 2022
3540be3
adding missing javadocs
joshpalis Sep 23, 2022
4a0335c
fixing javadocs
joshpalis Sep 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.EnvironmentSettingsRequest;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
Expand All @@ -41,8 +46,10 @@
import org.opensearch.rest.RestResponse;
import org.opensearch.transport.netty4.Netty4Transport;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.sdk.handlers.AddSettingsUpdateConsumerResponseHandler;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.RegisterRestActionsResponseHandler;
import org.opensearch.search.SearchModule;
Expand All @@ -53,15 +60,21 @@
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.TransportResponse;
import org.opensearch.common.settings.WriteableSetting;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.function.Consumer;

import static java.util.Collections.emptySet;
import static org.opensearch.common.UUIDs.randomBase64UUID;
Expand All @@ -78,6 +91,7 @@ public class ExtensionsRunner {

private String uniqueId;
private DiscoveryNode opensearchNode;
private DiscoveryExtension extensionNode;
private TransportService extensionTransportService = null;
private ExtensionRestPathRegistry extensionRestPathRegistry = new ExtensionRestPathRegistry();

Expand All @@ -90,6 +104,7 @@ public class ExtensionsRunner {
* https://github.com/opensearch-project/opensearch-sdk-java/issues/119
*/
private TransportActions transportActions = new TransportActions(new HashMap<>());
private Map<Setting<?>, Consumer<?>> settingUpdateConsumers = new HashMap<>();

/**
* Instantiates a new Extensions Runner using test settings.
Expand Down Expand Up @@ -152,6 +167,10 @@ private void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

private void setExtensionNode(DiscoveryExtension extensionNode) {
this.extensionNode = extensionNode;
}

DiscoveryNode getOpensearchNode() {
return opensearchNode;
}
Expand All @@ -172,6 +191,7 @@ InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequ
} finally {
// After sending successful response to initialization, send the REST API
setOpensearchNode(opensearchNode);
setExtensionNode(extensionInitRequest.getExtension());
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
transportActions.sendRegisterTransportActionsRequest(extensionTransportService, opensearchNode);
Expand Down Expand Up @@ -222,6 +242,66 @@ ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest ind
return indicesModuleNameResponse;
}

/**
* Handles a request to update a setting from OpenSearch. Extensions must register their setting keys and consumers within the settingUpdateConsumer map
*
* @param updateSettingsRequest The request to handle.
* @return A response acknowledging the request.
*/
@SuppressWarnings("unchecked")
ExtensionBooleanResponse handleUpdateSettingsRequest(UpdateSettingsRequest updateSettingsRequest) {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Registering UpdateSettingsRequest received from OpenSearch");

boolean settingUpdateStatus = true;

WriteableSetting.WriteableSettingGenericType settingType = updateSettingsRequest.getSettingType();
Setting<?> componentSetting = updateSettingsRequest.getComponentSetting();
Object data = updateSettingsRequest.getData();

// Setting updater in OpenSearch performs setting change validation, only need to cast the consumer to the corresponding type and
// invoke the consumer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a request to change anything in your PR but a note for awareness. The original validator and parser are lost in the existing setup, so if there was some extra validation step (for example, minimum value integer) it might not properly validate on the OpenSearch side. That's fine for now as it will eventually be handled by #154.

try {
switch (settingType) {
case Boolean:
Consumer<Boolean> boolConsumer = (Consumer<Boolean>) this.settingUpdateConsumers.get(componentSetting);
boolConsumer.accept(Boolean.parseBoolean(data.toString()));
case Integer:
Consumer<Integer> intConsumer = (Consumer<Integer>) this.settingUpdateConsumers.get(componentSetting);
intConsumer.accept(Integer.parseInt(data.toString()));
case Long:
Consumer<Long> longConsumer = (Consumer<Long>) this.settingUpdateConsumers.get(componentSetting);
longConsumer.accept(Long.parseLong(data.toString()));
case Float:
Consumer<Float> floatConsumer = (Consumer<Float>) this.settingUpdateConsumers.get(componentSetting);
floatConsumer.accept(Float.parseFloat(data.toString()));
case Double:
Consumer<Double> doubleConsumer = (Consumer<Double>) this.settingUpdateConsumers.get(componentSetting);
doubleConsumer.accept(Double.parseDouble(data.toString()));
case String:
Consumer<String> stringConsumer = (Consumer<String>) this.settingUpdateConsumers.get(componentSetting);
stringConsumer.accept(data.toString());
case TimeValue:
Consumer<TimeValue> timeValueConsumer = (Consumer<TimeValue>) this.settingUpdateConsumers.get(componentSetting);
timeValueConsumer.accept(TimeValue.parseTimeValue(data.toString(), componentSetting.getKey()));
case ByteSizeValue:
Consumer<ByteSizeValue> byteSizeValueConsumer = (Consumer<ByteSizeValue>) this.settingUpdateConsumers.get(
componentSetting
);
byteSizeValueConsumer.accept(ByteSizeValue.parseBytesSizeValue(data.toString(), componentSetting.getKey()));
case Version:
Consumer<Version> versionConsumer = (Consumer<Version>) this.settingUpdateConsumers.get(componentSetting);
versionConsumer.accept((Version) data);
default:
throw new UnsupportedOperationException("Setting Update Consumer type does not exist and is not handled here");
}
} catch (Exception e) {
logger.info(e.getMessage());
settingUpdateStatus = false;
}

return new ExtensionBooleanResponse(settingUpdateStatus);
}

/**
* Handles a request from OpenSearch to execute a REST request on the extension.
*
Expand Down Expand Up @@ -392,6 +472,15 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(handleRestExecuteOnExtensionRequest(request)))
);

transportService.registerRequestHandler(
ExtensionsOrchestrator.REQUEST_EXTENSION_UPDATE_SETTINGS,
ThreadPool.Names.GENERIC,
false,
false,
UpdateSettingsRequest::new,
((request, channel, task) -> channel.sendResponse(handleUpdateSettingsRequest(request)))
);

}

/**
Expand Down Expand Up @@ -475,6 +564,74 @@ public void sendLocalNodeRequest(TransportService transportService) {
}
}

/**
* Requests the environment setting values from OpenSearch for the corresponding component settings. The result will be handled by a {@link EnvironmentSettingsResponseHandler}.
*
* @param componentSettings The component setting that correspond to the values provided by the environment settings
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendEnvironmentSettingsRequest(TransportService transportService, List<Setting<?>> componentSettings) {
logger.info("Sending Environment Settings request to OpenSearch");
EnvironmentSettingsResponseHandler environmentSettingsResponseHandler = new EnvironmentSettingsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_ENVIRONMENT_SETTINGS,
new EnvironmentSettingsRequest(componentSettings),
environmentSettingsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Environment Settings request to OpenSearch", e);
}
}

/**
* Registers the component {@link Setting} and the corresponding consumer to the settingsUpdateConsumer map.
* Extensions will use this method instead of invoking ClusterSettings#addSettingsUpdateConsumer
*
* @param componentSetting The component setting associated with the consumer
* @param consumer The setting update consumer associated with the component setting
*/
public <T> void registerSettingUpdateConsumer(Setting<T> componentSetting, Consumer<T> consumer) {
this.settingUpdateConsumers.put(componentSetting, consumer);
}

/**
* Requests the environment setting values from OpenSearch for the corresponding component settings.
* This should be called after extension components have registered all their setting update consumers with the settingUpdateConsumers map.
* The result will be handled by a {@link EnvironmentSettingsResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
* @throws Exception if there are no setting update consumers registered within the settingUpdateConsumers map
*/
public void sendAddSettingsUpdateConsumerRequest(TransportService transportService) throws Exception {
logger.info("Sending Add Settings Update Consumer request to OpenSearch");

// Determine if setting update consumers have been registered
if (this.settingUpdateConsumers.isEmpty()) {
throw new Exception("There are no setting update consumers registered");
} else {

// Extract registered settings from setting update consumer map
List<Setting<?>> componentSettings = new ArrayList<>(this.settingUpdateConsumers.size());
componentSettings.addAll(this.settingUpdateConsumers.keySet());

AddSettingsUpdateConsumerResponseHandler addSettingsUpdateConsumerResponseHandler =
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
new AddSettingsUpdateConsumerResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_ADD_SETTINGS_UPDATE_CONSUMER,
new AddSettingsUpdateConsumerRequest(this.extensionNode, componentSettings),
addSettingsUpdateConsumerResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Add Settings Update Consumer request to OpenSearch", e);
}
}

}

private Settings getSettings() {
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendAddSettingsUpdateConsumerRequest} call.
*/
public class AddSettingsUpdateConsumerResponseHandler implements TransportResponseHandler<ExtensionBooleanResponse> {
private static final Logger logger = LogManager.getLogger(AddSettingsUpdateConsumerResponseHandler.class);

@Override
public void handleResponse(ExtensionBooleanResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("AddSettingsUpdateConsumerResponse failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public ExtensionBooleanResponse read(StreamInput in) throws IOException {
return new ExtensionBooleanResponse(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendEnvironmentSettingsRequest} call.
*/
public class EnvironmentSettingsResponseHandler implements TransportResponseHandler<EnvironmentSettingsResponse> {
private static final Logger logger = LogManager.getLogger(EnvironmentSettingsResponseHandler.class);

@Override
public void handleResponse(EnvironmentSettingsResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("EnvironmentSettingsRequest failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public EnvironmentSettingsResponse read(StreamInput in) throws IOException {
return new EnvironmentSettingsResponse(in);
}
}
Loading