Skip to content

Commit

Permalink
Simplify OpenSearchIndexScanBuilder (opensearch-project#275) (opensea…
Browse files Browse the repository at this point in the history
…rch-project#1738)

Signed-off-by: Max Ksyunz <[email protected]>
  • Loading branch information
Max Ksyunz authored Jun 14, 2023
1 parent da386e5 commit 29f99aa
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.Settings;
Expand All @@ -33,7 +34,6 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/** OpenSearch table (index) implementation. */
Expand Down Expand Up @@ -171,19 +171,14 @@ public PhysicalPlan implement(LogicalPlan plan) {
public TableScanBuilder createScanBuilder() {
final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT);

final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
var builder = new OpenSearchRequestBuilder(
querySizeLimit,
createExprValueFactory());

return new OpenSearchIndexScanBuilder(builder) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder requestBuilder) {
final TimeValue cursorKeepAlive =
settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
return new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(),
requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive));
}
};
Function<OpenSearchRequestBuilder, OpenSearchIndexScan> createScanOperator =
requestBuilder -> new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(),
requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive));
return new OpenSearchIndexScanBuilder(builder, createScanOperator);
}

private OpenSearchExprValueFactory createExprValueFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.opensearch.storage.scan;

import java.util.function.Function;
import lombok.EqualsAndHashCode;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
Expand All @@ -24,8 +25,9 @@
* by delegated builder internally. This is to avoid conditional check of different push down logic
* for non-aggregate and aggregate query everywhere.
*/
public abstract class OpenSearchIndexScanBuilder extends TableScanBuilder {
public class OpenSearchIndexScanBuilder extends TableScanBuilder {

private final Function<OpenSearchRequestBuilder, OpenSearchIndexScan> scanFactory;
/**
* Delegated index scan builder for non-aggregate or aggregate query.
*/
Expand All @@ -38,25 +40,27 @@ public abstract class OpenSearchIndexScanBuilder extends TableScanBuilder {
/**
* Constructor used during query execution.
*/
protected OpenSearchIndexScanBuilder(OpenSearchRequestBuilder requestBuilder) {
public OpenSearchIndexScanBuilder(OpenSearchRequestBuilder requestBuilder,
Function<OpenSearchRequestBuilder, OpenSearchIndexScan> scanFactory) {
this.delegate = new OpenSearchIndexScanQueryBuilder(requestBuilder);
this.scanFactory = scanFactory;

}

/**
* Constructor used for unit tests.
*/
protected OpenSearchIndexScanBuilder(PushDownQueryBuilder translator) {
protected OpenSearchIndexScanBuilder(PushDownQueryBuilder translator,
Function<OpenSearchRequestBuilder, OpenSearchIndexScan> scanFactory) {
this.delegate = translator;
this.scanFactory = scanFactory;
}

@Override
public TableScanOperator build() {
return createScan(delegate.build());
return scanFactory.apply(delegate.build());
}

protected abstract TableScanOperator createScan(OpenSearchRequestBuilder requestBuilder);

@Override
public boolean pushDownFilter(LogicalFilter filter) {
return delegate.pushDownFilter(filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.opensearch.sql.opensearch.storage.scan;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -86,7 +85,6 @@
import org.opensearch.sql.planner.optimizer.PushDownPageSize;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;

@ExtendWith(MockitoExtension.class)
class OpenSearchIndexScanOptimizationTest {
Expand All @@ -106,12 +104,7 @@ class OpenSearchIndexScanOptimizationTest {

@BeforeEach
void setUp() {
indexScanBuilder = new OpenSearchIndexScanBuilder(requestBuilder) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder build) {
return indexScan;
}
};
indexScanBuilder = new OpenSearchIndexScanBuilder(requestBuilder, requestBuilder -> indexScan);
when(table.createScanBuilder()).thenReturn(indexScanBuilder);
}

Expand Down Expand Up @@ -698,23 +691,15 @@ void project_literal_should_not_be_pushed_down() {

private OpenSearchIndexScanBuilder indexScanBuilder(Runnable... verifyPushDownCalls) {
this.verifyPushDownCalls = verifyPushDownCalls;
return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanQueryBuilder(requestBuilder)) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder build) {
return indexScan;
}
};
return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanQueryBuilder(requestBuilder),
requestBuilder -> indexScan);
}

private OpenSearchIndexScanBuilder indexScanAggBuilder(Runnable... verifyPushDownCalls) {
this.verifyPushDownCalls = verifyPushDownCalls;
return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanAggregationBuilder(
requestBuilder, mock(LogicalAggregation.class))) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder build) {
return indexScan;
}
};
var aggregationBuilder = new OpenSearchIndexScanAggregationBuilder(
requestBuilder, mock(LogicalAggregation.class));
return new OpenSearchIndexScanBuilder(aggregationBuilder, builder -> indexScan);
}

private void assertEqualsAfterOptimization(LogicalPlan expected, LogicalPlan actual) {
Expand Down

0 comments on commit 29f99aa

Please sign in to comment.