diff --git a/common/src/main/java/com/amazon/opendistroforelasticsearch/sql/common/setting/Settings.java b/common/src/main/java/com/amazon/opendistroforelasticsearch/sql/common/setting/Settings.java new file mode 100644 index 0000000000..c90b975a82 --- /dev/null +++ b/common/src/main/java/com/amazon/opendistroforelasticsearch/sql/common/setting/Settings.java @@ -0,0 +1,39 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.common.setting; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Setting. + */ +public abstract class Settings { + @RequiredArgsConstructor + public enum Key { + PPL_QUERY_MEMORY_LIMIT("opendistro.ppl.query.memory_limit"); + + @Getter + private final String keyValue; + } + + /** + * Get Setting Value. + */ + public abstract T getSettingValue(Key key); +} diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/monitor/AlwaysHealthyMonitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/monitor/AlwaysHealthyMonitor.java new file mode 100644 index 0000000000..406884f718 --- /dev/null +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/monitor/AlwaysHealthyMonitor.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.monitor; + +/** + * Always healthy resource monitor. + */ +public class AlwaysHealthyMonitor extends ResourceMonitor { + public static final ResourceMonitor ALWAYS_HEALTHY_MONITOR = + new AlwaysHealthyMonitor(); + + /** + * always healthy. + */ + @Override + public boolean isHealthy() { + return true; + } +} diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/monitor/ResourceMonitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/monitor/ResourceMonitor.java new file mode 100644 index 0000000000..c14e68cf67 --- /dev/null +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/monitor/ResourceMonitor.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.monitor; + +/** + * The abstract interface of ResourceMonitor. + * When an fault is detected, the circuit breaker is open. + */ +public abstract class ResourceMonitor { + /** + * Is the resource healthy. + * + * @return true for healthy, otherwise false. + */ + public abstract boolean isHealthy(); +} diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/AggregationOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/AggregationOperator.java index 6e58ad07e6..9fd395e8e3 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/AggregationOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/AggregationOperator.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.stream.Collectors; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; @@ -43,9 +44,11 @@ @EqualsAndHashCode @ToString public class AggregationOperator extends PhysicalPlan { - + @Getter private final PhysicalPlan input; + @Getter private final List aggregatorList; + @Getter private final List groupByExprList; @EqualsAndHashCode.Exclude private final Group group; diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java index c29d6a2ab6..e8efadd51a 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java @@ -26,6 +26,7 @@ import java.util.function.BiFunction; import java.util.function.Predicate; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -33,12 +34,18 @@ * Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link * DedupeOperator#dedupeList} The result order follow the input order. */ +@Getter @EqualsAndHashCode public class DedupeOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final List dedupeList; + @Getter private final Integer allowedDuplication; + @Getter private final Boolean keepEmpty; + @Getter private final Boolean consecutive; @EqualsAndHashCode.Exclude diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/EvalOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/EvalOperator.java index 340bddd4ef..89549e498b 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/EvalOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/EvalOperator.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; import org.apache.commons.lang3.tuple.Pair; @@ -49,7 +50,9 @@ @EqualsAndHashCode(callSuper = false) @RequiredArgsConstructor public class EvalOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final List> expressionList; @Override diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/FilterOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/FilterOperator.java index 19a0077341..be6d48be39 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/FilterOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/FilterOperator.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.List; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; @@ -20,9 +21,11 @@ @ToString @RequiredArgsConstructor public class FilterOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final Expression conditions; - private ExprValue next = null; + @ToString.Exclude private ExprValue next = null; @Override public R accept(PhysicalPlanNodeVisitor visitor, C context) { diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index fc05fff336..9756b57cbb 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -65,4 +65,7 @@ public R visitValues(ValuesOperator node, C context) { return visitNode(node, context); } + public R visitSort(SortOperator node, C context) { + return visitNode(node, context); + } } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/ProjectOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/ProjectOperator.java index edb7c973a5..04aa049e57 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/ProjectOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/ProjectOperator.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; @@ -33,7 +34,9 @@ @EqualsAndHashCode @RequiredArgsConstructor public class ProjectOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final List projectList; @Override diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RemoveOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RemoveOperator.java index 810469de72..37aad433b2 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RemoveOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RemoveOperator.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.Set; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; @@ -40,7 +41,9 @@ @EqualsAndHashCode @RequiredArgsConstructor public class RemoveOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final Set removeList; @Override diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RenameOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RenameOperator.java index bbf3cc69ab..0a83cce09b 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RenameOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/RenameOperator.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; @@ -41,7 +42,9 @@ @ToString @RequiredArgsConstructor public class RenameOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final Map mapping; @Override diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/SortOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/SortOperator.java index fc5e6054d1..3cc1ee163c 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/SortOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/SortOperator.java @@ -31,6 +31,7 @@ import java.util.PriorityQueue; import lombok.Builder; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.Singular; import lombok.ToString; import org.apache.commons.lang3.tuple.Pair; @@ -43,8 +44,11 @@ @ToString @EqualsAndHashCode public class SortOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final Integer count; + @Getter private final List> sortList; @EqualsAndHashCode.Exclude private final Sorter sorter; @@ -79,7 +83,7 @@ public SortOperator( @Override public R accept(PhysicalPlanNodeVisitor visitor, C context) { - return null; + return visitor.visitSort(this, context); } @Override diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/monitor/AlwaysHealthyMonitorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/monitor/AlwaysHealthyMonitorTest.java new file mode 100644 index 0000000000..6e83cc54e6 --- /dev/null +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/monitor/AlwaysHealthyMonitorTest.java @@ -0,0 +1,30 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.monitor; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class AlwaysHealthyMonitorTest { + + @Test + void isHealthy() { + assertTrue(new AlwaysHealthyMonitor().isHealthy()); + } +} \ No newline at end of file diff --git a/docs/category.json b/docs/category.json index c77193d15e..d889ba4a8d 100644 --- a/docs/category.json +++ b/docs/category.json @@ -1,7 +1,8 @@ { "bash": [ "experiment/ppl/interfaces/endpoint.rst", - "experiment/ppl/interfaces/protocol.rst" + "experiment/ppl/interfaces/protocol.rst", + "experiment/ppl/admin/settings.rst" ], "ppl_cli": [ "experiment/ppl/cmd/dedup.rst", diff --git a/docs/experiment/ppl/admin/settings.rst b/docs/experiment/ppl/admin/settings.rst new file mode 100644 index 0000000000..afcf1428de --- /dev/null +++ b/docs/experiment/ppl/admin/settings.rst @@ -0,0 +1,48 @@ +.. highlight:: sh + +============ +PPL Settings +============ + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + + +Introduction +============ + +When Elasticsearch bootstraps, PPL plugin will register a few settings in Elasticsearch cluster settings. Most of the settings are able to change dynamically so you can control the behavior of PPL plugin without need to bounce your cluster. + +opendistro.ppl.query.memory_limit +================================= + +Description +----------- + +You can set heap memory usage limit for PPL query. When query running, it will detected whether the heap memory usage under the limit, if not, it will terminated the current query. The default value is: 85% + +Example +------- + +PPL query:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_cluster/settings \ + ... -d '{"persistent" : {"opendistro.ppl.query.memory_limit" : "80%"}}' + { + "acknowledged": true, + "persistent": { + "opendistro": { + "ppl": { + "query": { + "memory_limit": "80%" + } + } + } + }, + "transient": {} + } + diff --git a/docs/experiment/ppl/index.rst b/docs/experiment/ppl/index.rst index 7253f6dc3e..6645d9e255 100644 --- a/docs/experiment/ppl/index.rst +++ b/docs/experiment/ppl/index.rst @@ -19,6 +19,10 @@ OpenDistro PPL Reference Manual - `Protocol `_ +* **Administration** + + - `Plugin Settings `_ + * **Commands** - `Syntax `_ diff --git a/elasticsearch/build.gradle b/elasticsearch/build.gradle index f46fb69390..b8407262ef 100644 --- a/elasticsearch/build.gradle +++ b/elasticsearch/build.gradle @@ -12,6 +12,7 @@ dependencies { compile project(':core') compile group: 'org.elasticsearch', name: 'elasticsearch', version: "${es_version}" compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: "${es_version}" + compile "io.github.resilience4j:resilience4j-retry:1.5.0" testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchNodeClient.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchNodeClient.java index 2e58fa8809..433275f3cb 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchNodeClient.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/client/ElasticsearchNodeClient.java @@ -58,6 +58,8 @@ public class ElasticsearchNodeClient implements ElasticsearchClient { /** Index name expression resolver to get concrete index name. */ private final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(); + private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + /** * Get field mappings of index by an index expression. Majority is copied from legacy * LocalClusterState. @@ -111,7 +113,7 @@ public void schedule(Runnable task) { threadPool.schedule( threadPool.preserveContext(withCurrentContext(task)), new TimeValue(0), - "search" // TODO: use search worker pool for now + SQL_WORKER_THREAD_POOL_NAME ); } diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngine.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngine.java index 21fc2b467a..b3caa33b83 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngine.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngine.java @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.sql.common.response.ResponseListener; import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ExecutionProtector; import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; import java.util.ArrayList; @@ -31,8 +32,11 @@ public class ElasticsearchExecutionEngine implements ExecutionEngine { private final ElasticsearchClient client; + private final ExecutionProtector executionProtector; + @Override - public void execute(PhysicalPlan plan, ResponseListener listener) { + public void execute(PhysicalPlan physicalPlan, ResponseListener listener) { + PhysicalPlan plan = executionProtector.protect(physicalPlan); client.schedule( () -> { try { diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java new file mode 100644 index 0000000000..db82cb3b7d --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java @@ -0,0 +1,107 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector; + +import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.AggregationOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.EvalOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.FilterOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanNodeVisitor; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.ProjectOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.RemoveOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.RenameOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.SortOperator; +import com.amazon.opendistroforelasticsearch.sql.storage.TableScanOperator; +import lombok.RequiredArgsConstructor; + +/** + * Elasticsearch Execution Protector. + */ +@RequiredArgsConstructor +public class ElasticsearchExecutionProtector extends ExecutionProtector { + + /** + * Elasticsearch resource monitor. + */ + private final ResourceMonitor resourceMonitor; + + public PhysicalPlan protect(PhysicalPlan physicalPlan) { + return physicalPlan.accept(this, null); + } + + @Override + public PhysicalPlan visitFilter(FilterOperator node, Object context) { + return new FilterOperator(visitInput(node.getInput(), context), node.getConditions()); + } + + @Override + public PhysicalPlan visitAggregation(AggregationOperator node, Object context) { + return new AggregationOperator(visitInput(node.getInput(), context), node.getAggregatorList(), + node.getGroupByExprList()); + } + + @Override + public PhysicalPlan visitRename(RenameOperator node, Object context) { + return new RenameOperator(visitInput(node.getInput(), context), node.getMapping()); + } + + /** + * Decorate with {@link ResourceMonitorPlan}. + */ + @Override + public PhysicalPlan visitTableScan(TableScanOperator node, Object context) { + return new ResourceMonitorPlan(node, resourceMonitor); + } + + @Override + public PhysicalPlan visitProject(ProjectOperator node, Object context) { + return new ProjectOperator(visitInput(node.getInput(), context), node.getProjectList()); + } + + @Override + public PhysicalPlan visitRemove(RemoveOperator node, Object context) { + return new RemoveOperator(visitInput(node.getInput(), context), node.getRemoveList()); + } + + @Override + public PhysicalPlan visitEval(EvalOperator node, Object context) { + return new EvalOperator(visitInput(node.getInput(), context), node.getExpressionList()); + } + + @Override + public PhysicalPlan visitDedupe(DedupeOperator node, Object context) { + return new DedupeOperator(visitInput(node.getInput(), context), node.getDedupeList(), + node.getAllowedDuplication(), node.getKeepEmpty(), node.getConsecutive()); + } + + @Override + public PhysicalPlan visitSort(SortOperator node, Object context) { + return new SortOperator(visitInput(node.getInput(), context), node.getCount(), + node.getSortList()); + } + + PhysicalPlan visitInput(PhysicalPlan node, Object context) { + if (null == node) { + return node; + } else { + return node.accept(this, context); + } + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ExecutionProtector.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ExecutionProtector.java new file mode 100644 index 0000000000..81b5d1cb23 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ExecutionProtector.java @@ -0,0 +1,32 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector; + +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanNodeVisitor; + +/** + * Execution Plan Protector. + */ +public abstract class ExecutionProtector extends PhysicalPlanNodeVisitor { + + /** + * Decorated the PhysicalPlan to run in resource sensitive mode. + */ + public abstract PhysicalPlan protect(PhysicalPlan physicalPlan); +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/NoopExecutionProtector.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/NoopExecutionProtector.java new file mode 100644 index 0000000000..6aa656db67 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/NoopExecutionProtector.java @@ -0,0 +1,31 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector; + +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; + +/** + * No operation execution protector. + */ +public class NoopExecutionProtector extends ExecutionProtector { + + @Override + public PhysicalPlan protect(PhysicalPlan physicalPlan) { + return physicalPlan; + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java new file mode 100644 index 0000000000..0225a46974 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java @@ -0,0 +1,79 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector; + +import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue; +import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanNodeVisitor; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * A PhysicalPlan which will run the delegate plan in resource protection manner. + */ +@ToString +@RequiredArgsConstructor +@EqualsAndHashCode +public class ResourceMonitorPlan extends PhysicalPlan { + /** + * Delegated PhysicalPlan. + */ + private final PhysicalPlan delegate; + + /** + * ResourceMonitor. + */ + @ToString.Exclude + private final ResourceMonitor monitor; + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return delegate.accept(visitor, context); + } + + @Override + public void open() { + if (!this.monitor.isHealthy()) { + throw new IllegalStateException("resource is not enough to run the query, quit."); + } + delegate.open(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public List getChild() { + return delegate.getChild(); + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public ExprValue next() { + return delegate.next(); + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchMemoryHealthy.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchMemoryHealthy.java new file mode 100644 index 0000000000..8a3e287c29 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchMemoryHealthy.java @@ -0,0 +1,86 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.monitor; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.ThreadLocalRandom; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +/** + * Elasticsearch Memory Monitor. + */ +@Log4j2 +public class ElasticsearchMemoryHealthy { + private final RandomFail randomFail; + private final MemoryUsage memoryUsage; + + public ElasticsearchMemoryHealthy() { + randomFail = new RandomFail(); + memoryUsage = new MemoryUsage(); + } + + @VisibleForTesting + public ElasticsearchMemoryHealthy( + RandomFail randomFail, + MemoryUsage memoryUsage) { + this.randomFail = randomFail; + this.memoryUsage = memoryUsage; + } + + /** + * Is Memory Healthy. Calculate based on the current heap memory usage. + */ + public boolean isMemoryHealthy(long limitBytes) { + final long memoryUsage = this.memoryUsage.usage(); + log.debug("Memory usage:{}, limit:{}", memoryUsage, limitBytes); + if (memoryUsage < limitBytes) { + return true; + } else { + log.warn("Memory usage:{} exceed limit:{}", memoryUsage, limitBytes); + if (randomFail.shouldFail()) { + log.warn("Fast failure the current request"); + throw new MemoryUsageExceedFastFailureException(); + } else { + throw new MemoryUsageExceedException(); + } + } + } + + static class RandomFail { + public boolean shouldFail() { + return ThreadLocalRandom.current().nextBoolean(); + } + } + + static class MemoryUsage { + public long usage() { + final long freeMemory = Runtime.getRuntime().freeMemory(); + final long totalMemory = Runtime.getRuntime().totalMemory(); + return totalMemory - freeMemory; + } + } + + @NoArgsConstructor + public static class MemoryUsageExceedFastFailureException extends RuntimeException { + + } + + @NoArgsConstructor + public static class MemoryUsageExceedException extends RuntimeException { + + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchResourceMonitor.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchResourceMonitor.java new file mode 100644 index 0000000000..980c833648 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchResourceMonitor.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.monitor; + +import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; +import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import java.util.function.Supplier; +import lombok.extern.log4j.Log4j2; +import org.elasticsearch.common.unit.ByteSizeValue; + +/** + * {@link ResourceMonitor} implementation on Elasticsearch. When the heap memory usage exceeds + * certain threshold, the monitor is not healthy. + * Todo, add metrics. + */ +@Log4j2 +public class ElasticsearchResourceMonitor extends ResourceMonitor { + private final Settings settings; + private final Retry retry; + private final ElasticsearchMemoryHealthy memoryMonitor; + + /** + * Constructor of ElasticsearchCircuitBreaker. + */ + public ElasticsearchResourceMonitor( + Settings settings, + ElasticsearchMemoryHealthy memoryMonitor) { + this.settings = settings; + RetryConfig config = + RetryConfig.custom() + .maxAttempts(3) + .intervalFunction(IntervalFunction.ofExponentialRandomBackoff(1000)) + .retryExceptions(ElasticsearchMemoryHealthy.MemoryUsageExceedException.class) + .ignoreExceptions( + ElasticsearchMemoryHealthy.MemoryUsageExceedFastFailureException.class) + .build(); + retry = Retry.of("mem", config); + this.memoryMonitor = memoryMonitor; + } + + /** + * Is Healthy. + * + * @return true if healthy, otherwise return false. + */ + @Override + public boolean isHealthy() { + try { + ByteSizeValue limit = settings.getSettingValue(Settings.Key.PPL_QUERY_MEMORY_LIMIT); + Supplier booleanSupplier = + Retry.decorateSupplier(retry, + () -> memoryMonitor + .isMemoryHealthy(limit.getBytes())); + return booleanSupplier.get(); + } catch (Exception e) { + return false; + } + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/setting/ElasticsearchSettings.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/setting/ElasticsearchSettings.java new file mode 100644 index 0000000000..da3d463592 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/setting/ElasticsearchSettings.java @@ -0,0 +1,107 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.setting; + +import static org.elasticsearch.common.settings.Settings.EMPTY; + +import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; + +/** + * Setting implementation on Elasticsearch. + */ +@Log4j2 +public class ElasticsearchSettings extends Settings { + /** + * Default settings. + */ + private final Map> defaultSettings; + /** + * Latest setting value for each registered key. Thread-safe is required. + */ + @VisibleForTesting + private final Map latestSettings = new ConcurrentHashMap<>(); + + private static final Setting PPL_QUERY_MEMORY_LIMIT_SETTINGS = Setting.memorySizeSetting( + Key.PPL_QUERY_MEMORY_LIMIT.getKeyValue(), + "85%", + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + /** + * Construct ElasticsearchSetting. + * The ElasticsearchSetting must be singleton. + */ + public ElasticsearchSettings(ClusterSettings clusterSettings) { + ImmutableMap.Builder> settingBuilder = new ImmutableMap.Builder<>(); + register(settingBuilder, clusterSettings, Key.PPL_QUERY_MEMORY_LIMIT, + PPL_QUERY_MEMORY_LIMIT_SETTINGS, new Updater(Key.PPL_QUERY_MEMORY_LIMIT)); + defaultSettings = settingBuilder.build(); + } + + @SuppressWarnings("unchecked") + @Override + public T getSettingValue(Settings.Key key) { + return (T) latestSettings.getOrDefault(key, defaultSettings.get(key).getDefault(EMPTY)); + } + + /** + * Register the pair of {key, setting}. + */ + private void register(ImmutableMap.Builder> settingBuilder, + ClusterSettings clusterSettings, Settings.Key key, + Setting setting, + Consumer updater) { + settingBuilder.put(key, setting); + clusterSettings + .addSettingsUpdateConsumer(setting, updater); + } + + /** + * Add the inner class only for UT coverage purpuse. + * Lambda could be much elegant solution. But which is hard to test. + */ + @VisibleForTesting + @RequiredArgsConstructor + class Updater implements Consumer { + private final Settings.Key key; + + @Override + public void accept(Object newValue) { + log.debug("The value of setting [{}] changed to [{}]", key, newValue); + latestSettings.put(key, newValue); + } + } + + /** + * Used by Plugin to init Setting. + */ + public static List> pluginSettings() { + return new ImmutableList.Builder>().add(PPL_QUERY_MEMORY_LIMIT_SETTINGS).build(); + } +} diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngineTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngineTest.java index b14b7f0b06..f1ca7ac1f8 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngineTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionEngineTest.java @@ -31,6 +31,7 @@ import com.amazon.opendistroforelasticsearch.sql.common.response.ResponseListener; import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ElasticsearchExecutionProtector; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; import com.amazon.opendistroforelasticsearch.sql.storage.TableScanOperator; import java.util.ArrayList; @@ -50,6 +51,8 @@ class ElasticsearchExecutionEngineTest { @Mock private ElasticsearchClient client; + @Mock private ElasticsearchExecutionProtector protector; + @BeforeEach void setUp() { doAnswer( @@ -69,8 +72,9 @@ void executeSuccessfully() { Arrays.asList( tupleValue(of("name", "John", "age", 20)), tupleValue(of("name", "Allen", "age", 30))); FakePhysicalPlan plan = new FakePhysicalPlan(expected.iterator()); + when(protector.protect(plan)).thenReturn(plan); - ElasticsearchExecutionEngine executor = new ElasticsearchExecutionEngine(client); + ElasticsearchExecutionEngine executor = new ElasticsearchExecutionEngine(client, protector); List actual = new ArrayList<>(); executor.execute( plan, @@ -96,8 +100,9 @@ void executeWithFailure() { PhysicalPlan plan = mock(PhysicalPlan.class); RuntimeException expected = new RuntimeException("Execution error"); when(plan.hasNext()).thenThrow(expected); + when(protector.protect(plan)).thenReturn(plan); - ElasticsearchExecutionEngine executor = new ElasticsearchExecutionEngine(client); + ElasticsearchExecutionEngine executor = new ElasticsearchExecutionEngine(client, protector); AtomicReference actual = new AtomicReference<>(); executor.execute( plan, diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java new file mode 100644 index 0000000000..83bdb987bf --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java @@ -0,0 +1,146 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor; + +import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.literal; +import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.ref; +import static com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanDSL.filter; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort; +import com.amazon.opendistroforelasticsearch.sql.data.model.ExprBooleanValue; +import com.amazon.opendistroforelasticsearch.sql.data.model.ExprType; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ElasticsearchExecutionProtector; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ResourceMonitorPlan; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.storage.ElasticsearchIndexScan; +import com.amazon.opendistroforelasticsearch.sql.expression.Expression; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; +import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator; +import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator; +import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanDSL; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ElasticsearchExecutionProtectorTest { + @Mock + private ElasticsearchClient client; + + @Mock + private ResourceMonitor resourceMonitor; + + private ElasticsearchExecutionProtector executionProtector; + + @BeforeEach + public void setup() { + executionProtector = new ElasticsearchExecutionProtector(resourceMonitor); + } + + @Test + public void testProtectIndexScan() { + String indexName = "test"; + ReferenceExpression include = ref("age"); + ReferenceExpression exclude = ref("name"); + ReferenceExpression dedupeField = ref("name"); + Expression filterExpr = literal(ExprBooleanValue.ofTrue()); + List groupByExprs = Arrays.asList(ref("age")); + List aggregators = Arrays.asList(new AvgAggregator(groupByExprs, ExprType.DOUBLE)); + Map mappings = + ImmutableMap.of(ref("name"), ref("lastname")); + Pair newEvalField = + ImmutablePair.of(ref("name1"), ref("name")); + Integer sortCount = 100; + Pair sortField = + ImmutablePair.of(Sort.SortOption.PPL_ASC, ref("name1")); + + assertEquals( + PhysicalPlanDSL.project( + PhysicalPlanDSL.dedupe( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + filter( + resourceMonitor(new ElasticsearchIndexScan(client, + indexName)), + filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortCount, + sortField), + dedupeField), + include), + executionProtector.protect( + PhysicalPlanDSL.project( + PhysicalPlanDSL.dedupe( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + filter( + new ElasticsearchIndexScan(client, indexName), + filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortCount, + sortField), + dedupeField), + include)) + ); + } + + @Test + public void testWithoutProtection() { + Expression filterExpr = literal(ExprBooleanValue.ofTrue()); + + assertEquals( + filter( + filter(null, filterExpr), + filterExpr), + executionProtector.protect( + filter( + filter(null, filterExpr), + filterExpr) + ) + ); + } + + PhysicalPlan resourceMonitor(PhysicalPlan input) { + return new ResourceMonitorPlan(input, resourceMonitor); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java new file mode 100644 index 0000000000..de2e0d157b --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java @@ -0,0 +1,103 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ResourceMonitorPlan; +import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanNodeVisitor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ResourceMonitorPlanTest { + @Mock + private PhysicalPlan plan; + + @Mock + private ResourceMonitor resourceMonitor; + + @Mock + private PhysicalPlanNodeVisitor visitor; + + @Mock + private Object context; + + private ResourceMonitorPlan monitorPlan; + + @BeforeEach + public void setup() { + monitorPlan = new ResourceMonitorPlan(plan, resourceMonitor); + } + + @Test + void openExceedResourceLimit() { + when(resourceMonitor.isHealthy()).thenReturn(false); + + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> monitorPlan.open()); + assertEquals("resource is not enough to run the query, quit.", exception.getMessage()); + } + + @Test + void openSuccess() { + when(resourceMonitor.isHealthy()).thenReturn(true); + + monitorPlan.open(); + verify(plan, times(1)).open(); + } + + @Test + void nextSuccess() { + monitorPlan.next(); + verify(plan, times(1)).next(); + } + + @Test + void hasNextSuccess() { + monitorPlan.hasNext(); + verify(plan, times(1)).hasNext(); + } + + @Test + void closeSuccess() { + monitorPlan.close(); + verify(plan, times(1)).close(); + } + + @Test + void getChildSuccess() { + monitorPlan.getChild(); + verify(plan, times(1)).getChild(); + } + + @Test + void acceptSuccess() { + monitorPlan.accept(visitor, context); + verify(plan, times(1)).accept(visitor, context); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/NoopExecutionProtectorTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/NoopExecutionProtectorTest.java new file mode 100644 index 0000000000..e36baf35cf --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/NoopExecutionProtectorTest.java @@ -0,0 +1,41 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class NoopExecutionProtectorTest { + + @Mock + private PhysicalPlan plan; + + @Test + void protect() { + NoopExecutionProtector executionProtector = new NoopExecutionProtector(); + PhysicalPlan protectedPlan = executionProtector.protect(plan); + + assertEquals(plan, protectedPlan); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchMemoryHealthyTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchMemoryHealthyTest.java new file mode 100644 index 0000000000..08aeedf68d --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchMemoryHealthyTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.monitor; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ElasticsearchMemoryHealthyTest { + + @Mock + private ElasticsearchMemoryHealthy.RandomFail randomFail; + + @Mock + private ElasticsearchMemoryHealthy.MemoryUsage memoryUsage; + + private ElasticsearchMemoryHealthy monitor; + + @BeforeEach + public void setup() { + monitor = new ElasticsearchMemoryHealthy(randomFail, memoryUsage); + } + + @Test + void isMemoryHealthy() { + when(memoryUsage.usage()).thenReturn(10L); + + assertTrue(monitor.isMemoryHealthy(11L)); + } + + @Test + void memoryUsageExceedLimitFastFailure() { + when(memoryUsage.usage()).thenReturn(10L); + when(randomFail.shouldFail()).thenReturn(true); + + assertThrows(ElasticsearchMemoryHealthy.MemoryUsageExceedFastFailureException.class, + () -> monitor.isMemoryHealthy(9L)); + } + + @Test + void memoryUsageExceedLimitWithoutFastFailure() { + when(memoryUsage.usage()).thenReturn(10L); + when(randomFail.shouldFail()).thenReturn(false); + + assertThrows(ElasticsearchMemoryHealthy.MemoryUsageExceedException.class, + () -> monitor.isMemoryHealthy(9L)); + } + + @Test + void constructElasticsearchMemoryMonitorWithoutArguments() { + ElasticsearchMemoryHealthy monitor = new ElasticsearchMemoryHealthy(); + assertNotNull(monitor); + } + + @Test + void randomFail() { + ElasticsearchMemoryHealthy.RandomFail randomFail = new ElasticsearchMemoryHealthy.RandomFail(); + assertNotNull(randomFail.shouldFail()); + } + + @Test + void setMemoryUsage() { + ElasticsearchMemoryHealthy.MemoryUsage usage = + new ElasticsearchMemoryHealthy.MemoryUsage(); + assertTrue(usage.usage() > 0); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchResourceMonitorTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchResourceMonitorTest.java new file mode 100644 index 0000000000..c8a58f3422 --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/monitor/ElasticsearchResourceMonitorTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.monitor; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ElasticsearchResourceMonitorTest { + + @Mock + private Settings settings; + + @Mock + private ElasticsearchMemoryHealthy memoryMonitor; + + @BeforeEach + public void setup() { + when(settings.getSettingValue(Settings.Key.PPL_QUERY_MEMORY_LIMIT)) + .thenReturn(new ByteSizeValue(10L)); + } + + @Test + void isHealthy() { + when(memoryMonitor.isMemoryHealthy(anyLong())).thenReturn(true); + + ElasticsearchResourceMonitor resourceMonitor = + new ElasticsearchResourceMonitor(settings, memoryMonitor); + assertTrue(resourceMonitor.isHealthy()); + } + + @Test + void notHealthyFastFailure() { + when(memoryMonitor.isMemoryHealthy(anyLong())).thenThrow( + ElasticsearchMemoryHealthy.MemoryUsageExceedFastFailureException.class); + + ElasticsearchResourceMonitor resourceMonitor = + new ElasticsearchResourceMonitor(settings, memoryMonitor); + assertFalse(resourceMonitor.isHealthy()); + verify(memoryMonitor, times(1)).isMemoryHealthy(anyLong()); + } + + @Test + void notHealthyWithRetry() { + when(memoryMonitor.isMemoryHealthy(anyLong())).thenThrow( + ElasticsearchMemoryHealthy.MemoryUsageExceedException.class); + + ElasticsearchResourceMonitor resourceMonitor = + new ElasticsearchResourceMonitor(settings, memoryMonitor); + assertFalse(resourceMonitor.isHealthy()); + verify(memoryMonitor, times(3)).isMemoryHealthy(anyLong()); + } + + @Test + void healthyWithRetry() { + + when(memoryMonitor.isMemoryHealthy(anyLong())).thenThrow( + ElasticsearchMemoryHealthy.MemoryUsageExceedException.class).thenReturn(true); + + ElasticsearchResourceMonitor resourceMonitor = + new ElasticsearchResourceMonitor(settings, memoryMonitor); + assertTrue(resourceMonitor.isHealthy()); + verify(memoryMonitor, times(2)).isMemoryHealthy(anyLong()); + } +} diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/setting/ElasticsearchSettingsTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/setting/ElasticsearchSettingsTest.java new file mode 100644 index 0000000000..a50c2a2ed8 --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/setting/ElasticsearchSettingsTest.java @@ -0,0 +1,67 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.setting; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; +import java.util.List; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ElasticsearchSettingsTest { + + @Mock + private ClusterSettings clusterSettings; + + @Test + void getSettingValue() { + ElasticsearchSettings settings = new ElasticsearchSettings(clusterSettings); + ByteSizeValue sizeValue = settings.getSettingValue(Settings.Key.PPL_QUERY_MEMORY_LIMIT); + + assertNotNull(sizeValue); + } + + @Test + void pluginSettings() { + List> settings = ElasticsearchSettings.pluginSettings(); + + assertFalse(settings.isEmpty()); + } + + @Test + void update() { + ElasticsearchSettings settings = new ElasticsearchSettings(clusterSettings); + ByteSizeValue oldValue = settings.getSettingValue(Settings.Key.PPL_QUERY_MEMORY_LIMIT); + ElasticsearchSettings.Updater updater = + settings.new Updater(Settings.Key.PPL_QUERY_MEMORY_LIMIT); + updater.accept(new ByteSizeValue(0L)); + + ByteSizeValue newValue = settings.getSettingValue(Settings.Key.PPL_QUERY_MEMORY_LIMIT); + + assertNotEquals(newValue.getBytes(), oldValue.getBytes()); + } +} \ No newline at end of file diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/legacy/SQLIntegTestCase.java index 3e5d984bed..3981aa99d0 100644 --- a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/legacy/SQLIntegTestCase.java @@ -346,7 +346,7 @@ protected static class ClusterSetting { private final String name; private final String value; - ClusterSetting(String type, String name, String value) { + public ClusterSetting(String type, String name, String value) { this.type = type; this.name = name; this.value = (value == null) ? "null" : ("\"" + value + "\""); diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLIntegTestCase.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLIntegTestCase.java index 77f1141640..87c7cb9bd7 100644 --- a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLIntegTestCase.java +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLIntegTestCase.java @@ -15,7 +15,7 @@ package com.amazon.opendistroforelasticsearch.sql.ppl; -import com.amazon.opendistroforelasticsearch.sql.legacy.RestIntegTestCase; +import com.amazon.opendistroforelasticsearch.sql.legacy.SQLIntegTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -32,7 +32,7 @@ /** * ES Rest integration test base for PPL testing */ -public abstract class PPLIntegTestCase extends RestIntegTestCase { +public abstract class PPLIntegTestCase extends SQLIntegTestCase { protected JSONObject executeQuery(String query) throws IOException { return jsonify(executeQueryToString(query)); diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLPluginIT.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLPluginIT.java index a6eb31f74b..86219cc5cd 100644 --- a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLPluginIT.java +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/PPLPluginIT.java @@ -61,10 +61,10 @@ public void testQueryEndpointShouldFail() throws IOException { exceptionRule.expect(ResponseException.class); exceptionRule.expect(hasProperty("response", statusCode(500))); - client().performRequest(makeRequest("search invalid")); + client().performRequest(makePPLRequest("search invalid")); } - protected Request makeRequest(String query) { + protected Request makePPLRequest(String query) { Request post = new Request("POST", "/_opendistro/_ppl"); post.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query)); return post; diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ResourceMonitorIT.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ResourceMonitorIT.java new file mode 100644 index 0000000000..0514bb4320 --- /dev/null +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ResourceMonitorIT.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.ppl; + +import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_DOG; +import static com.amazon.opendistroforelasticsearch.sql.util.MatcherUtils.columnName; +import static com.amazon.opendistroforelasticsearch.sql.util.MatcherUtils.verifyColumn; + +import java.io.IOException; +import org.elasticsearch.client.ResponseException; +import org.hamcrest.Matchers; +import org.json.JSONObject; +import org.junit.Test; + +public class ResourceMonitorIT extends PPLIntegTestCase { + + @Override + public void init() throws IOException { + loadIndex(Index.DOG); + } + + @Test + public void queryExceedResourceLimitShouldFail() throws IOException { + // update opendistro.ppl.query.memory_limit to 1% + updateClusterSettings( + new ClusterSetting("persistent", "opendistro.ppl.query.memory_limit", "1%")); + String query = String.format("search source=%s age=20", TEST_INDEX_DOG); + + ResponseException exception = + expectThrows(ResponseException.class, () -> executeQuery(query)); + assertEquals(500, exception.getResponse().getStatusLine().getStatusCode()); + assertThat(exception.getMessage(), Matchers.containsString("resource is not enough to run the" + + " query, quit.")); + + // update opendistro.ppl.query.memory_limit to default value 85% + updateClusterSettings( + new ClusterSetting("persistent", "opendistro.ppl.query.memory_limit", "85%")); + JSONObject result = executeQuery(String.format("search source=%s", TEST_INDEX_DOG)); + verifyColumn(result, columnName("dog_name"), columnName("holdersName"), columnName("age")); + } +} diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/StandaloneIT.java index 8907ebb1e6..b017da6f63 100644 --- a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/StandaloneIT.java @@ -22,9 +22,11 @@ import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchRestClient; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.ElasticsearchExecutionEngine; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ElasticsearchExecutionProtector; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.storage.ElasticsearchStorageEngine; import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine; import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine.QueryResponse; +import com.amazon.opendistroforelasticsearch.sql.monitor.AlwaysHealthyMonitor; import com.amazon.opendistroforelasticsearch.sql.ppl.config.PPLServiceConfig; import com.amazon.opendistroforelasticsearch.sql.ppl.domain.PPLQueryRequest; import com.amazon.opendistroforelasticsearch.sql.protocol.response.QueryResult; @@ -58,8 +60,10 @@ public void init() { ElasticsearchClient client = new ElasticsearchRestClient(restClient); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); - context.registerBean(StorageEngine.class, () -> new ElasticsearchStorageEngine(client)); - context.registerBean(ExecutionEngine.class, () -> new ElasticsearchExecutionEngine(client)); + context.registerBean(StorageEngine.class, + () -> new ElasticsearchStorageEngine(client)); + context.registerBean(ExecutionEngine.class, () -> new ElasticsearchExecutionEngine(client, + new ElasticsearchExecutionProtector(new AlwaysHealthyMonitor()))); context.register(PPLServiceConfig.class); context.refresh(); diff --git a/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/ElasticsearchSQLPluginConfig.java b/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/ElasticsearchSQLPluginConfig.java new file mode 100644 index 0000000000..45be68dc0a --- /dev/null +++ b/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/ElasticsearchSQLPluginConfig.java @@ -0,0 +1,62 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.legacy.plugin; + +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchNodeClient; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.ElasticsearchExecutionEngine; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ExecutionProtector; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.NoopExecutionProtector; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.storage.ElasticsearchStorageEngine; +import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine; +import com.amazon.opendistroforelasticsearch.sql.storage.StorageEngine; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; + +/** + * Elasticsearch Plugin Config for SQL. + */ +public class ElasticsearchSQLPluginConfig { + @Autowired + private ClusterService clusterService; + + @Autowired + private NodeClient nodeClient; + + @Bean + public ElasticsearchClient client() { + return new ElasticsearchNodeClient(clusterService, nodeClient); + } + + @Bean + public StorageEngine storageEngine() { + return new ElasticsearchStorageEngine(client()); + } + + @Bean + public ExecutionEngine executionEngine() { + return new ElasticsearchExecutionEngine(client(), protector()); + } + + @Bean + public ExecutionProtector protector() { + return new NoopExecutionProtector(); + } +} diff --git a/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java index 97ab6d7245..6f243e684b 100644 --- a/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -24,10 +24,7 @@ import com.amazon.opendistroforelasticsearch.sql.ast.tree.UnresolvedPlan; import com.amazon.opendistroforelasticsearch.sql.common.antlr.SyntaxCheckException; import com.amazon.opendistroforelasticsearch.sql.common.response.ResponseListener; -import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchNodeClient; -import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.ElasticsearchExecutionEngine; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.security.SecurityAccess; -import com.amazon.opendistroforelasticsearch.sql.elasticsearch.storage.ElasticsearchStorageEngine; import com.amazon.opendistroforelasticsearch.sql.protocol.response.QueryResult; import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.SimpleJsonResponseFormatter; import com.amazon.opendistroforelasticsearch.sql.sql.SQLService; @@ -106,9 +103,7 @@ private SQLService createSQLService(NodeClient client) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.registerBean(ClusterService.class, () -> clusterService); context.registerBean(NodeClient.class, () -> client); - context.register(ElasticsearchNodeClient.class); - context.register(ElasticsearchStorageEngine.class); - context.register(ElasticsearchExecutionEngine.class); + context.register(ElasticsearchSQLPluginConfig.class); context.register(SQLServiceConfig.class); context.refresh(); return context.getBean(SQLService.class); diff --git a/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SQLPlugin.java index 8df9180480..77475d5268 100644 --- a/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SQLPlugin.java @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.sql.plugin; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.setting.ElasticsearchSettings; import com.amazon.opendistroforelasticsearch.sql.legacy.esdomain.LocalClusterState; import com.amazon.opendistroforelasticsearch.sql.legacy.executor.AsyncRestExecutor; import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.Metrics; @@ -23,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.sql.legacy.plugin.RestSqlStatsAction; import com.amazon.opendistroforelasticsearch.sql.legacy.plugin.SqlSettings; import com.amazon.opendistroforelasticsearch.sql.plugin.rest.RestPPLQueryAction; +import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -62,6 +64,11 @@ public class SQLPlugin extends Plugin implements ActionPlugin { private ClusterService clusterService; + /** + * Settings should be inited when bootstrap the plugin. + */ + private com.amazon.opendistroforelasticsearch.sql.common.setting.Settings pluginSettings; + public String name() { return "sql"; } @@ -78,12 +85,13 @@ public List getRestHandlers(Settings settings, RestController restC IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { Objects.requireNonNull(clusterService, "Cluster service is required"); + Objects.requireNonNull(pluginSettings, "Cluster settings is required"); LocalClusterState.state().setResolver(indexNameExpressionResolver); Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( - new RestPPLQueryAction(restController, clusterService), + new RestPPLQueryAction(restController, clusterService, pluginSettings), new RestSqlAction(settings, clusterService), new RestSqlStatsAction(settings, restController), new RestSqlSettingsAction(settings, restController) @@ -101,6 +109,7 @@ public Collection createComponents(Client client, ClusterService cluster NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameResolver) { this.clusterService = clusterService; + this.pluginSettings = new ElasticsearchSettings(clusterService.getClusterSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setSqlSettings(sqlSettings); @@ -126,7 +135,10 @@ public List> getExecutorBuilders(Settings settings) { @Override public List> getSettings() { - return sqlSettings.getSettings(); + ImmutableList> settings = + new ImmutableList.Builder>().addAll(sqlSettings.getSettings()) + .addAll(ElasticsearchSettings.pluginSettings()).build(); + return settings; } } diff --git a/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/ElasticsearchPluginConfig.java b/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/ElasticsearchPluginConfig.java index 1fc60b6bbd..cc9de58760 100644 --- a/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/ElasticsearchPluginConfig.java +++ b/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/ElasticsearchPluginConfig.java @@ -16,11 +16,17 @@ package com.amazon.opendistroforelasticsearch.sql.plugin.rest; +import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchNodeClient; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.ElasticsearchExecutionEngine; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ElasticsearchExecutionProtector; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.executor.protector.ExecutionProtector; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.monitor.ElasticsearchMemoryHealthy; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.monitor.ElasticsearchResourceMonitor; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.storage.ElasticsearchStorageEngine; import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine; +import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; import com.amazon.opendistroforelasticsearch.sql.storage.StorageEngine; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; @@ -41,6 +47,9 @@ public class ElasticsearchPluginConfig { @Autowired private NodeClient nodeClient; + @Autowired + private Settings settings; + @Bean public ElasticsearchClient client() { return new ElasticsearchNodeClient(clusterService, nodeClient); @@ -53,7 +62,16 @@ public StorageEngine storageEngine() { @Bean public ExecutionEngine executionEngine() { - return new ElasticsearchExecutionEngine(client()); + return new ElasticsearchExecutionEngine(client(), protector()); } + @Bean + public ResourceMonitor resourceMonitor() { + return new ElasticsearchResourceMonitor(settings, new ElasticsearchMemoryHealthy()); + } + + @Bean + public ExecutionProtector protector() { + return new ElasticsearchExecutionProtector(resourceMonitor()); + } } diff --git a/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/RestPPLQueryAction.java index 6785f91237..7062b31ccb 100644 --- a/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/rest/RestPPLQueryAction.java @@ -20,6 +20,7 @@ import static org.elasticsearch.rest.RestStatus.OK; import com.amazon.opendistroforelasticsearch.sql.common.response.ResponseListener; +import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.security.SecurityAccess; import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine.QueryResponse; import com.amazon.opendistroforelasticsearch.sql.plugin.request.PPLQueryRequestFactory; @@ -53,9 +54,19 @@ public class RestPPLQueryAction extends BaseRestHandler { */ private final ClusterService clusterService; - public RestPPLQueryAction(RestController restController, ClusterService clusterService) { + /** + * Settings required by been initialization. + */ + private final Settings pluginSettings; + + /** + * Constructor of RestPPLQueryAction. + */ + public RestPPLQueryAction(RestController restController, ClusterService clusterService, + Settings pluginSettings) { super(); this.clusterService = clusterService; + this.pluginSettings = pluginSettings; } @Override @@ -77,11 +88,24 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod PPLQueryRequestFactory.getPPLRequest(request), createListener(channel)); } + /** + * Ideally, the AnnotationConfigApplicationContext should be shared across Plugin. By default, + * spring construct all the bean as singleton. Currently, there are no better solution to + * create the bean in protocol scope. The limitations are + * alt-1, add annotation for bean @Scope(value = SCOPE_PROTOTYPE, proxyMode = TARGET_CLASS), it + * works by add the proxy, + * but when running in Elasticsearch, all the operation need security permission whic is hard + * to control. + * alt-2, using ObjectFactory with @Autowired, it also works, but require add to all the + * configuration. + * We will revisit the current solution if any major issue found. + */ private PPLService createPPLService(NodeClient client) { return doPrivileged(() -> { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.registerBean(ClusterService.class, () -> clusterService); context.registerBean(NodeClient.class, () -> client); + context.registerBean(Settings.class, () -> pluginSettings); context.register(ElasticsearchPluginConfig.class); context.register(PPLServiceConfig.class); context.refresh();