Skip to content
This repository has been archived by the owner on Dec 28, 2017. It is now read-only.

Optimize DAG executor building logic with column pruning #203

Merged
merged 6 commits into from
Dec 27, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/main/java/com/pingcap/tikv/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,47 @@

import com.pingcap.tikv.catalog.Catalog;
import com.pingcap.tikv.expression.TiColumnRef;
import com.pingcap.tikv.expression.aggregate.Max;
import com.pingcap.tikv.expression.aggregate.Min;
import com.pingcap.tikv.expression.aggregate.Sum;

import com.pingcap.tikv.meta.TiDAGRequest;
import com.pingcap.tikv.meta.TiDAGRequest.PushDownType;
import com.pingcap.tikv.meta.TiDBInfo;
import com.pingcap.tikv.meta.TiTableInfo;
import com.pingcap.tikv.operation.SchemaInfer;
import com.pingcap.tikv.predicates.ScanBuilder;
import com.pingcap.tikv.row.Row;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.log4j.Logger;

public class Main {
private static final Logger logger = Logger.getLogger(Main.class);

public static void main(String[] args) throws Exception {
TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");
TiSession session = TiSession.create(conf);
Catalog cat = session.getCatalog();
TiDBInfo db = cat.getDatabase("test");
TiTableInfo table = cat.getTable(db, "t1");
TiDBInfo db = cat.getDatabase("tispark_test");
TiTableInfo table = cat.getTable(db, "full_data_type_table");
Snapshot snapshot = session.createSnapshot();
TiDAGRequest selectRequest = new TiDAGRequest(PushDownType.NORMAL);
ScanBuilder scanBuilder = new ScanBuilder();
ScanBuilder.ScanPlan scanPlan = scanBuilder.buildScan(new ArrayList<>(), table);
selectRequest.addRequiredColumn(TiColumnRef.create("c1", table))
.addRequiredColumn(TiColumnRef.create("c2", table))
.addRequiredColumn(TiColumnRef.create("s1", table))
.addRequiredColumn(TiColumnRef.create("f1", table))
TiColumnRef col = TiColumnRef.create("tp_tinyint", table);
TiColumnRef col2 = TiColumnRef.create("tp_int", table);
selectRequest
.addAggregate(new Sum(col))
.addAggregate(new Min(col))
.addAggregate(new Max(col))
.addRequiredColumn(col)
.setStartTs(snapshot.getVersion())
.addRanges(scanPlan.getKeyRanges())
.setTableInfo(table);
System.out.println(selectRequest);
Iterator<Row> it = snapshot.tableRead(selectRequest);
while (it.hasNext()) {
Row r = it.next();
Expand All @@ -43,6 +52,7 @@ public static void main(String[] args) throws Exception {
Object val = r.get(i, schemaInfer.getType(i));
System.out.print(val);
}
System.out.println();
}
session.close();
}
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/pingcap/tikv/expression/TiColumnRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.pingcap.tikv.types.IntegerType;

public class TiColumnRef implements TiExpr {
private long offset = -1;

public static TiColumnInfo getColumnWithName(String name, TiTableInfo table) {
TiColumnInfo columnInfo = null;
for (TiColumnInfo col : table.getColumns()) {
Expand Down Expand Up @@ -74,7 +76,10 @@ public Expr toProto() {
// After switching to DAG request mode, expression value
// should be the index of table columns we provided in
// the first executor of a DAG request.
IntegerType.writeLong(cdo, columnInfo.getOffset());
//
// If offset < 0, it's not a valid offset specified by
// user, use columnInfo instead
IntegerType.writeLong(cdo, offset < 0 ? columnInfo.getOffset() : offset);
builder.setVal(cdo.toByteString());
return builder.build();
}
Expand Down Expand Up @@ -116,6 +121,14 @@ public TiColumnInfo getColumnInfo() {
return columnInfo;
}

/**
 * Overrides the column offset that {@code toProto()} encodes for this
 * column reference.
 *
 * <p>A negative value means "no explicit offset": {@code toProto()} then
 * falls back to the offset stored in the underlying column info.
 *
 * @param offset position to encode for this column, or a negative value to
 *     fall back to the column info's own offset
 */
public void setOffset(final long offset) {
  this.offset = offset;
}

/**
 * Returns the explicitly assigned column offset.
 *
 * @return the value last passed to {@code setOffset}, or {@code -1} (the
 *     field's initial value) when no offset has been assigned
 */
public long getOffset() {
  return this.offset;
}

@Override
public boolean equals(Object another) {
if (this == another) {
Expand Down
52 changes: 46 additions & 6 deletions src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.pingcap.tikv.expression.TiByItem;
import com.pingcap.tikv.expression.TiColumnRef;
import com.pingcap.tikv.expression.TiExpr;
import com.pingcap.tikv.expression.TiFunctionExpression;
import com.pingcap.tikv.kvproto.Coprocessor;
import com.pingcap.tikv.types.DataType;
import com.pingcap.tikv.util.KeyRangeUtils;
Expand Down Expand Up @@ -168,7 +169,7 @@ private DAGRequest buildIndexScan() {
dagRequestBuilder.addExecutors(executorBuilder.setIdxScan(indexScanBuilder).build());
int colCount = indexScanBuilder.getColumnsCount();
dagRequestBuilder.addOutputOffsets(
colCount != 0 ? colCount - 1 : 0
colCount != 0 ? colCount - 1 : 0
);
return dagRequestBuilder
.setFlags(flags)
Expand All @@ -178,7 +179,7 @@ private DAGRequest buildIndexScan() {
}

/**
* @return
* @return DAGRequest built
*/
private DAGRequest buildTableScan() {
checkArgument(startTs != 0, "timestamp is 0");
Expand All @@ -187,7 +188,11 @@ private DAGRequest buildTableScan() {
TableScan.Builder tblScanBuilder = TableScan.newBuilder();

// Step1. Add columns to first executor
tableInfo.getColumns().forEach(tiColumnInfo -> tblScanBuilder.addColumns(tiColumnInfo.toProto(tableInfo)));
getFields().forEach(tiColumnInfo ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

tblScanBuilder.addColumns(
tiColumnInfo.getColumnInfo().toProto(tableInfo)
)
);
executorBuilder.setTp(ExecType.TypeTableScan);
tblScanBuilder.setTableId(tableInfo.getId());
// Currently, according to TiKV's implementation, if handle
Expand All @@ -210,7 +215,10 @@ private DAGRequest buildTableScan() {
// DO NOT EDIT EXPRESSION CONSTRUCTION ORDER
// Or make sure the construction order is below:
// TableScan/IndexScan > Selection > Aggregation > TopN/Limit
TiExpr whereExpr = mergeCNFExpressions(getWhere());
TiExpr whereExpr = mergeCNFExpressions(
getWhere().stream().peek(this::setColumnOffsets).collect(Collectors.toList())
);

if (whereExpr != null) {
executorBuilder.setTp(ExecType.TypeSelection);
dagRequestBuilder.addExecutors(
Expand All @@ -223,7 +231,9 @@ private DAGRequest buildTableScan() {

if (!getGroupByItems().isEmpty() || !getAggregates().isEmpty()) {
Aggregation.Builder aggregationBuilder = Aggregation.newBuilder();
getGroupByItems().stream().map(TiByItem::getExpr).forEach(this::setColumnOffsets);
getGroupByItems().forEach(tiByItem -> aggregationBuilder.addGroupBy(tiByItem.getExpr().toProto()));
getAggregates().forEach(this::setColumnOffsets);
getAggregates().forEach(tiExpr -> aggregationBuilder.addAggFunc(tiExpr.toProto()));
executorBuilder.setTp(ExecType.TypeAggregation);
dagRequestBuilder.addExecutors(
Expand All @@ -234,6 +244,7 @@ private DAGRequest buildTableScan() {

if (!getOrderByItems().isEmpty()) {
TopN.Builder topNBuilder = TopN.newBuilder();
getOrderByItems().stream().map(TiByItem::getExpr).forEach(this::setColumnOffsets);
getOrderByItems().forEach(tiByItem -> topNBuilder.addOrderBy(tiByItem.toProto()));
executorBuilder.setTp(ExecType.TypeTopN);
topNBuilder.setLimit(getLimit());
Expand All @@ -247,7 +258,10 @@ private DAGRequest buildTableScan() {
executorBuilder.clear();
}

getFields().forEach(tiColumnInfo -> dagRequestBuilder.addOutputOffsets(tiColumnInfo.getColumnInfo().getOffset()));
// Output offsets should be in accordance with the ordering of the
// columns added to the first executor, i.e. the ordering of getFields().
for (int i = 0; i < getFields().size(); i++) {
dagRequestBuilder.addOutputOffsets(i);
}
// if handle is needed, we should append one output offset
if (isHandleNeeded()) {
dagRequestBuilder.addOutputOffsets(tableInfo.getColumns().size());
Expand All @@ -264,13 +278,38 @@ private DAGRequest buildTableScan() {
return request;
}

/**
 * Recursively assigns offsets to every column reference found in an
 * expression tree.
 *
 * <p>For a {@code TiFunctionExpression} each argument is visited in turn.
 * For a {@code TiColumnRef} the offset is set to the index of the first
 * entry in {@code getFields()} whose column id equals the referenced
 * column's id. Any other expression type is left untouched.
 *
 * @param expr expression whose column references should receive offsets
 * @throws DAGRequestException if a referenced column has no entry with a
 *     matching id in {@code getFields()}
 */
private void setColumnOffsets(TiExpr expr) {
Copy link
Contributor

@zhexuany zhexuany Dec 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by seting column offset, tikv can take advantage of it. But my concern is where is the logic in tidb side, I just want to make sure this logic can do the trick.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may lie in TiDB's schema(may hard to find in source code...) or TiKV-Client developers' document.

if (expr instanceof TiFunctionExpression) {
// Function expression: descend into every argument.
((TiFunctionExpression) expr).getArgs().forEach(
this::setColumnOffsets
);
} else if (expr instanceof TiColumnRef) {
TiColumnRef columnRef = (TiColumnRef) expr;
long targetId = columnRef.getColumnInfo().getId();
int pos = 0;
// Find the position of the referenced column within getFields();
// columns are matched by column id and the first match wins.
for (TiColumnRef col : getFields()) {
if (col.getColumnInfo().getId() == targetId) {
break;
}
pos++;
}

// pos ran past the last field, so no field carries the target id.
if (getFields().size() == pos) {
throw new DAGRequestException("No column match id:" + targetId);
}
columnRef.setOffset(pos);
}
}

public boolean isIndexScan() {
return indexInfo != null;
}

/**
* Check if a DAG request is valid.
*
* <p>
* Note:
* When constructing a DAG request, an executor with an ExecType of higher priority
* should always be placed before those lower ones.
Expand Down Expand Up @@ -565,6 +604,7 @@ public void setHandleNeeded(boolean handleNeeded) {

/**
* Whether we use streaming processing to retrieve data
*
* @return push down type.
*/
public PushDownType getPushDownType() {
Expand Down