Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
[PPL] Add Resource monitor to avoid OOM (#533)
Browse files Browse the repository at this point in the history
* update

* add doc

* address comments

* address comments
  • Loading branch information
penghuo authored Jun 29, 2020
1 parent 087da88 commit df16ffe
Show file tree
Hide file tree
Showing 43 changed files with 1,473 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.common.setting;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Setting.
*/
public abstract class Settings {
@RequiredArgsConstructor
public enum Key {
PPL_QUERY_MEMORY_LIMIT("opendistro.ppl.query.memory_limit");

@Getter
private final String keyValue;
}

/**
* Get Setting Value.
*/
public abstract <T> T getSettingValue(Key key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.monitor;

/**
* Always healthy resource monitor.
*/
public class AlwaysHealthyMonitor extends ResourceMonitor {
public static final ResourceMonitor ALWAYS_HEALTHY_MONITOR =
new AlwaysHealthyMonitor();

/**
* always healthy.
*/
@Override
public boolean isHealthy() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.monitor;

/**
* The abstract interface of ResourceMonitor.
* When an fault is detected, the circuit breaker is open.
*/
public abstract class ResourceMonitor {
/**
* Is the resource healthy.
*
* @return true for healthy, otherwise false.
*/
public abstract boolean isHealthy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

Expand All @@ -43,9 +44,11 @@
@EqualsAndHashCode
@ToString
public class AggregationOperator extends PhysicalPlan {

@Getter
private final PhysicalPlan input;
@Getter
private final List<Aggregator> aggregatorList;
@Getter
private final List<Expression> groupByExprList;
@EqualsAndHashCode.Exclude
private final Group group;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,26 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

/**
* Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link
* DedupeOperator#dedupeList} The result order follow the input order.
*/
@Getter
@EqualsAndHashCode
public class DedupeOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final List<Expression> dedupeList;
@Getter
private final Integer allowedDuplication;
@Getter
private final Boolean keepEmpty;
@Getter
private final Boolean consecutive;

@EqualsAndHashCode.Exclude
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Map.Entry;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -49,7 +50,9 @@
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class EvalOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final List<Pair<ReferenceExpression, Expression>> expressionList;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

Expand All @@ -20,9 +21,11 @@
@ToString
@RequiredArgsConstructor
public class FilterOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final Expression conditions;
private ExprValue next = null;
@ToString.Exclude private ExprValue next = null;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ public R visitValues(ValuesOperator node, C context) {
return visitNode(node, context);
}

public R visitSort(SortOperator node, C context) {
return visitNode(node, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

Expand All @@ -33,7 +34,9 @@
@EqualsAndHashCode
@RequiredArgsConstructor
public class ProjectOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final List<Expression> projectList;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map.Entry;
import java.util.Set;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

Expand All @@ -40,7 +41,9 @@
@EqualsAndHashCode
@RequiredArgsConstructor
public class RemoveOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final Set<ReferenceExpression> removeList;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

Expand All @@ -41,7 +42,9 @@
@ToString
@RequiredArgsConstructor
public class RenameOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final Map<ReferenceExpression, ReferenceExpression> mapping;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.PriorityQueue;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -43,8 +44,11 @@
@ToString
@EqualsAndHashCode
public class SortOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
@Getter
private final Integer count;
@Getter
private final List<Pair<SortOption, Expression>> sortList;
@EqualsAndHashCode.Exclude
private final Sorter sorter;
Expand Down Expand Up @@ -79,7 +83,7 @@ public SortOperator(

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return null;
return visitor.visitSort(this, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.monitor;

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class AlwaysHealthyMonitorTest {

@Test
void isHealthy() {
assertTrue(new AlwaysHealthyMonitor().isHealthy());
}
}
3 changes: 2 additions & 1 deletion docs/category.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"bash": [
"experiment/ppl/interfaces/endpoint.rst",
"experiment/ppl/interfaces/protocol.rst"
"experiment/ppl/interfaces/protocol.rst",
"experiment/ppl/admin/settings.rst"
],
"ppl_cli": [
"experiment/ppl/cmd/dedup.rst",
Expand Down
48 changes: 48 additions & 0 deletions docs/experiment/ppl/admin/settings.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
.. highlight:: sh

============
PPL Settings
============

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

When Elasticsearch bootstraps, PPL plugin will register a few settings in Elasticsearch cluster settings. Most of the settings are able to change dynamically so you can control the behavior of PPL plugin without need to bounce your cluster.

opendistro.ppl.query.memory_limit
=================================

Description
-----------

You can set heap memory usage limit for PPL query. When query running, it will detected whether the heap memory usage under the limit, if not, it will terminated the current query. The default value is: 85%

Example
-------

PPL query::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X PUT localhost:9200/_cluster/settings \
... -d '{"persistent" : {"opendistro.ppl.query.memory_limit" : "80%"}}'
{
"acknowledged": true,
"persistent": {
"opendistro": {
"ppl": {
"query": {
"memory_limit": "80%"
}
}
}
},
"transient": {}
}

4 changes: 4 additions & 0 deletions docs/experiment/ppl/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ OpenDistro PPL Reference Manual

- `Protocol <interfaces/protocol.rst>`_

* **Administration**

- `Plugin Settings <admin/settings.rst>`_

* **Commands**

- `Syntax <cmd/syntax.rst>`_
Expand Down
1 change: 1 addition & 0 deletions elasticsearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
compile project(':core')
compile group: 'org.elasticsearch', name: 'elasticsearch', version: "${es_version}"
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: "${es_version}"
compile "io.github.resilience4j:resilience4j-retry:1.5.0"

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
Expand Down
Loading

0 comments on commit df16ffe

Please sign in to comment.