Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix tiflash cannot find generated column #41261

Merged
merged 11 commits into from
Feb 13, 2023
21 changes: 21 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3409,8 +3409,29 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
startTS: startTs,
mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()},
memTracker: memory.NewTracker(v.ID(), -1),

// To fill virtual column.
columns: []*model.ColumnInfo{},
virtualColumnIndex: []int{},
virtualColumnRetFieldTypes: []*types.FieldType{},
}
var hasVirtualCol bool
for _, col := range v.Schema().Columns {
if col.VirtualExpr != nil {
hasVirtualCol = true
}
}
if hasVirtualCol {
ts, err := v.GetTableScan()
if err != nil {
b.err = err
return nil
}
gather.columns = ts.Columns
gather.virtualColumnIndex, gather.virtualColumnRetFieldTypes = buildVirtualColumnInfo(gather.Schema(), gather.columns)
}
gather.memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker)

return gather
}

Expand Down
16 changes: 15 additions & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -66,6 +69,10 @@ type MPPGather struct {
respIter distsql.SelectResult

memTracker *memory.Tracker
columns []*model.ColumnInfo

virtualColumnIndex []int
virtualColumnRetFieldTypes []*types.FieldType
}

func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
Expand Down Expand Up @@ -159,7 +166,14 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// Next fills data into the chunk passed by its caller.
func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error {
err := e.respIter.Next(ctx, chk)
return errors.Trace(err)
if err != nil {
return err
}
err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.schema.Columns, e.columns, e.ctx, chk)
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
return nil
}

// Close and release the used resources.
Expand Down
17 changes: 11 additions & 6 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,15 +457,20 @@ func buildVirtualColumnIndex(schema *expression.Schema, columns []*model.ColumnI
return virtualColumnIndex
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
func (e *TableReaderExecutor) buildVirtualColumnInfo() {
e.virtualColumnIndex = buildVirtualColumnIndex(e.Schema(), e.columns)
if len(e.virtualColumnIndex) > 0 {
e.virtualColumnRetFieldTypes = make([]*types.FieldType, len(e.virtualColumnIndex))
for i, idx := range e.virtualColumnIndex {
e.virtualColumnRetFieldTypes[i] = e.schema.Columns[idx].RetType
e.virtualColumnIndex, e.virtualColumnRetFieldTypes = buildVirtualColumnInfo(e.Schema(), e.columns)
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
func buildVirtualColumnInfo(schema *expression.Schema, columns []*model.ColumnInfo) (colIndexs []int, retTypes []*types.FieldType) {
colIndexs = buildVirtualColumnIndex(schema, columns)
if len(colIndexs) > 0 {
retTypes = make([]*types.FieldType, len(colIndexs))
for i, idx := range colIndexs {
retTypes[i] = schema.Columns[idx].RetType
}
}
return colIndexs, retTypes
}

type tableResultHandler struct {
Expand Down
35 changes: 35 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,3 +1425,38 @@ func TestMPPMemoryTracker(t *testing.T) {
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
}

func TestDisaggregatedTiFlashGenColumn(t *testing.T) {
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tt1;")
tk.MustExec("create table tt1(c0 int, c1 varchar(100), c2 varchar(100) AS (lower(c1))) partition by hash(c0) partitions 2;")
tk.MustExec("insert into tt1(c0, c1) values(1, 'ABC'), (2, 'DEF');")
tk.MustExec("alter table tt1 set tiflash replica 1;")
tb := external.GetTableByName(t, tk, "test", "tt1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustQuery("explain select sum(c0) from tt1;").Check(testkit.Rows(
"HashAgg_15 1.00 root funcs:sum(Column#6)->Column#5",
"└─PartitionUnion_16 2.00 root ",
" ├─HashAgg_34 1.00 root funcs:sum(Column#8)->Column#6",
" │ └─TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35",
" │ └─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough",
" │ └─HashAgg_21 1.00 mpp[tiflash] funcs:sum(Column#15)->Column#8",
" │ └─Projection_82 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#15",
" │ └─TableFullScan_33 10000.00 mpp[tiflash] table:tt1, partition:p0 keep order:false, stats:pseudo",
" └─HashAgg_62 1.00 root funcs:sum(Column#11)->Column#6",
" └─TableReader_64 1.00 root MppVersion: 1, data:ExchangeSender_63",
" └─ExchangeSender_63 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashAgg_49 1.00 mpp[tiflash] funcs:sum(Column#16)->Column#11",
" └─Projection_83 10000.00 mpp[tiflash] cast(test.tt1.c0, decimal(10,0) BINARY)->Column#16",
" └─TableFullScan_61 10000.00 mpp[tiflash] table:tt1, partition:p1 keep order:false, stats:pseudo"))
}
16 changes: 14 additions & 2 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3771,19 +3771,31 @@ func TestShardIndexOnTiFlash(t *testing.T) {
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_enforce_mpp = 1")
rows := tk.MustQuery("explain select max(b) from t").Rows()
var hasTableFullScan bool
for _, row := range rows {
line := fmt.Sprintf("%v", row)
require.NotContains(t, line, "tiflash")
if strings.Contains(line, "TableFullScan") {
hasTableFullScan = true
require.Contains(t, line, "mpp[tiflash]")
}
}
require.True(t, hasTableFullScan)

tk.MustExec("set @@session.tidb_enforce_mpp = 0")
tk.MustExec("set @@session.tidb_allow_mpp = 0")
rows = tk.MustQuery("explain select max(b) from t").Rows()
hasTableFullScan = false
for _, row := range rows {
line := fmt.Sprintf("%v", row)
require.NotContains(t, line, "tiflash")
if strings.Contains(line, "TableFullScan") {
hasTableFullScan = true
require.Contains(t, line, "tiflash")
}
}
require.True(t, hasTableFullScan)
}

func TestExprPushdownBlacklist(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions parser/mysql/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
PreventNullInsertFlag uint = 1 << 20 /* Prevent this Field from inserting NULL values */
EnumSetAsIntFlag uint = 1 << 21 /* Internal: Used for inferring enum eval type. */
DropColumnIndexFlag uint = 1 << 22 /* Internal: Used for indicate the column is being dropped with index */
GeneratedColumnFlag uint = 1 << 23 /* Internal: TiFlash will check this flag and add a placeholder for this column */
)

// TypeInt24 bounds.
Expand Down
24 changes: 11 additions & 13 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2001,15 +2001,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
return invalidTask, nil
}
if ts.StoreType == kv.TiFlash {
for _, col := range ts.schema.Columns {
// In theory, TiFlash does not support virtual expr, but in non-mpp mode, if the cop request only contain table scan, then
// TiDB will fill the virtual column after decoding the cop response(executor.FillVirtualColumnValue), that is to say, the virtual
// columns in Cop request is just a placeholder, so TiFlash can support virtual column in cop request mode. However, virtual column
// with TiDBShard is special, it can be added using create index statement, TiFlash's ddl does not handle create index statement, so
// there is a chance that the TiDBShard's virtual column is not seen by TiFlash, in this case, TiFlash will throw column not found error
if ds.containExprPrefixUk && expression.GcColumnExprIsTidbShard(col.VirtualExpr) {
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
return invalidTask, nil
for _, col := range ts.Columns {
if col.IsGenerated() && !col.GeneratedStored {
col.AddFlag(mysql.GeneratedColumnFlag)
}
}
}
Expand All @@ -2028,10 +2022,14 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
return invalidTask, nil
}
for _, col := range ts.schema.Columns {
if col.VirtualExpr != nil {
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
return invalidTask, nil
if !isDisaggregatedTiFlash || !canMppConvertToRootForDisaggregatedTiFlash {
// Normally, cannot generate mppTask when got virtual column.
// But in disaggregated tiflash mode, we can convert mppTask of TableScan to rootTask directly.
for _, col := range ts.schema.Columns {
if col.VirtualExpr != nil {
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
return invalidTask, nil
}
}
}
mppTask := &mppTask{
Expand Down
11 changes: 11 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,7 +2151,18 @@ func accumulateNetSeekCost4MPP(p PhysicalPlan) (cost float64) {
return
}

func tryExpandVirtualColumn(p PhysicalPlan) {
if ts, ok := p.(*PhysicalTableScan); ok {
ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns)
return
}
for _, child := range p.Children() {
tryExpandVirtualColumn(child)
}
}

func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
tryExpandVirtualColumn(t.p)
sender := PhysicalExchangeSender{
ExchangeType: tipb.ExchangeType_PassThrough,
}.Init(ctx, t.p.statsInfo())
Expand Down