From 8db518f34e27ac3f26bdf210ad3d84bfca03de0a Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Fri, 30 Apr 2021 10:48:36 +0800 Subject: [PATCH] solve the compatibility problem between UnionScan and PartitionTabel --- executor/builder.go | 13 ----- executor/partition_table.go | 100 ------------------------------------ 2 files changed, 113 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 45e5447f252c1..d940c8bbf4f0e 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -953,19 +953,6 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E // Note that this function may be called by inner workers of index lookup join concurrently. // Be careful to avoid data race. func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor { - // Adjust UnionScan->PartitionTable->Reader - // to PartitionTable->UnionScan->Reader - // The build of UnionScan executor is delay to the nextPartition() function - // because the Reader executor is available there. - if x, ok := reader.(*PartitionTableExecutor); ok { - nextPartitionForReader := x.nextPartition - x.nextPartition = nextPartitionForUnionScan{ - b: b, - us: v, - child: nextPartitionForReader, - } - return x - } // If reader is union, it means a partition table and we should transfer as above. if x, ok := reader.(*UnionExec); ok { for i, child := range x.children { diff --git a/executor/partition_table.go b/executor/partition_table.go index 3cebdec6c14e6..e888332fee396 100644 --- a/executor/partition_table.go +++ b/executor/partition_table.go @@ -17,58 +17,10 @@ import ( "context" "fmt" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - plannercore "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tipb/go-tipb" ) -// PartitionTableExecutor is a Executor for partitioned table. -// It works by wrap the underlying TableReader/IndexReader/IndexLookUpReader. -type PartitionTableExecutor struct { - baseExecutor - - nextPartition - partitions []table.PhysicalTable - cursor int - curr Executor -} - -type nextPartition interface { - nextPartition(context.Context, table.PhysicalTable) (Executor, error) -} - -type nextPartitionForUnionScan struct { - b *executorBuilder - us *plannercore.PhysicalUnionScan - child nextPartition -} - -// nextPartition implements the nextPartition interface. -// For union scan on partitioned table, the executor should be PartitionTable->UnionScan->TableReader rather than -// UnionScan->PartitionTable->TableReader -func (n nextPartitionForUnionScan) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) { - childExec, err := n.child.nextPartition(ctx, tbl) - if err != nil { - return nil, err - } - - n.b.err = nil - ret := n.b.buildUnionScanFromReader(childExec, n.us) - return ret, n.b.err -} - -func nextPartitionWithTrace(ctx context.Context, n nextPartition, tbl table.PhysicalTable) (Executor, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan(fmt.Sprintf("nextPartition %d", tbl.GetPhysicalID()), opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - return n.nextPartition(ctx, tbl) -} - func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID int64, recursive bool) error { var child *tipb.Executor switch exec.Tp { @@ -105,55 +57,3 @@ func updateExecutorTableID(ctx context.Context, exec *tipb.Executor, partitionID } return nil } - -// Open implements the Executor interface. -func (e *PartitionTableExecutor) Open(ctx context.Context) error { - e.cursor = 0 - e.curr = nil - return nil -} - -// Next implements the Executor interface. -func (e *PartitionTableExecutor) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() - var err error - for e.cursor < len(e.partitions) { - if e.curr == nil { - n := e.nextPartition - e.curr, err = nextPartitionWithTrace(ctx, n, e.partitions[e.cursor]) - if err != nil { - return err - } - if err := e.curr.Open(ctx); err != nil { - return err - } - } - - err = Next(ctx, e.curr, chk) - if err != nil { - return err - } - - if chk.NumRows() > 0 { - break - } - - err = e.curr.Close() - if err != nil { - return err - } - e.curr = nil - e.cursor++ - } - return nil -} - -// Close implements the Executor interface. -func (e *PartitionTableExecutor) Close() error { - var err error - if e.curr != nil { - err = e.curr.Close() - e.curr = nil - } - return err -}