Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add post processing logic for aggregation query. (#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
penghuo authored Jan 22, 2020
1 parent 77efb0b commit 239908b
Show file tree
Hide file tree
Showing 51 changed files with 3,655 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.domain;

import com.amazon.opendistroforelasticsearch.sql.antlr.semantic.types.Type;
import com.amazon.opendistroforelasticsearch.sql.antlr.semantic.types.base.ESDataType;
import com.amazon.opendistroforelasticsearch.sql.antlr.semantic.types.special.Product;
import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* The definition of column type provider
*/
public class ColumnTypeProvider {
private final List<Schema.Type> typeList;

private static final Map<ESDataType, Schema.Type> TYPE_MAP =
new ImmutableMap.Builder<ESDataType, Schema.Type>()
.put(ESDataType.SHORT, Schema.Type.SHORT)
.put(ESDataType.LONG, Schema.Type.LONG)
.put(ESDataType.INTEGER, Schema.Type.INTEGER)
.put(ESDataType.FLOAT, Schema.Type.FLOAT)
.put(ESDataType.DOUBLE, Schema.Type.DOUBLE)
.put(ESDataType.KEYWORD, Schema.Type.KEYWORD)
.put(ESDataType.TEXT, Schema.Type.TEXT)
.put(ESDataType.STRING, Schema.Type.TEXT)
.put(ESDataType.DATE, Schema.Type.DATE)
.put(ESDataType.BOOLEAN, Schema.Type.BOOLEAN)
.put(ESDataType.UNKNOWN, Schema.Type.DOUBLE)
.build();
public static final Schema.Type COLUMN_DEFAULT_TYPE = Schema.Type.DOUBLE;

public ColumnTypeProvider(Type type) {
this.typeList = convertOutputColumnType(type);
}

public ColumnTypeProvider() {
this.typeList = new ArrayList<>();
}

/**
* Get the type of column by index.
*
* @param index column index.
* @return column type.
*/
public Schema.Type get(int index) {
if (typeList.isEmpty()) {
return COLUMN_DEFAULT_TYPE;
} else {
return typeList.get(index);
}
}

private List<Schema.Type> convertOutputColumnType(Type type) {
if (type instanceof Product) {
List<Type> types = ((Product) type).getTypes();
return types.stream().map(t -> convertType(t)).collect(Collectors.toList());
} else if (type instanceof ESDataType) {
return ImmutableList.of(convertType(type));
} else {
return ImmutableList.of(COLUMN_DEFAULT_TYPE);
}
}

private Schema.Type convertType(Type type) {
try {
return TYPE_MAP.getOrDefault(type, COLUMN_DEFAULT_TYPE);
} catch (Exception e) {
return COLUMN_DEFAULT_TYPE;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.domain;

import com.amazon.opendistroforelasticsearch.sql.executor.Format;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* The definition of QueryActionRequest.
*/
@Getter
@RequiredArgsConstructor
public class QueryActionRequest {
private final String sql;
private final ColumnTypeProvider typeProvider;
private final Format format;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException;
import com.amazon.opendistroforelasticsearch.sql.executor.join.ElasticJoinExecutor;
import com.amazon.opendistroforelasticsearch.sql.executor.multi.MultiRequestExecutorFactory;
import com.amazon.opendistroforelasticsearch.sql.executor.adapter.QueryPlanQueryAction;
import com.amazon.opendistroforelasticsearch.sql.executor.adapter.QueryPlanRequestBuilder;
import com.amazon.opendistroforelasticsearch.sql.expression.domain.BindingTuple;
import com.amazon.opendistroforelasticsearch.sql.query.AggregationQueryAction;
import com.amazon.opendistroforelasticsearch.sql.query.DefaultQueryAction;
import com.amazon.opendistroforelasticsearch.sql.query.DeleteQueryAction;
Expand All @@ -36,6 +39,7 @@
import org.elasticsearch.search.aggregations.Aggregations;

import java.io.IOException;
import java.util.List;

/**
* Created by Eliran on 3/10/2015.
Expand All @@ -60,6 +64,11 @@ public static Aggregations executeAggregationAction(AggregationQueryAction aggre
return ((SearchResponse) select.get()).getAggregations();
}

public static List<BindingTuple> executeQueryPlanQueryAction(QueryPlanQueryAction queryPlanQueryAction) {
QueryPlanRequestBuilder select = (QueryPlanRequestBuilder) queryPlanQueryAction.explain();
return select.execute();
}

public static ActionResponse executeShowQueryAction(ShowQueryAction showQueryAction) {
return showQueryAction.explain().get();
}
Expand Down Expand Up @@ -89,6 +98,9 @@ public static Object executeAnyAction(Client client, QueryAction queryAction)
if (queryAction instanceof AggregationQueryAction) {
return executeAggregationAction((AggregationQueryAction) queryAction);
}
if (queryAction instanceof QueryPlanQueryAction) {
return executeQueryPlanQueryAction((QueryPlanQueryAction) queryAction);
}
if (queryAction instanceof ShowQueryAction) {
return executeShowQueryAction((ShowQueryAction) queryAction);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.adapter;

import com.amazon.opendistroforelasticsearch.sql.query.QueryAction;
import com.amazon.opendistroforelasticsearch.sql.query.SqlElasticRequestBuilder;
import com.google.common.base.Strings;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* The definition of QueryPlan of QueryAction which works as the adapter to the current QueryAction framework.
*/
public class QueryPlanQueryAction extends QueryAction {
private final QueryPlanRequestBuilder requestBuilder;

public QueryPlanQueryAction(QueryPlanRequestBuilder requestBuilder) {
super(null, null);
this.requestBuilder = requestBuilder;
}

@Override
public SqlElasticRequestBuilder explain() {
return requestBuilder;
}

@Override
public Optional<List<String>> getFieldNames() {
List<String> fieldNames = ((QueryPlanRequestBuilder) requestBuilder).outputColumns()
.stream()
.map(node -> Strings.isNullOrEmpty(node.getAlias()) ? node.getName() : node.getAlias())
.collect(Collectors.toList());
return Optional.of(fieldNames);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.adapter;

import com.amazon.opendistroforelasticsearch.sql.expression.domain.BindingTuple;
import com.amazon.opendistroforelasticsearch.sql.query.planner.core.ColumnNode;
import com.amazon.opendistroforelasticsearch.sql.query.planner.core.BindingTupleQueryPlanner;
import com.amazon.opendistroforelasticsearch.sql.query.SqlElasticRequestBuilder;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;

import java.util.List;

/**
* The definition of QueryPlan SqlElasticRequestBuilder.
*/
@RequiredArgsConstructor
public class QueryPlanRequestBuilder implements SqlElasticRequestBuilder {
private final BindingTupleQueryPlanner queryPlanner;

public List<BindingTuple> execute() {
return queryPlanner.execute();
}

public List<ColumnNode> outputColumns() {
return queryPlanner.getColumnNodes();
}

@Override
public String explain() {
return queryPlanner.explain();
}

@Override
public ActionRequest request() {
throw new RuntimeException("unsupported operation");
}

@Override
public ActionResponse get() {
throw new RuntimeException("unsupported operation");
}

@Override
public ActionRequestBuilder getBuilder() {
throw new RuntimeException("unsupported operation");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistroforelasticsearch.sql.executor.csv;

import com.amazon.opendistroforelasticsearch.sql.expression.domain.BindingTuple;
import com.amazon.opendistroforelasticsearch.sql.expression.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.utils.Util;
import com.google.common.base.Joiner;
import org.elasticsearch.common.document.DocumentField;
Expand All @@ -38,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Created by Eliran on 27/12/2015.
Expand Down Expand Up @@ -84,6 +87,24 @@ public CSVResult extractResults(Object queryResult, boolean flat, String separat
return new CSVResult(headers, csvLines);

}
// Handle List<BindingTuple> result.
if (queryResult instanceof List) {
List<BindingTuple> bindingTuples = (List<BindingTuple>) queryResult;
List<String> csvLines = bindingTuples.stream().map(tuple -> {
Map<String, ExprValue> bindingMap = tuple.getBindingMap();
List<Object> rowValues = new ArrayList<>();
for (String fieldName : fieldNames) {
if (bindingMap.containsKey(fieldName)) {
rowValues.add(bindingMap.get(fieldName).value());
} else {
rowValues.add("");
}
}
return Joiner.on(separator).join(rowValues);
}).collect(Collectors.toList());

return new CSVResult(fieldNames, csvLines);
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import com.amazon.opendistroforelasticsearch.sql.expression.domain.BindingTuple;
import com.amazon.opendistroforelasticsearch.sql.expression.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.query.planner.core.ColumnNode;
import com.google.common.annotations.VisibleForTesting;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* The definition of BindingTuple ResultSet.
*/
public class BindingTupleResultSet extends ResultSet {

public BindingTupleResultSet(List<ColumnNode> columnNodes, List<BindingTuple> bindingTuples) {
this.schema = buildSchema(columnNodes);
this.dataRows = buildDataRows(bindingTuples);
}

@VisibleForTesting
public static Schema buildSchema(List<ColumnNode> columnNodes) {
List<Schema.Column> columnList = columnNodes.stream()
.map(node -> new Schema.Column(
node.getName(),
node.getAlias(),
node.getType()))
.collect(Collectors.toList());
return new Schema("dummy", "dummy", columnList);
}

@VisibleForTesting
public static DataRows buildDataRows(List<BindingTuple> bindingTuples) {
List<DataRows.Row> rowList = bindingTuples.stream().map(tuple -> {
Map<String, ExprValue> bindingMap = tuple.getBindingMap();
Map<String, Object> rowMap = new HashMap<>();
for (String s : bindingMap.keySet()) {
rowMap.put(s, bindingMap.get(s).value());
}
return new DataRows.Row(rowMap);
}).collect(Collectors.toList());

return new DataRows(bindingTuples.size(), bindingTuples.size(), rowList);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public String execute(Client client, Map<String, String> params, QueryAction que

try {
Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction);
protocol = new Protocol(client, queryAction.getQueryStatement(), queryResult, format);
protocol = new Protocol(client, queryAction, queryResult, format);
} catch (Exception e) {
LOG.error("Error happened in pretty formatter", e);
protocol = new Protocol(e);
Expand Down
Loading

0 comments on commit 239908b

Please sign in to comment.