Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Protect window operator by circuit breaker (#1006)
Browse files Browse the repository at this point in the history
* Change protector and add UT

* Check resource on every 1000 calls
  • Loading branch information
dai-chen authored Jan 29, 2021
1 parent df61641 commit 614c500
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public PhysicalPlan visitRename(RenameOperator node, Object context) {
*/
@Override
public PhysicalPlan visitTableScan(TableScanOperator node, Object context) {
return new ResourceMonitorPlan(node, resourceMonitor);
return doProtect(node);
}

@Override
Expand Down Expand Up @@ -111,10 +111,14 @@ public PhysicalPlan visitHead(HeadOperator node, Object context) {
);
}

/**
* Decorate input node with {@link ResourceMonitorPlan} to avoid aggregate
* window function pre-loads too many data into memory in worst case.
*/
@Override
public PhysicalPlan visitWindow(WindowOperator node, Object context) {
return new WindowOperator(
visitInput(node.getInput(), context),
doProtect(visitInput(node.getInput(), context)),
node.getWindowFunction(),
node.getWindowDefinition());
}
Expand All @@ -124,11 +128,10 @@ public PhysicalPlan visitWindow(WindowOperator node, Object context) {
*/
@Override
public PhysicalPlan visitSort(SortOperator node, Object context) {
return new ResourceMonitorPlan(
return doProtect(
new SortOperator(
visitInput(node.getInput(), context),
node.getSortList()),
resourceMonitor);
node.getSortList()));
}

/**
Expand All @@ -155,4 +158,16 @@ PhysicalPlan visitInput(PhysicalPlan node, Object context) {
return node.accept(this, context);
}
}

private PhysicalPlan doProtect(PhysicalPlan node) {
if (isProtected(node)) {
return node;
}
return new ResourceMonitorPlan(node, resourceMonitor);
}

private boolean isProtected(PhysicalPlan node) {
return (node instanceof ResourceMonitorPlan);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
@RequiredArgsConstructor
@EqualsAndHashCode
public class ResourceMonitorPlan extends PhysicalPlan {

/**
* How many method calls to delegate's next() to perform resource check once.
*/
public static final long NUMBER_OF_NEXT_CALL_TO_CHECK = 1000;

/**
* Delegated PhysicalPlan.
*/
Expand All @@ -44,6 +50,13 @@ public class ResourceMonitorPlan extends PhysicalPlan {
@ToString.Exclude
private final ResourceMonitor monitor;

/**
* Count how many calls to delegate's next() already.
*/
@EqualsAndHashCode.Exclude
private long nextCallCount = 0L;


@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return delegate.accept(visitor, context);
Expand Down Expand Up @@ -74,6 +87,10 @@ public boolean hasNext() {

@Override
public ExprValue next() {
boolean shouldCheck = (++nextCallCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0);
if (shouldCheck && !this.monitor.isHealthy()) {
throw new IllegalStateException("resource is not enough to load next row, quit.");
}
return delegate.next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.NamedAggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation.AggregateWindowFunction;
import com.amazon.opendistroforelasticsearch.sql.expression.window.ranking.RankFunction;
import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor;
import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -213,6 +214,50 @@ public void testProtectSortForWindowOperator() {
windowDefinition)));
}

@Test
public void testProtectWindowOperatorInput() {
NamedExpression avg = named(mock(AggregateWindowFunction.class));
WindowDefinition windowDefinition = mock(WindowDefinition.class);

assertEquals(
window(
resourceMonitor(
values()),
avg,
windowDefinition),
executionProtector.protect(
window(
values(),
avg,
windowDefinition)));
}

@SuppressWarnings("unchecked")
@Test
public void testNotProtectWindowOperatorInputIfAlreadyProtected() {
NamedExpression avg = named(mock(AggregateWindowFunction.class));
Pair<Sort.SortOption, Expression> sortItem =
ImmutablePair.of(DEFAULT_ASC, DSL.ref("age", INTEGER));
WindowDefinition windowDefinition =
new WindowDefinition(emptyList(), ImmutableList.of(sortItem));

assertEquals(
window(
resourceMonitor(
sort(
values(emptyList()),
sortItem)),
avg,
windowDefinition),
executionProtector.protect(
window(
sort(
values(emptyList()),
sortItem),
avg,
windowDefinition)));
}

@Test
public void testWithoutProtection() {
Expression filterExpr = literal(ExprBooleanValue.of(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,26 @@ void openSuccess() {

@Test
void nextSuccess() {
monitorPlan.next();
verify(plan, times(1)).next();
when(resourceMonitor.isHealthy()).thenReturn(true);

for (int i = 1; i <= 1000; i++) {
monitorPlan.next();
}
verify(resourceMonitor, times(1)).isHealthy();
verify(plan, times(1000)).next();
}

@Test
void nextExceedResourceLimit() {
when(resourceMonitor.isHealthy()).thenReturn(false);

for (int i = 1; i < 1000; i++) {
monitorPlan.next();
}

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> monitorPlan.next());
assertEquals("resource is not enough to load next row, quit.", exception.getMessage());
}

@Test
Expand Down

0 comments on commit 614c500

Please sign in to comment.