Skip to content

Commit

Permalink
executor: union scan refactor, introduce the the mem rows iterator (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jun 27, 2023
1 parent ffb0654 commit 9d42922
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 16 deletions.
135 changes: 135 additions & 0 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *Inde
}
}

func (m *memIndexReader) getMemRowsIter(ctx context.Context) (memRowsIter, error) {
data, err := m.getMemRows(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return &defaultRowsIter{data: data}, nil
}

func (m *memIndexReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
defer tracing.StartRegion(ctx, "memIndexReader.getMemRows").End()
tps := make([]*types.FieldType, 0, len(m.index.Columns)+1)
Expand Down Expand Up @@ -229,6 +237,99 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, kvRanges []kv.K
}
}

type txnMemBufferIter struct {
*memTableReader
txn kv.Transaction
idx int
curr kv.Iterator
}

func (iter *txnMemBufferIter) Next() ([]types.Datum, error) {
var ret []types.Datum
for iter.idx < len(iter.kvRanges) {
if iter.curr == nil {
rg := iter.kvRanges[iter.idx]
tmp := iter.txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
snapCacheIter, err := getSnapIter(iter.ctx, iter.cacheTable, rg)
if err != nil {
return nil, err
}
if snapCacheIter != nil {
tmp, err = transaction.NewUnionIter(tmp, snapCacheIter, false)
if err != nil {
return nil, err
}
}
iter.curr = tmp
} else {
var err error
ret, err = iter.next()
if err != nil {
return nil, errors.Trace(err)
}
if ret != nil {
break
}
iter.idx++
iter.curr = nil
}
}
return ret, nil
}

func (iter *txnMemBufferIter) next() ([]types.Datum, error) {
var err error
curr := iter.curr
for ; err == nil && curr.Valid(); err = curr.Next() {
// check whether the key was been deleted.
if len(curr.Value()) == 0 {
continue
}

mutableRow := chunk.MutRowFromTypes(iter.retFieldTypes)
resultRows := make([]types.Datum, len(iter.columns))
resultRows, err = iter.decodeRecordKeyValue(curr.Key(), curr.Value(), &resultRows)
if err != nil {
return nil, errors.Trace(err)
}

mutableRow.SetDatums(resultRows...)
matched, _, err := expression.EvalBool(iter.ctx, iter.conditions, mutableRow.ToRow())
if err != nil {
return nil, errors.Trace(err)
}
if !matched {
continue
}
return resultRows, curr.Next()
}
return nil, err
}

func (m *memTableReader) getMemRowsIter(ctx context.Context) (memRowsIter, error) {
if !m.desc {
m.offsets = make([]int, len(m.columns))
for i, col := range m.columns {
m.offsets[i] = m.colIDs[col.ID]
}
txn, err := m.ctx.Txn(true)
if err != nil {
return nil, err
}

return &txnMemBufferIter{
memTableReader: m,
txn: txn,
}, nil
}

data, err := m.getMemRows(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return &defaultRowsIter{data: data}, nil
}

// TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row.
func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
defer tracing.StartRegion(ctx, "memTableReader.getMemRows").End()
Expand Down Expand Up @@ -515,6 +616,14 @@ func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUp
}
}

func (m *memIndexLookUpReader) getMemRowsIter(ctx context.Context) (memRowsIter, error) {
data, err := m.getMemRows(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return &defaultRowsIter{data: data}, nil
}

func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
r, ctx := tracing.StartRegionEx(ctx, "memIndexLookUpReader.getMemRows")
defer r.End()
Expand Down Expand Up @@ -643,6 +752,32 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
}
}

type memRowsIter interface {
Next() ([]types.Datum, error)
}

type defaultRowsIter struct {
data [][]types.Datum
cursor int
}

func (iter *defaultRowsIter) Next() ([]types.Datum, error) {
if iter.cursor < len(iter.data) {
ret := iter.data[iter.cursor]
iter.cursor++
return ret, nil
}
return nil, nil
}

func (m *memIndexMergeReader) getMemRowsIter(ctx context.Context) (memRowsIter, error) {
data, err := m.getMemRows(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return &defaultRowsIter{data: data}, nil
}

func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
r, ctx := tracing.StartRegionEx(ctx, "memIndexMergeReader.getMemRows")
defer r.End()
Expand Down
37 changes: 21 additions & 16 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type UnionScanExec struct {
// belowHandleCols is the handle's position of the below scan plan.
belowHandleCols plannercore.HandleCols

addedRows [][]types.Datum
cursor4AddRows int
addedRowsIter memRowsIter
cursor4AddRows []types.Datum
snapshotRows [][]types.Datum
cursor4SnapshotRows int
snapshotChunkBuffer *chunk.Chunk
Expand Down Expand Up @@ -114,15 +114,15 @@ func (us *UnionScanExec) open(ctx context.Context) error {
// 2. build virtual columns and select with virtual columns
switch x := reader.(type) {
case *TableReaderExecutor:
us.addedRows, err = buildMemTableReader(ctx, us, x.kvRanges).getMemRows(ctx)
us.addedRowsIter, err = buildMemTableReader(ctx, us, x.kvRanges).getMemRowsIter(ctx)
case *IndexReaderExecutor:
us.addedRows, err = buildMemIndexReader(ctx, us, x).getMemRows(ctx)
us.addedRowsIter, err = buildMemIndexReader(ctx, us, x).getMemRowsIter(ctx)
case *IndexLookUpExecutor:
us.addedRows, err = buildMemIndexLookUpReader(ctx, us, x).getMemRows(ctx)
us.addedRowsIter, err = buildMemIndexLookUpReader(ctx, us, x).getMemRowsIter(ctx)
case *IndexMergeReaderExecutor:
us.addedRows, err = buildMemIndexMergeReader(ctx, us, x).getMemRows(ctx)
us.addedRowsIter, err = buildMemIndexMergeReader(ctx, us, x).getMemRowsIter(ctx)
case *MPPGather:
us.addedRows, err = buildMemTableReader(ctx, us, x.kvRanges).getMemRows(ctx)
us.addedRowsIter, err = buildMemTableReader(ctx, us, x.kvRanges).getMemRowsIter(ctx)
default:
err = fmt.Errorf("unexpected union scan children:%T", reader)
}
Expand Down Expand Up @@ -185,9 +185,8 @@ func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (us *UnionScanExec) Close() error {
us.cursor4AddRows = 0
us.cursor4AddRows = nil
us.cursor4SnapshotRows = 0
us.addedRows = us.addedRows[:0]
us.snapshotRows = us.snapshotRows[:0]
return us.children[0].Close()
}
Expand All @@ -198,7 +197,10 @@ func (us *UnionScanExec) getOneRow(ctx context.Context) ([]types.Datum, error) {
if err != nil {
return nil, err
}
addedRow := us.getAddedRow()
addedRow, err := us.getAddedRow()
if err != nil {
return nil, err
}

var row []types.Datum
var isSnapshotRow bool
Expand All @@ -225,7 +227,7 @@ func (us *UnionScanExec) getOneRow(ctx context.Context) ([]types.Datum, error) {
if isSnapshotRow {
us.cursor4SnapshotRows++
} else {
us.cursor4AddRows++
us.cursor4AddRows = nil
}
return row, nil
}
Expand Down Expand Up @@ -271,12 +273,15 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
return us.snapshotRows[0], nil
}

func (us *UnionScanExec) getAddedRow() []types.Datum {
var addedRow []types.Datum
if us.cursor4AddRows < len(us.addedRows) {
addedRow = us.addedRows[us.cursor4AddRows]
func (us *UnionScanExec) getAddedRow() ([]types.Datum, error) {
if us.cursor4AddRows == nil {
var err error
us.cursor4AddRows, err = us.addedRowsIter.Next()
if err != nil {
return nil, err
}
}
return addedRow
return us.cursor4AddRows, nil
}

// shouldPickFirstRow picks the suitable row in order.
Expand Down

0 comments on commit 9d42922

Please sign in to comment.