Skip to content

Commit

Permalink
Code PoC MVP.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Jun 17, 2023
1 parent 9cb88a2 commit 195de33
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* Currently, V2 engine does not support queries with:
* - aggregation (GROUP BY clause or aggregation functions like min/max)
* - in memory aggregation (window function)
 * - OFFSET clause
* - without FROM clause
* - JOIN
* - a subquery
Expand Down Expand Up @@ -103,10 +103,13 @@ public Boolean visitValues(Values node, Object context) {
return Boolean.TRUE;
}

// Limit can't be pushed down in pagination, so it has to be implemented by `LimitOperator`.
// OpenSearch rejects a scroll query with the `from` parameter, so offset can't be pushed down.
// A non-zero offset would produce incomplete or even empty pages, so it is not supported.
@Override
public Boolean visitLimit(Limit node, Object context) {
  // TODO open a GH ticket for offset support in pagination
  // Paginable only when there is no offset and the subtree itself can paginate.
  return node.getOffset() == 0 && canPaginate(node, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import static com.facebook.presto.matching.DefaultMatcher.DEFAULT_MATCHER;

import com.facebook.presto.matching.Match;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
Expand All @@ -28,20 +32,20 @@
*/
public class LogicalPlanOptimizer {

private final List<Rule<?>> rules;
private final List<Rule<LogicalPlan>> rules;

/**
* Create {@link LogicalPlanOptimizer} with customized rules.
*/
public LogicalPlanOptimizer(List<Rule<?>> rules) {
public LogicalPlanOptimizer(List<Rule<LogicalPlan>> rules) {
this.rules = rules;
}

/**
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
return new LogicalPlanOptimizer(Stream.of(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
Expand All @@ -51,47 +55,48 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
new PushDownPageSize(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
new PushDownPageSize(),
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_NESTED,
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
new CreateTableWriteBuilder())
.map(r -> (Rule<LogicalPlan>)r).collect(Collectors.toList()));
}

/**
 * Optimize {@link LogicalPlan} by applying every configured rule, one at a time,
 * over the whole plan tree.
 *
 * @param plan logical plan to optimize
 * @return optimized plan; may be the same instance when no rule matched
 */
public LogicalPlan optimize(LogicalPlan plan) {
  var node = plan;
  for (Rule<LogicalPlan> rule : rules) {
    node = traverseAndOptimize(node, rule);
  }
  return node;
}

private LogicalPlan traverseAndOptimize(LogicalPlan plan, Rule<LogicalPlan> rule) {
LogicalPlan optimized = internalOptimize(plan, rule);
optimized.replaceChildPlans(
optimized.getChild().stream().map(this::optimize).collect(
Collectors.toList()));
return internalOptimize(optimized);
optimized.getChild().stream().map(p -> traverseAndOptimize(p, rule))
.collect(Collectors.toList()));
return internalOptimize(optimized, rule);
}

private LogicalPlan internalOptimize(LogicalPlan plan) {
private LogicalPlan internalOptimize(LogicalPlan plan, Rule<LogicalPlan> rule) {
LogicalPlan node = plan;
boolean done = false;
while (!done) {
done = true;
for (Rule rule : rules) {
Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
if (node != match.value()) {
done = false;
}
}
}
Match<LogicalPlan> match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
}
return node;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {

@SuppressWarnings("unchecked")
private TableScanPushDown(WithPattern<T> pattern,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern<TableScanBuilder>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
package org.opensearch.sql.planner.physical;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.exception.NoCursorException;
import org.opensearch.sql.planner.SerializablePlan;

/**
 * The limit operator sets a window, to block the rows out of the window
Expand All @@ -25,16 +30,22 @@
* it occurs when the original result set has a size smaller than {index + limit},
* or even not greater than the offset. The latter results in an empty output.</p>
*/
@RequiredArgsConstructor
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class LimitOperator extends PhysicalPlan {
private final PhysicalPlan input;
private final Integer limit;
private final Integer offset;
@AllArgsConstructor
public class LimitOperator extends PhysicalPlan implements SerializablePlan {
private PhysicalPlan input;
private Integer limit;
private Integer offset;
private Integer count = 0;

/**
 * Create a limit operator; the progress counter {@code count} starts at its
 * field default of 0 (no rows consumed yet).
 *
 * @param input  upstream physical plan to read rows from
 * @param limit  maximum number of rows to emit
 * @param offset number of leading rows to skip
 */
public LimitOperator(PhysicalPlan input, Integer limit, Integer offset) {
  this.input = input;
  this.limit = limit;
  this.offset = offset;
}

@Override
public void open() {
super.open();
Expand Down Expand Up @@ -67,4 +78,29 @@ public List<PhysicalPlan> getChild() {
return ImmutableList.of(input);
}

/**
 * No-args constructor required by the deserialization mechanism
 * ({@link #readExternal} populates the fields afterwards).
 *
 * @deprecated don't use, it is for deserialization needs only
 */
@Deprecated
public LimitOperator() {
}

/**
 * Restore {@code limit} and {@code count} in the same order they were written
 * by {@link #writeExternal}, then the child plan. The read order must stay in
 * sync with the write order.
 *
 * @param in stream to read the serialized state from
 */
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  limit = in.readInt();
  count = in.readInt();
  // note: offset isn't serialized or deserialized, because it is not supported in pagination
  // TODO open a GH ticket and post its link here
  offset = 0;
  input = (PhysicalPlan) in.readObject();
}

/**
 * Serialize {@code limit} and {@code count}, then the serializable form of the
 * child plan. Offset is intentionally not written (see {@link #readExternal}).
 *
 * @param out stream to write the state to
 * @throws NoCursorException when all {@code limit} rows were already returned,
 *     i.e. paging is finished and no further cursor is needed
 */
@Override
public void writeExternal(ObjectOutput out) throws IOException {
  // `count` and `limit` are boxed Integers: `==` compares references and is only
  // guaranteed to work for values in the Integer cache (-128..127), so compare by value.
  if (limit.equals(count)) {
    // paging is finished
    throw new NoCursorException();
  }
  out.writeInt(limit);
  out.writeInt(count);
  out.writeObject(((SerializablePlan) input).getPlanForSerialization());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public PhysicalPlan visitLimit(LimitOperator node, Object context) {
return new LimitOperator(
visitInput(node.getInput(), context),
node.getLimit(),
node.getOffset());
node.getOffset(),
node.getCount());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ public OpenSearchRequestBuilder(int requestedTotalSize,
*/
public OpenSearchRequest build(OpenSearchRequest.IndexName indexName,
int maxResultWindow, TimeValue scrollTimeout) {
int size = requestedTotalSize;
if (pageSize == null) {
if (startFrom + size > maxResultWindow) {
if (startFrom + requestedTotalSize > maxResultWindow) {
sourceBuilder.size(maxResultWindow - startFrom);
return new OpenSearchScrollRequest(
indexName, scrollTimeout, sourceBuilder, exprValueFactory);
Expand Down Expand Up @@ -182,7 +181,6 @@ public void pushDownSort(List<SortBuilder<?>> sortBuilders) {
public void pushDownLimit(Integer limit, Integer offset) {
requestedTotalSize = limit;
startFrom = offset;
sourceBuilder.from(offset).size(limit);
}

public void pushDownTrackedScore(boolean trackScores) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class OpenSearchIndexScanBuilder extends TableScanBuilder {
/** Is limit operator pushed down. */
private boolean isLimitPushedDown = false;

/** Is page size set. */
private boolean isPageSizePushedDown = false;

/**
* Constructor used during query execution.
*/
Expand Down Expand Up @@ -80,6 +83,7 @@ public boolean pushDownAggregation(LogicalAggregation aggregation) {

@Override
public boolean pushDownPageSize(LogicalPaginate paginate) {
  // Remember that a page size was pushed down so pushDownLimit can refuse a
  // subsequent limit push down for the same scan.
  // NOTE(review): the flag is set even when the delegate rejects the push
  // down — confirm that is intended.
  isPageSizePushedDown = true;
  return delegate.pushDownPageSize(paginate);
}

Expand All @@ -93,8 +97,12 @@ public boolean pushDownSort(LogicalSort sort) {

@Override
public boolean pushDownLimit(LogicalLimit limit) {
  // Refuse the limit push down when a page size was already pushed down for
  // this scan (see pushDownPageSize).
  if (isPageSizePushedDown) {
    return false;
  }
  // Assume limit push down happening on OpenSearchIndexScanQueryBuilder
  isLimitPushedDown = true;
  // TODO move this logic to OpenSearchRequestBuilder?
  return delegate.pushDownLimit(limit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@


import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndIndexScan;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndRelation;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeFilterAndRelation;
Expand All @@ -23,10 +28,10 @@ public class PrometheusLogicalPlanOptimizerFactory {
* Create Prometheus storage specified logical plan optimizer.
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
return new LogicalPlanOptimizer(Stream.of(
new MergeFilterAndRelation(),
new MergeAggAndIndexScan(),
new MergeAggAndRelation()
));
).map(r -> (Rule<LogicalPlan>)r).collect(Collectors.toList()));
}
}

0 comments on commit 195de33

Please sign in to comment.