Skip to content

Commit

Permalink
solve the compatibility problem between UnionScan and PartitionTabel
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Apr 30, 2021
1 parent c506155 commit 8db518f
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 113 deletions.
13 changes: 0 additions & 13 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,19 +953,6 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
// Note that this function may be called by inner workers of index lookup join concurrently.
// Be careful to avoid data race.
func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
// Adjust UnionScan->PartitionTable->Reader
// to PartitionTable->UnionScan->Reader
// The build of UnionScan executor is delay to the nextPartition() function
// because the Reader executor is available there.
if x, ok := reader.(*PartitionTableExecutor); ok {
nextPartitionForReader := x.nextPartition
x.nextPartition = nextPartitionForUnionScan{
b: b,
us: v,
child: nextPartitionForReader,
}
return x
}
// If reader is union, it means a partition table and we should transfer as above.
if x, ok := reader.(*UnionExec); ok {
for i, child := range x.children {
Expand Down
100 changes: 0 additions & 100 deletions executor/partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,10 @@ import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tipb/go-tipb"
)

// PartitionTableExecutor is a Executor for partitioned table.
// It works by wrap the underlying TableReader/IndexReader/IndexLookUpReader.
type PartitionTableExecutor struct {
baseExecutor

nextPartition
partitions []table.PhysicalTable
cursor int
curr Executor
}

type nextPartition interface {
nextPartition(context.Context, table.PhysicalTable) (Executor, error)
}

type nextPartitionForUnionScan struct {
b *executorBuilder
us *plannercore.PhysicalUnionScan
child nextPartition
}

// nextPartition implements the nextPartition interface.
// For union scan on partitioned table, the executor should be PartitionTable->UnionScan->TableReader rather than
// UnionScan->PartitionTable->TableReader
func (n nextPartitionForUnionScan) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) {
childExec, err := n.child.nextPartition(ctx, tbl)
if err != nil {
return nil, err
}

n.b.err = nil
ret := n.b.buildUnionScanFromReader(childExec, n.us)
return ret, n.b.err
}

func nextPartitionWithTrace(ctx context.Context, n nextPartition, tbl table.PhysicalTable) (Executor, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("nextPartition %d", tbl.GetPhysicalID()), opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
return n.nextPartition(ctx, tbl)
}

func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID int64, recursive bool) error {
var child *tipb.Executor
switch exec.Tp {
Expand Down Expand Up @@ -105,55 +57,3 @@ func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID
}
return nil
}

// Open implements the Executor interface.
func (e *PartitionTableExecutor) Open(ctx context.Context) error {
e.cursor = 0
e.curr = nil
return nil
}

// Next implements the Executor interface.
func (e *PartitionTableExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
var err error
for e.cursor < len(e.partitions) {
if e.curr == nil {
n := e.nextPartition
e.curr, err = nextPartitionWithTrace(ctx, n, e.partitions[e.cursor])
if err != nil {
return err
}
if err := e.curr.Open(ctx); err != nil {
return err
}
}

err = Next(ctx, e.curr, chk)
if err != nil {
return err
}

if chk.NumRows() > 0 {
break
}

err = e.curr.Close()
if err != nil {
return err
}
e.curr = nil
e.cursor++
}
return nil
}

// Close implements the Executor interface.
func (e *PartitionTableExecutor) Close() error {
var err error
if e.curr != nil {
err = e.curr.Close()
e.curr = nil
}
return err
}

0 comments on commit 8db518f

Please sign in to comment.