Skip to content

Commit

Permalink
[Feature/extensions] Add getSettings support for AD (#686)
Browse files Browse the repository at this point in the history
* Implement Extension getSettings as copy of Plugin getSettings

Signed-off-by: Daniel Widdis <[email protected]>

* Copy updated HelloWorld action to placeholder RestCreateDetectorAction

Signed-off-by: Daniel Widdis <[email protected]>

* Replace references to deleted BaseNodeRequest with TransportRequest

Signed-off-by: Daniel Widdis <[email protected]>

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Oct 12, 2022
1 parent 74aaa81 commit a14b2f8
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 36 deletions.
50 changes: 50 additions & 0 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
package org.opensearch.ad;

import static java.util.Collections.unmodifiableList;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.ad.rest.RestCreateDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.sdk.Extension;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionSettings;
import org.opensearch.sdk.ExtensionsRunner;

import com.google.common.collect.ImmutableList;

public class AnomalyDetectorExtension implements Extension {

private static final String EXTENSION_SETTINGS_PATH = "/ad-extension.yml";
Expand All @@ -33,6 +42,47 @@ public List<ExtensionRestHandler> getExtensionRestHandlers() {
return List.of(new RestCreateDetectorAction());
}

@Override
public List<Setting<?>> getSettings() {
// Copied from AnomalyDetectorPlugin getSettings
List<Setting<?>> enabledSetting = EnabledSetting.getInstance().getSettings();
List<Setting<?>> systemSetting = ImmutableList
.of(
AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW,
AnomalyDetectorSettings.PAGE_SIZE,
AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD,
AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD,
AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE,
AnomalyDetectorSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE,
AnomalyDetectorSettings.BACKOFF_MINUTES,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_CONCURRENCY,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_BATCH_SIZE,
AnomalyDetectorSettings.COOLDOWN_MINUTES,
AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR,
AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS,
AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE,
AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS,
AnomalyDetectorSettings.REQUEST_TIMEOUT,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.DETECTION_INTERVAL,
AnomalyDetectorSettings.DETECTION_WINDOW_DELAY,
AnomalyDetectorSettings.MAX_SINGLE_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_MULTI_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_ANOMALY_FEATURES
);
return unmodifiableList(
Stream
.of(enabledSetting.stream(), systemSetting.stream())
.reduce(Stream::concat)
.orElseGet(Stream::empty)
.collect(Collectors.toList())
);
}

private static ExtensionSettings initializeSettings() throws IOException {
ExtensionSettings settings = Extension.readSettingsFromYaml(EXTENSION_SETTINGS_PATH);
if (settings == null || settings.getHostAddress() == null || settings.getHostPort() == null) {
Expand Down
123 changes: 101 additions & 22 deletions src/main/java/org/opensearch/ad/rest/RestCreateDetectorAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,132 @@
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.PUT;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.NOT_ACCEPTABLE;
import static org.opensearch.rest.RestStatus.NOT_FOUND;
import static org.opensearch.rest.RestStatus.NOT_MODIFIED;
import static org.opensearch.rest.RestStatus.OK;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.opensearch.common.xcontent.XContentType;
import org.opensearch.extensions.rest.ExtensionRestRequest;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionRestResponse;

public class RestCreateDetectorAction implements ExtensionRestHandler {

private static final String GREETING = "Hello, %s!";
private String worldName = "World";
private List<String> worldAdjectives = new ArrayList<>();
private Random rand = new Random();

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/hello"), new Route(PUT, "/hello/{name}"));
}

@Override
public ExtensionRestResponse handleRequest(Method method, String uri) {
// We need to track which parameters are consumed to pass back to OpenSearch
List<String> consumedParams = new ArrayList<>();
if (Method.GET.equals(method) && "/hello".equals(uri)) {
return new ExtensionRestResponse(OK, String.format(GREETING, worldName), consumedParams);
} else if (Method.PUT.equals(method) && uri.startsWith("/hello/")) {
// Placeholder code here for parameters in named wildcard paths
// Full implementation based on params() will be implemented as part of
// https://github.com/opensearch-project/opensearch-sdk-java/issues/111
String name = uri.substring("/hello/".length());
consumedParams.add("name");
try {
worldName = URLDecoder.decode(name, StandardCharsets.UTF_8);
} catch (IllegalArgumentException e) {
return new ExtensionRestResponse(BAD_REQUEST, e.getMessage(), consumedParams);
public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
Method method = request.method();

if (Method.GET.equals(method)) {
return handleGetRequest(request);
} else if (Method.POST.equals(method)) {
return handlePostRequest(request);
} else if (Method.DELETE.equals(method)) {
return handleDeleteRequest(request);
} else if (Method.PUT.equals(method)) {
return handlePutRequest(request);
}
return handleBadRequest(request);
}

private ExtensionRestResponse handleGetRequest(ExtensionRestRequest request) {
String worldNameWithRandomAdjective = worldAdjectives.isEmpty()
? worldName
: String.join(" ", worldAdjectives.get(rand.nextInt(worldAdjectives.size())), worldName);
return new ExtensionRestResponse(request, OK, String.format(GREETING, worldNameWithRandomAdjective));
}

private ExtensionRestResponse handlePostRequest(ExtensionRestRequest request) {
if (request.hasContent()) {
String adjective = "";
XContentType contentType = request.getXContentType();
if (contentType == null) {
// Plain text
adjective = request.content().utf8ToString();
} else if (contentType.equals(XContentType.JSON)) {
adjective = parseJsonAdjective(request.content().utf8ToString());
} else {
return new ExtensionRestResponse(request, NOT_ACCEPTABLE, "Only text and JSON content types are supported");
}
if (!adjective.isBlank()) {
worldAdjectives.add(adjective);
return new ExtensionRestResponse(request, OK, "Added " + adjective + " to words that describe the world!");
}
return new ExtensionRestResponse(request, BAD_REQUEST, "No adjective included with POST request");
}
return new ExtensionRestResponse(request, BAD_REQUEST, "No content included with POST request");
}

private ExtensionRestResponse handleDeleteRequest(ExtensionRestRequest request) {
if (request.hasContent()) {
String adjective = "";
XContentType contentType = request.getXContentType();
if (contentType == null) {
// Plain text
adjective = request.content().utf8ToString();
} else if (contentType.equals(XContentType.JSON)) {
adjective = parseJsonAdjective(request.content().utf8ToString());
} else {
return new ExtensionRestResponse(request, NOT_ACCEPTABLE, "Only text and JSON content types are supported");
}
if (!adjective.isBlank()) {
if (worldAdjectives.remove(adjective)) {
return new ExtensionRestResponse(request, OK, "Goodbye, " + adjective + " world!");
}
return new ExtensionRestResponse(request, NOT_MODIFIED, "");
}
return new ExtensionRestResponse(OK, "Updated the world's name to " + worldName, consumedParams);
return new ExtensionRestResponse(request, BAD_REQUEST, "No adjective included with DELETE request");
}
return new ExtensionRestResponse(request, BAD_REQUEST, "No content included with DELETE request");
}

private ExtensionRestResponse handlePutRequest(ExtensionRestRequest request) {
String name = request.param("name");
try {
worldName = URLDecoder.decode(name, StandardCharsets.UTF_8);
} catch (IllegalArgumentException e) {
return new ExtensionRestResponse(request, BAD_REQUEST, e.getMessage());
}
return new ExtensionRestResponse(
NOT_FOUND,
"Extension REST action improperly configured to handle " + method.name() + " " + uri,
consumedParams
);
return new ExtensionRestResponse(request, OK, "Updated the world's name to " + worldName);
}

private ExtensionRestResponse handleBadRequest(ExtensionRestRequest request) {
return new ExtensionRestResponse(request, NOT_FOUND, "Extension REST action improperly configured to handle " + request.toString());
}

private String parseJsonAdjective(String json) {
// TODO: Once CreateComponents has an XContentRegistry available we can parse from there
// For now we just hack our way into the result.
boolean foundLabel = false;
boolean foundColon = false;
for (String s : json.split("\"")) {
if (!foundLabel) {
foundLabel = "adjective".equals(s);
} else if (!foundColon) {
foundColon = s.contains(":");
} else {
// This is the adjective!
return s;
}
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

public class ADCancelTaskNodeRequest extends BaseNodeRequest {
public class ADCancelTaskNodeRequest extends TransportRequest {
private String detectorId;
private String detectorTaskId;
private String userName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

/**
* ADStatsNodeRequest to get a nodes stat
*/
public class ADStatsNodeRequest extends BaseNodeRequest {
public class ADStatsNodeRequest extends TransportRequest {
private ADStatsRequest request;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

public class ADTaskProfileNodeRequest extends BaseNodeRequest {
public class ADTaskProfileNodeRequest extends TransportRequest {
private String detectorId;

public ADTaskProfileNodeRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.transport.TransportRequest;

/**
* Delete model represents the request to an individual node
*/
public class CronNodeRequest extends BaseNodeRequest {
public class CronNodeRequest extends TransportRequest {

public CronNodeRequest() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

/**
* Delete model represents the request to an individual node
*/
public class DeleteModelNodeRequest extends BaseNodeRequest {
public class DeleteModelNodeRequest extends TransportRequest {

private String adID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
import java.io.IOException;
import java.util.Set;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

/**
* Class representing a nodes's profile request
*/
public class ProfileNodeRequest extends BaseNodeRequest {
public class ProfileNodeRequest extends TransportRequest {
private ProfileRequest request;

public ProfileNodeRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

public class MockADCancelTaskNodeRequest_1_0 extends BaseNodeRequest {
public class MockADCancelTaskNodeRequest_1_0 extends TransportRequest {
private String detectorId;
private String userName;

Expand Down

0 comments on commit a14b2f8

Please sign in to comment.