diff --git a/.bazelrc b/.bazelrc index 3735abb9e09c1..32ebe61fc8dca 100644 --- a/.bazelrc +++ b/.bazelrc @@ -7,6 +7,7 @@ build:release --config=ci build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution build:ci --remote_cache=http://172.16.4.3:8084/tidb test:ci --verbose_failures +test:ci --test_timeout=180 test:ci --test_env=GO_TEST_WRAP_TESTV=1 test:ci --remote_cache=http://172.16.4.3:8084/tidb test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600 diff --git a/Makefile b/Makefile index 0799b8cb17055..ed0615474d101 100644 --- a/Makefile +++ b/Makefile @@ -430,13 +430,13 @@ bazel_prepare: bazel_test: failpoint-enable bazel_ci_prepare bazel --output_user_root=/home/jenkins/.tidb/tmp test --config=ci \ -- //... -//cmd/... -//tests/graceshutdown/... \ - -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/lightning/log:log_test + -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test bazel_coverage_test: failpoint-enable bazel_ci_prepare bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover \ -- //... -//cmd/... -//tests/graceshutdown/... \ - -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/lightning/log:log_test + -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test bazel_build: bazel_ci_prepare mkdir -p bin diff --git a/br/pkg/lightning/log/BUILD.bazel b/br/pkg/lightning/log/BUILD.bazel index 53d93581f1627..3e3865487f3e0 100644 --- a/br/pkg/lightning/log/BUILD.bazel +++ b/br/pkg/lightning/log/BUILD.bazel @@ -27,6 +27,7 @@ go_test( "filter_test.go", "log_test.go", ], + importpath = "github.com/pingcap/tidb/br/pkg/lightning/log_test", deps = [ ":log", "@com_github_stretchr_testify//require", diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 12291d7fef005..334babfd6d85c 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -92,5 +92,6 @@ go_test( "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//keepalive", ], ) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index d257eb868fb55..7d8b717f07e0d 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -932,7 +932,17 @@ func RunStreamRestore( g glue.Glue, cmdName string, cfg *RestoreConfig, -) error { +) (err error) { + startTime := time.Now() + defer func() { + dur := time.Since(startTime) + if err != nil { + summary.Log(cmdName+" failed summary", zap.Error(err)) + } else { + summary.Log(cmdName+" success summary", zap.Duration("total-take", dur), + zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS)) + } + }() ctx, cancelFn := context.WithCancel(c) defer cancelFn() diff --git a/ddl/cancel_test.go b/ddl/cancel_test.go index c217ef91f8885..b0b9f580477f6 100644 --- a/ddl/cancel_test.go +++ b/ddl/cancel_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" atomicutil "go.uber.org/atomic" ) @@ -32,12 +33,41 @@ import ( type testCancelJob struct { sql string ok bool - cancelState model.SchemaState + cancelState interface{} // model.SchemaState | []model.SchemaState onJobBefore bool onJobUpdate bool prepareSQL []string } +type subStates = []model.SchemaState + +func testMatchCancelState(t 
*testing.T, job *model.Job, cancelState interface{}, sql string) bool { + switch v := cancelState.(type) { + case model.SchemaState: + if job.Type == model.ActionMultiSchemaChange { + msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v) + assert.Failf(t, msg, "use []model.SchemaState as cancel states instead") + return false + } + return job.SchemaState == v + case subStates: // For multi-schema change sub-jobs. + if job.MultiSchemaInfo == nil { + msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v) + assert.Failf(t, msg, "use model.SchemaState as the cancel state instead") + return false + } + assert.Equal(t, len(job.MultiSchemaInfo.SubJobs), len(v), sql) + for i, subJobSchemaState := range v { + if job.MultiSchemaInfo.SubJobs[i].SchemaState != subJobSchemaState { + return false + } + } + return true + default: + return false + } +} + var allTestCase = []testCancelJob{ // Add index. {"create unique index c3_index on t_partition (c1)", true, model.StateWriteReorganization, true, true, nil}, @@ -246,7 +276,7 @@ func TestCancel(t *testing.T) { cancelWhenReorgNotStart := false hookFunc := func(job *model.Job) { - if job.SchemaState == allTestCase[i.Load()].cancelState && !cancel { + if testMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel { if !cancelWhenReorgNotStart && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 { return } diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index bd9afd6a5a0b7..44b44c8e24140 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/testkit/external" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/mock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -63,7 +64,6 @@ func TestColumnAdd(t *testing.T) { var jobID int64 tc.OnJobUpdatedExported = func(job *model.Job) { jobID = job.ID - require.NoError(t, dom.Reload()) tbl, exist := dom.InfoSchema().TableByID(job.TableID) require.True(t, exist) switch job.SchemaState { @@ -96,11 +96,14 @@ func TestColumnAdd(t *testing.T) { } } tc.OnJobUpdatedExported = func(job *model.Job) { + if job.NotStarted() { + return + } jobID = job.ID tbl := external.GetTableByName(t, internal, "test", "t") if job.SchemaState != model.StatePublic { for _, col := range tbl.Cols() { - require.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped") + assert.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped") } } } @@ -224,7 +227,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t return errors.Trace(err) } err = checkResult(ctx, writeOnlyTable, writeOnlyTable.WritableCols(), [][]string{ - {"1", "2", ""}, + {"1", "2", "3"}, {"2", "3", "3"}, }) if err != nil { @@ -236,7 +239,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t return errors.Trace(err) } got := fmt.Sprintf("%v", row) - expect := fmt.Sprintf("%v", []types.Datum{types.NewDatum(1), types.NewDatum(2), types.NewDatum(nil)}) + expect := fmt.Sprintf("%v", []types.Datum{types.NewDatum(1), types.NewDatum(2), types.NewDatum(3)}) if got != expect { return errors.Errorf("expect %v, got %v", expect, got) } diff --git a/ddl/main_test.go b/ddl/main_test.go index 2db5d9668ca6d..895b9aeb1f9e0 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -40,6 +40,7 @@ func TestMain(m *testing.M) { autoid.SetStep(5000) ddl.ReorgWaitTimeout = 30 * 
time.Millisecond + ddl.RunInGoTest = true ddl.SetBatchInsertDeleteRangeSize(2) config.UpdateGlobal(func(conf *config.Config) { diff --git a/distsql/distsql.go b/distsql/distsql.go index 7840f5470cd0e..6a502b58478b6 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -75,7 +75,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie hook.(func(*kv.Request))(kvReq) } - kvReq.Streaming = false enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL eventCb := func(event trxevents.TransactionEvent) { @@ -111,20 +110,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie } // kvReq.MemTracker is used to trace and control memory usage in DistSQL layer; - // for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it; // for selectResult, we just use the kvReq.MemTracker prepared for co-processor // instead of creating a new one for simplification. - if kvReq.Streaming { - return &streamResult{ - label: "dag-stream", - sqlType: label, - resp: resp, - rowLen: len(fieldTypes), - fieldTypes: fieldTypes, - ctx: sctx, - feedback: fb, - }, nil - } return &selectResult{ label: "dag", resp: resp, diff --git a/distsql/request_builder.go b/distsql/request_builder.go index d7a7f2a267492..c4840ca8741a3 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -270,12 +270,6 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req return builder } -// SetStreaming sets "Streaming" flag for "kv.Request". -func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder { - builder.Request.Streaming = streaming - return builder -} - // SetPaging sets "Paging" flag for "kv.Request". func (builder *RequestBuilder) SetPaging(paging bool) *RequestBuilder { builder.Request.Paging = paging diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 3a2c7e60b200f..8baf74422b8a9 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -270,7 +270,6 @@ func TestRequestBuilder1(t *testing.T) { IsolationLevel: 0, Priority: 0, NotFillCache: false, - Streaming: false, ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } @@ -352,7 +351,6 @@ func TestRequestBuilder2(t *testing.T) { IsolationLevel: 0, Priority: 0, NotFillCache: false, - Streaming: false, ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } @@ -400,7 +398,6 @@ func TestRequestBuilder3(t *testing.T) { IsolationLevel: 0, Priority: 0, NotFillCache: false, - Streaming: false, ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, } @@ -432,7 +429,6 @@ func TestRequestBuilder4(t *testing.T) { SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetStreaming(true). SetFromSessionVars(variable.NewSessionVars()). 
Build() require.NoError(t, err) @@ -447,7 +443,6 @@ func TestRequestBuilder4(t *testing.T) { Concurrency: variable.DefDistSQLScanConcurrency, IsolationLevel: 0, Priority: 0, - Streaming: true, NotFillCache: false, ReplicaRead: kv.ReplicaReadLeader, ReadReplicaScope: kv.GlobalReplicaScope, @@ -493,7 +488,6 @@ func TestRequestBuilder5(t *testing.T) { IsolationLevel: kv.RC, Priority: 1, NotFillCache: true, - Streaming: false, ReadReplicaScope: kv.GlobalReplicaScope, } require.Equal(t, expect, actual) @@ -523,7 +517,6 @@ func TestRequestBuilder6(t *testing.T) { IsolationLevel: 0, Priority: 0, NotFillCache: true, - Streaming: false, ReadReplicaScope: kv.GlobalReplicaScope, } require.Equal(t, expect, actual) @@ -559,7 +552,6 @@ func TestRequestBuilder7(t *testing.T) { IsolationLevel: 0, Priority: 0, NotFillCache: false, - Streaming: false, ReplicaRead: replicaRead.replicaReadType, ReadReplicaScope: kv.GlobalReplicaScope, } diff --git a/distsql/select_result.go b/distsql/select_result.go index 645ae3d175e9f..58d0abf49d04c 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -60,7 +60,6 @@ var ( var ( _ SelectResult = (*selectResult)(nil) - _ SelectResult = (*streamResult)(nil) _ SelectResult = (*serialSelectResults)(nil) ) diff --git a/distsql/stream.go b/distsql/stream.go deleted file mode 100644 index 526b2693de54d..0000000000000 --- a/distsql/stream.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package distsql - -import ( - "context" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/dbterror" - "github.com/pingcap/tipb/go-tipb" -) - -// streamResult implements the SelectResult interface. -type streamResult struct { - label string - sqlType string - - resp kv.Response - rowLen int - fieldTypes []*types.FieldType - ctx sessionctx.Context - - // NOTE: curr == nil means stream finish, while len(curr.RowsData) == 0 doesn't. - curr *tipb.Chunk - partialCount int64 - feedback *statistics.QueryFeedback - - fetchDuration time.Duration - durationReported bool -} - -func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error { - chk.Reset() - for !chk.IsFull() { - err := r.readDataIfNecessary(ctx) - if err != nil { - return err - } - if r.curr == nil { - return nil - } - - err = r.flushToChunk(chk) - if err != nil { - return err - } - } - return nil -} - -// readDataFromResponse read the data to result. Returns true means the resp is finished. 
-func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Response, result *tipb.Chunk) (bool, error) { - startTime := time.Now() - resultSubset, err := resp.Next(ctx) - duration := time.Since(startTime) - r.fetchDuration += duration - if err != nil { - return false, err - } - if resultSubset == nil { - if !r.durationReported { - // TODO: Add a label to distinguish between success or failure. - // https://github.com/pingcap/tidb/issues/11397 - metrics.DistSQLQueryHistogram.WithLabelValues(r.label, r.sqlType, "streaming").Observe(r.fetchDuration.Seconds()) - r.durationReported = true - } - return true, nil - } - - var stream tipb.StreamResponse - err = stream.Unmarshal(resultSubset.GetData()) - if err != nil { - return false, errors.Trace(err) - } - if stream.Error != nil { - return false, errors.Errorf("stream response error: [%d]%s", stream.Error.Code, stream.Error.Msg) - } - for _, warning := range stream.Warnings { - r.ctx.GetSessionVars().StmtCtx.AppendWarning(dbterror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) - } - - err = result.Unmarshal(stream.Data) - if err != nil { - return false, errors.Trace(err) - } - r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts, stream.Ndvs) - r.partialCount++ - - hasStats, ok := resultSubset.(CopRuntimeStats) - if ok { - copStats := hasStats.GetCopRuntimeStats() - if copStats != nil { - copStats.CopTime = duration - r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil) - } - } - return false, nil -} - -// readDataIfNecessary ensures there are some data in current chunk. If no more data, r.curr == nil. -func (r *streamResult) readDataIfNecessary(ctx context.Context) error { - if r.curr != nil && len(r.curr.RowsData) > 0 { - return nil - } - - tmp := new(tipb.Chunk) - finish, err := r.readDataFromResponse(ctx, r.resp, tmp) - if err != nil { - return err - } - if finish { - r.curr = nil - return nil - } - r.curr = tmp - return nil -} - -func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) { - remainRowsData := r.curr.RowsData - decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location()) - for !chk.IsFull() && len(remainRowsData) > 0 { - for i := 0; i < r.rowLen; i++ { - remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i]) - if err != nil { - return err - } - } - } - r.curr.RowsData = remainRowsData - if len(remainRowsData) == 0 { - r.curr = nil // Current chunk is finished. 
- } - return nil -} - -func (r *streamResult) NextRaw(ctx context.Context) ([]byte, error) { - r.partialCount++ - r.feedback.Invalidate() - resultSubset, err := r.resp.Next(ctx) - if resultSubset == nil || err != nil { - return nil, err - } - return resultSubset.GetData(), err -} - -func (r *streamResult) Close() error { - if r.feedback.Actual() > 0 { - metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual())) - } - metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount)) - if r.resp != nil { - return r.resp.Close() - } - return nil -} diff --git a/executor/builder.go b/executor/builder.go index d64d316681b83..9445565e1c26c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2791,20 +2791,16 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor { return e } -func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) { - streaming := true +func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) { executors := make([]*tipb.Executor, 0, len(plans)) for _, p := range plans { execPB, err := p.ToPB(sctx, kv.TiKV) if err != nil { - return nil, false, err - } - if !plannercore.SupportStreaming(p) { - streaming = false + return nil, err } executors = append(executors, execPB) } - return executors, streaming, nil + return executors, nil } // markChildrenUsedCols compares each child with the output schema, and mark @@ -2817,13 +2813,13 @@ func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expre return } -func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) { +func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) { execPB, err := p.ToPB(sctx, kv.TiFlash) - return []*tipb.Executor{execPB}, false, err + return []*tipb.Executor{execPB}, err } -func constructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, streaming bool, err error) { +func constructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, err error) { dagReq = &tipb.DAGRequest{} dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(ctx.GetSessionVars().Location()) sc := ctx.GetSessionVars().StmtCtx @@ -2834,14 +2830,14 @@ func constructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s dagReq.Flags = sc.PushDownFlags() if storeType == kv.TiFlash { var executors []*tipb.Executor - executors, streaming, err = constructDistExecForTiFlash(ctx, plans[0]) + executors, err = constructDistExecForTiFlash(ctx, plans[0]) dagReq.RootExecutor = executors[0] } else { - dagReq.Executors, streaming, err = constructDistExec(ctx, plans) + dagReq.Executors, err = constructDistExec(ctx, plans) } distsql.SetEncodeType(ctx, dagReq) - return dagReq, streaming, err + return dagReq, err } func (b *executorBuilder) corColInDistPlan(plans []plannercore.PhysicalPlan) bool { @@ -3152,7 +3148,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea if v.StoreType == kv.TiFlash { tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()} } - dagReq, streaming, err := constructDAGReq(b.ctx, tablePlans, v.StoreType) + dagReq, err := constructDAGReq(b.ctx, tablePlans, v.StoreType) if err != nil { return nil, err } @@ -3185,7 +3181,6 @@ func buildNoRangeTableReader(b *executorBuilder, v 
*plannercore.PhysicalTableRea keepOrder: ts.KeepOrder, desc: ts.Desc, columns: ts.Columns, - streaming: streaming, paging: paging, corColInFilter: b.corColInDistPlan(v.TablePlans), corColInAccess: b.corColInAccess(v.TablePlans[0]), @@ -3438,7 +3433,7 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table } func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) { - dagReq, streaming, err := constructDAGReq(b.ctx, v.IndexPlans, kv.TiKV) + dagReq, err := constructDAGReq(b.ctx, v.IndexPlans, kv.TiKV) if err != nil { return nil, err } @@ -3468,7 +3463,6 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea keepOrder: is.KeepOrder, desc: is.Desc, columns: is.Columns, - streaming: streaming, paging: paging, corColInFilter: b.corColInDistPlan(v.IndexPlans), corColInAccess: b.corColInAccess(v.IndexPlans[0]), @@ -3552,10 +3546,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E return ret } -func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) { - tableReq, tableStreaming, err := constructDAGReq(b.ctx, plans, kv.TiKV) +func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, val table.Table, err error) { + tableReq, err := constructDAGReq(b.ctx, plans, kv.TiKV) if err != nil { - return nil, false, nil, err + return nil, nil, err } for i := 0; i < schemaLen; i++ { tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i)) @@ -3567,13 +3561,13 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic pt := tbl.(table.PartitionedTable) tbl = pt.GetPartition(physicalTableID) } - return tableReq, tableStreaming, tbl, err + return tableReq, tbl, err } -func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { - indexReq, indexStreaming, err := constructDAGReq(ctx, plans, kv.TiKV) +func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) { + indexReq, err := constructDAGReq(ctx, plans, kv.TiKV) if err != nil { - return nil, false, err + return nil, err } indexReq.OutputOffsets = []uint32{} for i := 0; i < handleLen; i++ { @@ -3582,7 +3576,7 @@ func buildIndexReq(ctx sessionctx.Context, schemaLen, handleLen int, plans []pla if len(indexReq.OutputOffsets) == 0 { indexReq.OutputOffsets = []uint32{uint32(schemaLen)} } - return indexReq, indexStreaming, err + return indexReq, err } func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) { @@ -3597,16 +3591,15 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn // Should output pid col. 
handleLen++ } - indexReq, indexStreaming, err := buildIndexReq(b.ctx, len(is.Index.Columns), handleLen, v.IndexPlans) + indexReq, err := buildIndexReq(b.ctx, len(is.Index.Columns), handleLen, v.IndexPlans) if err != nil { return nil, err } indexPaging := false if v.Paging { indexPaging = true - indexStreaming = false } - tableReq, tableStreaming, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) + tableReq, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) if err != nil { return nil, err } @@ -3631,8 +3624,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn desc: is.Desc, tableRequest: tableReq, columns: ts.Columns, - indexStreaming: indexStreaming, - tableStreaming: tableStreaming, indexPaging: indexPaging, dataReaderBuilder: readerBuilder, corColInIdxSide: b.corColInDistPlan(v.IndexPlans), @@ -3732,7 +3723,6 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalIndexMergeReader) (*IndexMergeReaderExecutor, error) { partialPlanCount := len(v.PartialPlans) partialReqs := make([]*tipb.DAGRequest, 0, partialPlanCount) - partialStreamings := make([]bool, 0, partialPlanCount) indexes := make([]*model.IndexInfo, 0, partialPlanCount) descs := make([]bool, 0, partialPlanCount) feedbacks := make([]*statistics.QueryFeedback, 0, partialPlanCount) @@ -3741,7 +3731,6 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd isCorColInPartialAccess := make([]bool, 0, partialPlanCount) for i := 0; i < partialPlanCount; i++ { var tempReq *tipb.DAGRequest - var tempStreaming bool var err error feedback := statistics.NewQueryFeedback(0, nil, 0, ts.Desc) @@ -3749,12 +3738,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd feedbacks = append(feedbacks, feedback) if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok { - tempReq, tempStreaming, err = buildIndexReq(b.ctx, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i]) + tempReq, err = buildIndexReq(b.ctx, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i]) descs = append(descs, is.Desc) indexes = append(indexes, is.Index) } else { ts := v.PartialPlans[i][0].(*plannercore.PhysicalTableScan) - tempReq, tempStreaming, _, err = buildTableReq(b, len(ts.Columns), v.PartialPlans[i]) + tempReq, _, err = buildTableReq(b, len(ts.Columns), v.PartialPlans[i]) descs = append(descs, ts.Desc) indexes = append(indexes, nil) } @@ -3764,11 +3753,10 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd collect := false tempReq.CollectRangeCounts = &collect partialReqs = append(partialReqs, tempReq) - partialStreamings = append(partialStreamings, tempStreaming) isCorColInPartialFilters = append(isCorColInPartialFilters, b.corColInDistPlan(v.PartialPlans[i])) isCorColInPartialAccess = append(isCorColInPartialAccess, b.corColInAccess(v.PartialPlans[i][0])) } - tableReq, tableStreaming, tblInfo, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) + tableReq, tblInfo, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) isCorColInTableFilter := b.corColInDistPlan(v.TablePlans) if err != nil { return nil, err @@ -3793,8 +3781,6 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd descs: descs, tableRequest: tableReq, columns: ts.Columns, - partialStreamings: partialStreamings, - tableStreaming: tableStreaming, partialPlans: v.PartialPlans, 
tblPlans: v.TablePlans, dataReaderBuilder: readerBuilder, @@ -4140,7 +4126,6 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T SetStartTS(startTS). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetStreaming(e.streaming). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). SetFromSessionVars(e.ctx.GetSessionVars()). diff --git a/executor/distsql.go b/executor/distsql.go index bf7484a094656..d20e11ae65df4 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -180,9 +180,8 @@ type IndexReaderExecutor struct { // outputColumns are only required by union scan. outputColumns []*expression.Column - feedback *statistics.QueryFeedback - streaming bool - paging bool + feedback *statistics.QueryFeedback + paging bool keepOrder bool desc bool @@ -283,7 +282,7 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { - e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) + e.dagPB.Executors, err = constructDistExec(e.ctx, e.plans) if err != nil { return err } @@ -309,7 +308,6 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) SetStartTS(e.startTS). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetStreaming(e.streaming). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). SetFromSessionVars(e.ctx.GetSessionVars()). @@ -374,9 +372,7 @@ type IndexLookUpExecutor struct { keepOrder bool desc bool - indexStreaming bool - tableStreaming bool - indexPaging bool + indexPaging bool corColInIdxSide bool corColInTblSide bool @@ -500,14 +496,14 @@ func (e *IndexLookUpExecutor) open(ctx context.Context) error { var err error if e.corColInIdxSide { - e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.idxPlans) + e.dagPB.Executors, err = constructDistExec(e.ctx, e.idxPlans) if err != nil { return err } } if e.corColInTblSide { - e.tableRequest.Executors, _, err = constructDistExec(e.ctx, e.tblPlans) + e.tableRequest.Executors, err = constructDistExec(e.ctx, e.tblPlans) if err != nil { return err } @@ -585,7 +581,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< SetStartTS(e.startTS). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetStreaming(e.indexStreaming). SetPaging(e.indexPaging). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). @@ -688,7 +683,6 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup readReplicaScope: e.readReplicaScope, isStaleness: e.isStaleness, columns: e.columns, - streaming: e.tableStreaming, feedback: statistics.NewQueryFeedback(0, nil, 0, false), corColInFilter: e.corColInTblSide, plans: e.tblPlans, diff --git a/executor/executor_test.go b/executor/executor_test.go index 59bbdd3291744..c8e4304f1c22c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2069,42 +2069,6 @@ func TestCheckTableClusterIndex(t *testing.T) { tk.MustExec("admin check table admin_test;") } -func TestCoprocessorStreamingFlag(t *testing.T) { - store, clean := testkit.CreateMockStore(t) - defer clean() - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t (id int, value int, index idx(id))") - // Add some data to make statistics work. 
- for i := 0; i < 100; i++ { - tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i)) - } - - tests := []struct { - sql string - expect bool - }{ - {"select * from t", true}, // TableReader - {"select * from t where id = 5", true}, // IndexLookup - {"select * from t where id > 5", true}, // Filter - {"select * from t limit 3", false}, // Limit - {"select avg(id) from t", false}, // Aggregate - {"select * from t order by value limit 3", false}, // TopN - } - - ctx := context.Background() - for _, test := range tests { - ctx1 := context.WithValue(ctx, "CheckSelectRequestHook", func(req *kv.Request) { - comment := fmt.Sprintf("sql=%s, expect=%v, get=%v", test.sql, test.expect, req.Streaming) - require.Equal(t, test.expect, req.Streaming, comment) - }) - rs, err := tk.Session().Execute(ctx1, test.sql) - require.NoError(t, err) - tk.ResultSetToResult(rs[0], fmt.Sprintf("sql: %v", test.sql)) - } -} - func TestIncorrectLimitArg(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 4ce7f4f24a888..919aee6f15d86 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -75,9 +75,7 @@ type IndexMergeReaderExecutor struct { startTS uint64 tableRequest *tipb.DAGRequest // columns are only required by union scan. - columns []*model.ColumnInfo - partialStreamings []bool - tableStreaming bool + columns []*model.ColumnInfo *dataReaderBuilder // fields about accessing partition tables @@ -295,7 +293,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. var err error - if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { + if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { worker.syncErr(e.resultCh, err) return } @@ -306,7 +304,6 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, SetStartTS(e.startTS). SetDesc(e.descs[workID]). SetKeepOrder(false). - SetStreaming(e.partialStreamings[workID]). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). SetFromSessionVars(e.ctx.GetSessionVars()). 
@@ -388,7 +385,6 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, startTS: e.startTS, readReplicaScope: e.readReplicaScope, isStaleness: e.isStaleness, - streaming: e.partialStreamings[workID], feedback: statistics.NewQueryFeedback(0, nil, 0, false), plans: e.partialPlans[workID], ranges: e.ranges[workID], @@ -404,7 +400,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, } if e.isCorColInPartialFilters[workID] { - if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { + if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { worker.syncErr(e.resultCh, err) return } @@ -609,13 +605,12 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tb startTS: e.startTS, readReplicaScope: e.readReplicaScope, isStaleness: e.isStaleness, - streaming: e.tableStreaming, columns: e.columns, feedback: statistics.NewQueryFeedback(0, nil, 0, false), plans: e.tblPlans, } if e.isCorColInTableFilter { - if tableReaderExec.dagPB.Executors, _, err = constructDistExec(e.ctx, e.tblPlans); err != nil { + if tableReaderExec.dagPB.Executors, err = constructDistExec(e.ctx, e.tblPlans); err != nil { return nil, err } } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 3a8ce366742b6..a9a6032d1f779 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -52,7 +52,7 @@ type MPPGather struct { } func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { - dagReq, _, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash) + dagReq, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash) if err != nil { return errors.Trace(err) } diff --git a/executor/set_test.go b/executor/set_test.go index e06ab579be68e..4f0151b6d2129 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -643,6 +643,15 @@ func TestSetVar(t *testing.T) { tk.MustQuery("select @@tidb_max_auto_analyze_time").Check(testkit.Rows("60")) tk.MustExec("set global tidb_max_auto_analyze_time = -1") tk.MustQuery("select @@tidb_max_auto_analyze_time").Check(testkit.Rows("0")) + + // test variables for cost model ver2 + tk.MustQuery("select @@tidb_cost_model_version").Check(testkit.Rows("1")) + tk.MustExec("set tidb_cost_model_version=3") + tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|1292|Truncated incorrect tidb_cost_model_version value: '3'")) + tk.MustExec("set tidb_cost_model_version=0") + tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|1292|Truncated incorrect tidb_cost_model_version value: '0'")) + tk.MustExec("set tidb_cost_model_version=2") + tk.MustQuery("select @@tidb_cost_model_version").Check(testkit.Rows("2")) } func TestTruncateIncorrectIntSessionVar(t *testing.T) { diff --git a/executor/simple_test.go b/executor/simple_test.go index 847d34668b8d1..b9464c53f59cc 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -773,16 +773,6 @@ func TestDropStats(t *testing.T) { h.SetLease(0) } -func TestFlushTables(t *testing.T) { - store, clean := testkit.CreateMockStore(t) - defer clean() - tk := testkit.NewTestKit(t, store) - - tk.MustExec("FLUSH TABLES") - err := tk.ExecToErr("FLUSH TABLES WITH READ LOCK") - require.Error(t, err) -} - func TestUseDB(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() diff --git a/executor/simpletest/BUILD.bazel b/executor/simpletest/BUILD.bazel 
new file mode 100644 index 0000000000000..d43780d70ec0a --- /dev/null +++ b/executor/simpletest/BUILD.bazel @@ -0,0 +1,17 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "simpletest_test", + srcs = [ + "main_test.go", + "simple_test.go", + ], + deps = [ + "//config", + "//meta/autoid", + "//testkit", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/executor/simpletest/main_test.go b/executor/simpletest/main_test.go new file mode 100644 index 0000000000000..cbb53ea24757f --- /dev/null +++ b/executor/simpletest/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package simpletest + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + goleak.VerifyTestMain(m, opts...) +} diff --git a/executor/simpletest/simple_test.go b/executor/simpletest/simple_test.go new file mode 100644 index 0000000000000..9a6dec858f9c5 --- /dev/null +++ b/executor/simpletest/simple_test.go @@ -0,0 +1,32 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package simpletest + +import ( + "testing" + + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +func TestFlushTables(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + + tk.MustExec("FLUSH TABLES") + err := tk.ExecToErr("FLUSH TABLES WITH READ LOCK") + require.Error(t, err) +} diff --git a/executor/table_reader.go b/executor/table_reader.go index 2f08a989d5816..30fedbd85737d 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -96,7 +96,6 @@ type TableReaderExecutor struct { keepOrder bool desc bool - streaming bool paging bool storeType kv.StoreType // corColInFilter tells whether there's correlated column in filter. 
@@ -143,13 +142,13 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { var err error if e.corColInFilter { if e.storeType == kv.TiFlash { - execs, _, err := constructDistExecForTiFlash(e.ctx, e.tablePlan) + execs, err := constructDistExecForTiFlash(e.ctx, e.tablePlan) if err != nil { return err } e.dagPB.RootExecutor = execs[0] } else { - e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) + e.dagPB.Executors, err = constructDistExec(e.ctx, e.plans) if err != nil { return err } @@ -333,7 +332,6 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ SetStartTS(e.startTS). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetStreaming(e.streaming). SetReadReplicaScope(e.readReplicaScope). SetFromSessionVars(e.ctx.GetSessionVars()). SetFromInfoSchema(e.ctx.GetInfoSchema()). @@ -372,7 +370,6 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex SetStartTS(e.startTS). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetStreaming(e.streaming). SetReadReplicaScope(e.readReplicaScope). SetFromSessionVars(e.ctx.GetSessionVars()). SetFromInfoSchema(e.ctx.GetInfoSchema()). @@ -403,7 +400,6 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R SetStartTS(e.startTS). SetDesc(e.desc). SetKeepOrder(e.keepOrder). - SetStreaming(e.streaming). SetReadReplicaScope(e.readReplicaScope). SetIsStaleness(e.isStaleness). SetFromSessionVars(e.ctx.GetSessionVars()). diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go index 406723b8e51af..8382f460342e1 100644 --- a/executor/table_readers_required_rows_test.go +++ b/executor/table_readers_required_rows_test.go @@ -132,7 +132,7 @@ func buildTableReader(sctx sessionctx.Context) Executor { } func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest { - req, _, err := constructDAGReq(sctx, []core.PhysicalPlan{&core.PhysicalTableScan{ + req, err := constructDAGReq(sctx, []core.PhysicalPlan{&core.PhysicalTableScan{ Columns: []*model.ColumnInfo{}, Table: &model.TableInfo{ID: 12345, PKIsHandle: false}, Desc: false, diff --git a/kv/kv.go b/kv/kv.go index 5d1b6ba95ded8..486b93007217d 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -341,9 +341,6 @@ type Request struct { Desc bool // NotFillCache makes this request do not touch the LRU cache of the underlying storage. NotFillCache bool - // Streaming indicates using streaming API for this request, result in that one Next() - // call would not corresponds to a whole region result. - Streaming bool // ReplicaRead is used for reading data from replicas, only follower is supported at this time. ReplicaRead ReplicaReadType // StoreType represents this request is sent to the which type of store. diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 3b17a9bd69fe8..617f5e3fbb803 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -95,6 +95,7 @@ const ( ActionAlterTableStatsOptions ActionType = 58 ActionAlterNoCacheTable ActionType = 59 ActionCreateTables ActionType = 60 + ActionMultiSchemaChange ActionType = 61 ) var actionMap = map[ActionType]string{ @@ -157,6 +158,7 @@ var actionMap = map[ActionType]string{ ActionAlterCacheTable: "alter table cache", ActionAlterNoCacheTable: "alter table nocache", ActionAlterTableStatsOptions: "alter table statistics options", + ActionMultiSchemaChange: "alter table multi-schema change", // `ActionAlterTableAlterPartition` is removed and will never be used. // Just left a tombstone here for compatibility. 
@@ -256,6 +258,22 @@ func NewDDLReorgMeta() *DDLReorgMeta { // MultiSchemaInfo keeps some information for multi schema change. type MultiSchemaInfo struct { Warnings []*errors.Error + + SubJobs []*SubJob `json:"sub_jobs"` + Revertible bool `json:"revertible"` +} + +type SubJob struct { + Type ActionType `json:"type"` + Args []interface{} `json:"-"` + RawArgs json.RawMessage `json:"raw_args"` + SchemaState SchemaState `json:"schema_state"` + SnapshotVer uint64 `json:"snapshot_ver"` + Revertible bool `json:"revertible"` + State JobState `json:"state"` + RowCount int64 `json:"row_count"` + Warning *terror.Error `json:"warning"` + CtxVars []interface{} `json:"-"` } // Job is for a DDL operation. diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 63faa537fad32..3b214ed71e930 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -575,15 +575,3 @@ func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnIn } return nil } - -// SupportStreaming returns true if a pushed down operation supports using coprocessor streaming API. -// Note that this function handle pushed down physical plan only! It's called in constructDAGReq. -// Some plans are difficult (if possible) to implement streaming, and some are pointless to do so. -// TODO: Support more kinds of physical plan. -func SupportStreaming(p PhysicalPlan) bool { - switch p.(type) { - case *PhysicalIndexScan, *PhysicalSelection, *PhysicalTableScan: - return true - } - return false -} diff --git a/server/server.go b/server/server.go index 3221b6edc2acb..90b4a7e27101b 100644 --- a/server/server.go +++ b/server/server.go @@ -833,8 +833,12 @@ func (s *Server) GetInternalSessionStartTSList() []uint64 { s.sessionMapMutex.Lock() defer s.sessionMapMutex.Unlock() tsList := make([]uint64, 0, len(s.internalSessions)) + analyzeProcID := util.GetAutoAnalyzeProcID(s.ServerID) for se := range s.internalSessions { - if ts := session.GetStartTSFromSession(se); ts != 0 { + if ts, processInfoID := session.GetStartTSFromSession(se); ts != 0 { + if processInfoID == analyzeProcID { + continue + } tsList = append(tsList, ts) } } diff --git a/session/session.go b/session/session.go index e6b5997b78c4d..90936a2a15a1a 100644 --- a/session/session.go +++ b/session/session.go @@ -3240,12 +3240,16 @@ func (s *session) ShowProcess() *util.ProcessInfo { } // GetStartTSFromSession returns the startTS in the session `se` -func GetStartTSFromSession(se interface{}) uint64 { - var startTS uint64 +func GetStartTSFromSession(se interface{}) (uint64, uint64) { + var startTS, processInfoID uint64 tmp, ok := se.(*session) if !ok { logutil.BgLogger().Error("GetStartTSFromSession failed, can't transform to session struct") - return 0 + return 0, 0 + } + processInfo := tmp.ShowProcess() + if processInfo != nil { + processInfoID = processInfo.ID } txnInfo := tmp.TxnInfo() if txnInfo != nil { @@ -3256,7 +3260,7 @@ func GetStartTSFromSession(se interface{}) uint64 { "GetStartTSFromSession getting startTS of internal session", zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS))) - return startTS + return startTS, processInfoID } // logStmt logs some crucial SQL including: CREATE USER/GRANT PRIVILEGE/CHANGE PASSWORD/DDL etc and normal SQL @@ -3310,8 +3314,9 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { zap.Uint64("txnStartTS", vars.TxnCtx.StartTS), zap.Uint64("forUpdateTS", vars.TxnCtx.GetForUpdateTS()), zap.Bool("isReadConsistency", 
vars.IsIsolation(ast.ReadCommitted)),
-			zap.String("current_db", vars.CurrentDB),
-			zap.String("txn_mode", vars.GetReadableTxnMode()),
+			zap.String("currentDB", vars.CurrentDB),
+			zap.Bool("isPessimistic", vars.TxnCtx.IsPessimistic),
+			zap.String("sessionTxnMode", vars.GetReadableTxnMode()),
 			zap.String("sql", query))
 	}
 }
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 00966a55a0c69..edc2942459fa2 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -759,6 +759,30 @@ type SessionVars struct {
 	// concurrencyFactor is the CPU cost of additional one goroutine.
 	concurrencyFactor float64
 
+	// factors for cost model v2
+	// cpuFactorV2 is the CPU factor for the Cost Model Ver2.
+	cpuFactorV2 float64
+	// copCPUFactorV2 is the cop-cpu factor for the Cost Model Ver2.
+	copCPUFactorV2 float64
+	// tiflashCPUFactorV2 is the tiflash-cpu factor for the Cost Model Ver2.
+	tiflashCPUFactorV2 float64
+	// networkFactorV2 is the network factor for the Cost Model Ver2.
+	networkFactorV2 float64
+	// scanFactorV2 is the scan factor for the Cost Model Ver2.
+	scanFactorV2 float64
+	// descScanFactorV2 is the desc-scan factor for the Cost Model Ver2.
+	descScanFactorV2 float64
+	// tiflashScanFactorV2 is the tiflash-scan factor for the Cost Model Ver2.
+	tiflashScanFactorV2 float64
+	// seekFactorV2 is the seek factor for the Cost Model Ver2.
+	seekFactorV2 float64
+	// memoryFactorV2 is the memory factor for the Cost Model Ver2.
+	memoryFactorV2 float64
+	// diskFactorV2 is the disk factor for the Cost Model Ver2.
+	diskFactorV2 float64
+	// concurrencyFactorV2 is the concurrency factor for the Cost Model Ver2.
+	concurrencyFactorV2 float64
+
 	// CopTiFlashConcurrencyFactor is the concurrency number of computation in tiflash coprocessor.
 	CopTiFlashConcurrencyFactor float64
 
@@ -1117,6 +1141,8 @@ type SessionVars struct {
 	IgnorePreparedCacheCloseStmt bool
 	// EnableNewCostInterface is a internal switch to indicates whether to use the new cost calculation interface.
 	EnableNewCostInterface bool
+	// CostModelVersion is an internal switch to indicate the Cost Model Version.
+	CostModelVersion int
 	// BatchPendingTiFlashCount shows the threshold of pending TiFlash tables when batch adding.
 	BatchPendingTiFlashCount int
 	// RcReadCheckTS indicates if ts check optimization is enabled for current session.
@@ -2464,26 +2490,46 @@ func (s *SessionVars) CleanupTxnReadTSIfUsed() {
 
 // GetCPUFactor returns the session variable cpuFactor
 func (s *SessionVars) GetCPUFactor() float64 {
+	if s.CostModelVersion == 2 {
+		return s.cpuFactorV2
+	}
 	return s.cpuFactor
 }
 
 // GetCopCPUFactor returns the session variable copCPUFactor
 func (s *SessionVars) GetCopCPUFactor() float64 {
+	if s.CostModelVersion == 2 {
+		return s.copCPUFactorV2
+	}
 	return s.copCPUFactor
 }
 
+// GetTiFlashCPUFactor returns the session variable tiflashCPUFactorV2
+func (s *SessionVars) GetTiFlashCPUFactor() float64 {
+	return s.tiflashCPUFactorV2
+}
+
 // GetMemoryFactor returns the session variable memoryFactor
 func (s *SessionVars) GetMemoryFactor() float64 {
+	if s.CostModelVersion == 2 {
+		return s.memoryFactorV2
+	}
 	return s.memoryFactor
 }
 
 // GetDiskFactor returns the session variable diskFactor
 func (s *SessionVars) GetDiskFactor() float64 {
+	if s.CostModelVersion == 2 {
+		return s.diskFactorV2
+	}
 	return s.diskFactor
 }
 
 // GetConcurrencyFactor returns the session variable concurrencyFactor
 func (s *SessionVars) GetConcurrencyFactor() float64 {
+	if s.CostModelVersion == 2 {
+		return s.concurrencyFactorV2
+	}
 	return s.concurrencyFactor
 }
 
@@ -2495,6 +2541,9 @@ func (s *SessionVars) GetNetworkFactor(tbl *model.TableInfo) float64 {
 			return 0
 		}
 	}
+	if s.CostModelVersion == 2 {
+		return s.networkFactorV2
+	}
 	return s.networkFactor
 }
 
@@ -2506,6 +2555,9 @@ func (s *SessionVars) GetScanFactor(tbl *model.TableInfo) float64 {
 			return 0
 		}
 	}
+	if s.CostModelVersion == 2 {
+		return s.scanFactorV2
+	}
 	return s.scanFactor
 }
 
@@ -2517,9 +2569,17 @@ func (s *SessionVars) GetDescScanFactor(tbl *model.TableInfo) float64 {
 			return 0
 		}
 	}
+	if s.CostModelVersion == 2 {
+		return s.descScanFactorV2
+	}
 	return s.descScanFactor
 }
 
+// GetTiFlashScanFactor returns the session variable tiflashScanFactorV2
+func (s *SessionVars) GetTiFlashScanFactor() float64 {
+	return s.tiflashScanFactorV2
+}
+
 // GetSeekFactor returns the session variable seekFactor
 // returns 0 when tbl is a temporary table.
func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 { @@ -2528,5 +2588,8 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 { return 0 } } + if s.CostModelVersion == 2 { + return s.seekFactorV2 + } return s.seekFactor } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index c6171ce7c12dc..06e206c98ab8a 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1113,6 +1113,50 @@ var defaultSysVars = []*SysVar{ s.concurrencyFactor = tidbOptFloat64(val, DefOptConcurrencyFactor) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCPUFactorV2, Value: strconv.FormatFloat(DefOptCPUFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.cpuFactorV2 = tidbOptFloat64(val, DefOptCPUFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCopCPUFactorV2, Value: strconv.FormatFloat(DefOptCopCPUFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.copCPUFactorV2 = tidbOptFloat64(val, DefOptCopCPUFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashCPUFactorV2, Value: strconv.FormatFloat(DefOptTiFlashCPUFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.tiflashCPUFactorV2 = tidbOptFloat64(val, DefOptTiFlashCPUFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptNetworkFactorV2, Value: strconv.FormatFloat(DefOptNetworkFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.networkFactorV2 = tidbOptFloat64(val, DefOptNetworkFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptScanFactorV2, Value: strconv.FormatFloat(DefOptScanFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.scanFactorV2 = tidbOptFloat64(val, DefOptScanFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptDescScanFactorV2, Value: strconv.FormatFloat(DefOptDescScanFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.descScanFactorV2 = tidbOptFloat64(val, DefOptDescScanFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashScanFactorV2, Value: strconv.FormatFloat(DefOptTiFlashScanFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.tiflashScanFactorV2 = tidbOptFloat64(val, DefOptTiFlashScanFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptSeekFactorV2, Value: strconv.FormatFloat(DefOptSeekFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.seekFactorV2 = tidbOptFloat64(val, DefOptSeekFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptMemoryFactorV2, Value: strconv.FormatFloat(DefOptMemoryFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.memoryFactorV2 = 
tidbOptFloat64(val, DefOptMemoryFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptDiskFactorV2, Value: strconv.FormatFloat(DefOptDiskFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.diskFactorV2 = tidbOptFloat64(val, DefOptDiskFactorV2) + return nil + }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptConcurrencyFactorV2, Value: strconv.FormatFloat(DefOptConcurrencyFactorV2, 'f', -1, 64), Hidden: true, Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + s.concurrencyFactorV2 = tidbOptFloat64(val, DefOptConcurrencyFactorV2) + return nil + }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexJoinBatchSize, Value: strconv.Itoa(DefIndexJoinBatchSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error { s.IndexJoinBatchSize = tidbOptPositiveInt32(val, DefIndexJoinBatchSize) return nil @@ -1561,6 +1605,12 @@ var defaultSysVars = []*SysVar{ return nil }, }, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBCostModelVersion, Value: strconv.Itoa(1), Hidden: true, Type: TypeInt, MinValue: 1, MaxValue: 2, + SetSession: func(vars *SessionVars, s string) error { + vars.CostModelVersion = int(TidbOptInt64(s, 1)) + return nil + }, + }, {Scope: ScopeGlobal | ScopeSession, Name: TiDBRCReadCheckTS, Type: TypeBool, Value: BoolToOnOff(DefRCReadCheckTS), SetSession: func(s *SessionVars, val string) error { s.RcReadCheckTS = TiDBOptOn(val) return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 97fadaae74812..64c1916292c73 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -273,6 +273,30 @@ const ( // TiDBOptConcurrencyFactor is the CPU cost of additional one goroutine. TiDBOptConcurrencyFactor = "tidb_opt_concurrency_factor" + // Variables for the Cost Model Ver2 + // TiDBOptCPUFactorV2 is the CPU factor for the Cost Model Ver2 + TiDBOptCPUFactorV2 = "tidb_opt_cpu_factor_v2" + // TiDBOptCopCPUFactorV2 is the CopCPU factor for the Cost Model Ver2 + TiDBOptCopCPUFactorV2 = "tidb_opt_copcpu_factor_v2" + // TiDBOptTiFlashCPUFactorV2 is the TiFlashCPU factor for the Cost Model Ver2 + TiDBOptTiFlashCPUFactorV2 = "tidb_opt_tiflash_cpu_factor_v2" + // TiDBOptNetworkFactorV2 is the network factor for the Cost Model Ver2 + TiDBOptNetworkFactorV2 = "tidb_opt_network_factor_v2" + // TiDBOptScanFactorV2 is the scan factor for the Cost Model Ver2 + TiDBOptScanFactorV2 = "tidb_opt_scan_factor_v2" + // TiDBOptDescScanFactorV2 is the desc scan factor for the Cost Model Ver2 + TiDBOptDescScanFactorV2 = "tidb_opt_desc_factor_v2" + // TiDBOptTiFlashScanFactorV2 is the TiFlashScan factor for the Cost Model Ver2 + TiDBOptTiFlashScanFactorV2 = "tidb_opt_tiflash_scan_factor_v2" + // TiDBOptSeekFactorV2 is the seek factor for the Cost Model Ver2 + TiDBOptSeekFactorV2 = "tidb_opt_seek_factor_v2" + // TiDBOptMemoryFactorV2 is the memory factor for the Cost Model Ver2 + TiDBOptMemoryFactorV2 = "tidb_opt_memory_factor_v2" + // TiDBOptDiskFactorV2 is the disk factor for the Cost Model Ver2 + TiDBOptDiskFactorV2 = "tidb_opt_disk_factor_v2" + // TiDBOptConcurrencyFactorV2 is the concurrency factor for the Cost Model Ver2 + TiDBOptConcurrencyFactorV2 = "tidb_opt_concurrency_factor_v2" + // TiDBIndexJoinBatchSize is used to set the batch size of an index lookup join. 
// The index lookup join fetches batches of data from outer executor and constructs ranges for inner executor. // This value controls how much of data in a batch to do the index join. @@ -637,6 +661,9 @@ const ( // TiDBEnableNewCostInterface is a internal switch to indicates whether to use the new cost calculation interface. TiDBEnableNewCostInterface = "tidb_enable_new_cost_interface" + // TiDBCostModelVersion is a internal switch to indicates the cost model version. + TiDBCostModelVersion = "tidb_cost_model_version" + // TiDBBatchPendingTiFlashCount indicates the maximum count of non-available TiFlash tables. TiDBBatchPendingTiFlashCount = "tidb_batch_pending_tiflash_count" @@ -756,6 +783,17 @@ const ( DefOptMemoryFactor = 0.001 DefOptDiskFactor = 1.5 DefOptConcurrencyFactor = 3.0 + DefOptCPUFactorV2 = 30.0 + DefOptCopCPUFactorV2 = 30.0 + DefOptTiFlashCPUFactorV2 = 2.0 + DefOptNetworkFactorV2 = 4.0 + DefOptScanFactorV2 = 100.0 + DefOptDescScanFactorV2 = 150.0 + DefOptTiFlashScanFactorV2 = 15.0 + DefOptSeekFactorV2 = 9500000.0 + DefOptMemoryFactorV2 = 0.001 + DefOptDiskFactorV2 = 1.5 + DefOptConcurrencyFactorV2 = 3.0 DefOptInSubqToJoinAndAgg = true DefOptPreferRangeScan = false DefBatchInsert = false diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index bd862118706f4..bef71c20aeb51 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -17,7 +17,6 @@ package copr import ( "context" "fmt" - "io" "strconv" "strings" "sync" @@ -93,9 +92,6 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa // coprocessor request but type is not DAG req.Paging = false } - if req.Streaming && req.Paging { - return copErrorResponse{errors.New("streaming and paging are both on")} - } ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) ranges := NewKeyRanges(req.KeyRanges) @@ -133,9 +129,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa // 2*it.concurrency to avoid deadlock in the unit test caused by the `MustExec` or `Exec` capacity = it.concurrency * 2 } - // in streaming or paging request, a request will be returned in multi batches, + // in paging request, a request will be returned in multi batches, // enlarge the channel size to avoid the request blocked by buffer full. - if req.Streaming || req.Paging { + if req.Paging { if capacity < 2048 { capacity = 2048 } @@ -148,9 +144,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa sessionMemTracker.FallbackOldAndSetNewAction(it.actionOnExceed) } - if !it.req.Streaming { - ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, it.rpcCancel) - } + ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, it.rpcCancel) it.open(ctx, enabledRateLimitAction, option.EnableCollectExecutionInfo) return it } @@ -184,10 +178,6 @@ const rangesPerTask = 25000 func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) { start := time.Now() cmdType := tikvrpc.CmdCop - if req.Streaming { - cmdType = tikvrpc.CmdCopStream - } - if req.StoreType == kv.TiDB { return buildTiDBMemCopTasks(ranges, req) } @@ -202,9 +192,9 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv // Channel buffer is 2 for handling region split. // In a common case, two region split tasks will not be blocked. 
 	chanSize := 2
-	// in streaming or paging request, a request will be returned in multi batches,
+	// in paging request, a request will be returned in multiple batches,
 	// enlarge the channel size to avoid the request blocked by buffer full.
-	if req.Streaming || req.Paging {
+	if req.Paging {
 		chanSize = 128
 	}
@@ -255,9 +245,6 @@ func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error
 		return nil, err
 	}
 	cmdType := tikvrpc.CmdCop
-	if req.Streaming {
-		cmdType = tikvrpc.CmdCopStream
-	}
 	tasks := make([]*copTask, 0, len(servers))
 	for _, ser := range servers {
 		if req.TiDBServerID > 0 && req.TiDBServerID != ser.ServerIDGetter() {
@@ -791,15 +778,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
 	metrics.TiKVCoprocessorHistogram.WithLabelValues(storeID, strconv.FormatBool(staleRead)).Observe(costTime.Seconds())
 
-	if task.cmdType == tikvrpc.CmdCopStream {
-		return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime)
-	}
-
 	if worker.req.Paging {
 		return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime)
 	}
 
-	// Handles the response for non-streaming copTask.
+	// Handles the response for non-paging copTask.
 	return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, nil, costTime)
 }
 
@@ -821,12 +804,6 @@ func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *co
 	case *coprocessor.Response:
 		detailV2 = r.ExecDetailsV2
 		detail = r.ExecDetails
-	case *tikvrpc.CopStreamResponse:
-		// streaming request returns io.EOF, so the first CopStreamResponse.Response maybe nil.
-		if r.Response != nil {
-			detailV2 = r.Response.ExecDetailsV2
-			detail = r.Response.ExecDetails
-		}
 	default:
 		panic("unreachable")
 	}
@@ -871,52 +848,6 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
 	return logStr
 }
 
-func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *tikv.RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
-	defer stream.Close()
-	var resp *coprocessor.Response
-	var lastRange *coprocessor.KeyRange
-	resp = stream.Response
-	if resp == nil {
-		// streaming request returns io.EOF, so the first Response is nil.
-		return nil, nil
-	}
-	for {
-		remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, nil, nil, task, ch, lastRange, costTime)
-		if err != nil || len(remainedTasks) != 0 {
-			return remainedTasks, errors.Trace(err)
-		}
-		resp, err = stream.Recv()
-		if err != nil {
-			if errors.Cause(err) == io.EOF {
-				return nil, nil
-			}
-
-			err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task)
-			if task.storeType == kv.TiFlash {
-				err1 = bo.Backoff(tikv.BoTiFlashRPC(), err1)
-			} else {
-				err1 = bo.Backoff(tikv.BoTiKVRPC(), err1)
-			}
-
-			if err1 != nil {
-				return nil, errors.Trace(err)
-			}
-
-			// No coprocessor.Response for network error, rebuild task based on the last success one.
-			if errors.Cause(err) == context.Canceled {
-				logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
-			} else {
-				logutil.BgLogger().Info("stream unknown error", zap.Error(err))
-			}
-			task.ranges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
-			return []*copTask{task}, nil
-		}
-		if resp.Range != nil {
-			lastRange = resp.Range
-		}
-	}
-}
-
 func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
 	remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime)
 	if err != nil || len(remainedTasks) != 0 {
@@ -942,7 +873,7 @@ func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *ti
 
 // handleCopResponse checks coprocessor Response for region split and lock,
 // returns more tasks when that happens, or handles the response if no error.
-// if we're handling streaming coprocessor response, lastRange is the range of last
+// if we're handling coprocessor paging response, lastRange is the range of last
 // successful response, otherwise it's nil.
 func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
 	if ver := resp.pbResp.GetLatestBucketsVersion(); task.bucketsVer < ver {
@@ -982,9 +913,6 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 				return nil, errors.Trace(err)
 			}
 		}
-		if worker.req.Streaming {
-			task.ranges = worker.calculateRetry(task.ranges, lastRange, worker.req.Desc)
-		}
 		return []*copTask{task}, nil
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
@@ -1008,7 +936,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 		}
 		return nil, errors.Trace(err)
 	}
-	// When the request is using streaming API, the `Range` is not nil.
+	// When the request is using paging API, the `Range` is not nil.
 	if resp.pbResp.Range != nil {
 		resp.startKey = resp.pbResp.Range.Start
 	} else if task.ranges != nil && task.ranges.Len() > 0 {
@@ -1140,7 +1068,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
 }
 
 // calculateRetry splits the input ranges into two, and take one of them according to desc flag.
-// It's used in streaming API, to calculate which range is consumed and what needs to be retry.
+// It's used in paging API, to calculate which range is consumed and what needs to be retried.
 // For example:
 // ranges: [r1 --> r2) [r3 --> r4)
 // split:  [s1 --> s2)
@@ -1158,7 +1086,7 @@ func (worker *copIteratorWorker) calculateRetry(ranges *KeyRanges, split *coproc
 	return right
 }
 
-// calculateRemain calculates the remain ranges to be processed, it's used in streaming and paging API.
+// calculateRemain calculates the remaining ranges to be processed, it's used in paging API.
 // For example:
 // ranges: [r1 --> r2) [r3 --> r4)
 // split:  [s1 --> s2)
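Note on the coprocessor.go changes above: the paging flow that replaces the streaming calls relies on the same bookkeeping as calculateRemain, namely dropping the key range a returned page has already covered and continuing (or retrying) with what is left. Below is a standalone, simplified sketch of that idea, using string keys and the ascending case only; it is an illustration of the technique, not the KeyRanges-based implementation in store/copr.

package main

import "fmt"

// keyRange is a half-open range [start, end); keys are plain strings here for simplicity.
type keyRange struct{ start, end string }

// remainAfterPage drops whatever part of ranges a page ending at pageEnd
// (exclusive) has already covered, keeping the untouched tail to process next.
func remainAfterPage(ranges []keyRange, pageEnd string) []keyRange {
	var remain []keyRange
	for _, r := range ranges {
		switch {
		case r.end <= pageEnd: // fully consumed by the page
			continue
		case r.start < pageEnd: // partially consumed, keep the tail
			remain = append(remain, keyRange{pageEnd, r.end})
		default: // not reached yet
			remain = append(remain, r)
		}
	}
	return remain
}

func main() {
	ranges := []keyRange{{"a", "f"}, {"h", "m"}}
	fmt.Println(remainAfterPage(ranges, "c")) // [{c f} {h m}]
}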
diff --git a/store/mockstore/mockcopr/cop_handler_dag.go b/store/mockstore/mockcopr/cop_handler_dag.go
index 1a408d78daf9b..3ab21aadbb9f4 100644
--- a/store/mockstore/mockcopr/cop_handler_dag.go
+++ b/store/mockstore/mockcopr/cop_handler_dag.go
@@ -25,7 +25,6 @@ import (
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
-	"github.com/pingcap/kvproto/pkg/tikvpb"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/expression/aggregation"
 	"github.com/pingcap/tidb/kv"
@@ -39,7 +38,6 @@ import (
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/codec"
 	"github.com/pingcap/tidb/util/collate"
-	mockpkg "github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/rowcodec"
 	"github.com/pingcap/tidb/util/timeutil"
 	"github.com/pingcap/tipb/go-tipb"
@@ -136,20 +134,6 @@ func constructTimeZone(name string, offset int) (*time.Location, error) {
 	return timeutil.ConstructTimeZone(name, offset)
 }
 
-func (h coprHandler) handleCopStream(ctx context.Context, req *coprocessor.Request) (tikvpb.Tikv_CoprocessorStreamClient, error) {
-	dagCtx, e, dagReq, err := h.buildDAGExecutor(req)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	return &mockCopStreamClient{
-		exec: e,
-		req: dagReq,
-		ctx: ctx,
-		dagCtx: dagCtx,
-	}, nil
-}
-
 func (h coprHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, *tipb.Executor, error) {
 	var currExec executor
 	var err error
@@ -522,16 +506,6 @@ func (mockClientStream) SendMsg(m interface{}) error { return nil }
 // RecvMsg implements grpc.ClientStream interface
 func (mockClientStream) RecvMsg(m interface{}) error { return nil }
 
-type mockCopStreamClient struct {
-	mockClientStream
-
-	req *tipb.DAGRequest
-	exec executor
-	ctx context.Context
-	dagCtx *dagContext
-	finished bool
-}
-
 type mockBathCopErrClient struct {
 	mockClientStream
 
@@ -568,110 +542,6 @@ func (mock *mockBatchCopDataClient) Recv() (*coprocessor.BatchResponse, error) {
 	return nil, io.EOF
 }
 
-type mockCopStreamErrClient struct {
-	mockClientStream
-
-	*errorpb.Error
-}
-
-func (mock *mockCopStreamErrClient) Recv() (*coprocessor.Response, error) {
-	return &coprocessor.Response{
-		RegionError: mock.Error,
-	}, nil
-}
-
-func (mock *mockCopStreamClient) Recv() (*coprocessor.Response, error) {
-	select {
-	case <-mock.ctx.Done():
-		return nil, mock.ctx.Err()
-	default:
-	}
-
-	if mock.finished {
-		return nil, io.EOF
-	}
-
-	if hook := mock.ctx.Value(mockpkg.HookKeyForTest("mockTiKVStreamRecvHook")); hook != nil {
-		hook.(func(context.Context))(mock.ctx)
-	}
-
-	var resp coprocessor.Response
-	chunk, finish, ran, counts, warnings, err := mock.readBlockFromExecutor()
-	resp.Range = ran
-	if err != nil {
-		if locked, ok := errors.Cause(err).(*testutils.ErrLocked); ok {
-			resp.Locked = &kvrpcpb.LockInfo{
-				Key: locked.Key,
-				PrimaryLock: locked.Primary,
-				LockVersion: locked.StartTS,
-				LockTtl: locked.TTL,
-			}
-		} else {
-			resp.OtherError = err.Error()
-		}
-		return &resp, nil
-	}
-	if finish {
-		// Just mark it, need to handle the last chunk.
-		mock.finished = true
-	}
-
-	data, err := chunk.Marshal()
-	if err != nil {
-		resp.OtherError = err.Error()
-		return &resp, nil
-	}
-	var Warnings []*tipb.Error
-	if len(warnings) > 0 {
-		Warnings = make([]*tipb.Error, 0, len(warnings))
-		for i := range warnings {
-			Warnings = append(Warnings, toPBError(warnings[i].Err))
-		}
-	}
-	streamResponse := tipb.StreamResponse{
-		Error: toPBError(err),
-		Data: data,
-		Warnings: Warnings,
-		OutputCounts: counts,
-	}
-	resp.Data, err = proto.Marshal(&streamResponse)
-	if err != nil {
-		resp.OtherError = err.Error()
-	}
-	return &resp, nil
-}
-
-func (mock *mockCopStreamClient) readBlockFromExecutor() (tipb.Chunk, bool, *coprocessor.KeyRange, []int64, []stmtctx.SQLWarn, error) {
-	var chunk tipb.Chunk
-	var ran coprocessor.KeyRange
-	var finish bool
-	var desc bool
-	mock.exec.ResetCounts()
-	ran.Start, desc = mock.exec.Cursor()
-	for count := 0; count < rowsPerChunk; count++ {
-		row, err := mock.exec.Next(mock.ctx)
-		if err != nil {
-			ran.End, _ = mock.exec.Cursor()
-			return chunk, false, &ran, nil, nil, errors.Trace(err)
-		}
-		if row == nil {
-			finish = true
-			break
-		}
-		for _, offset := range mock.req.OutputOffsets {
-			chunk.RowsData = append(chunk.RowsData, row[offset]...)
-		}
-	}
-
-	ran.End, _ = mock.exec.Cursor()
-	if desc {
-		ran.Start, ran.End = ran.End, ran.Start
-	}
-	warnings := mock.dagCtx.evalCtx.sc.GetWarnings()
-	mock.dagCtx.evalCtx.sc.SetWarnings(nil)
-	return chunk, finish, &ran, mock.exec.Counts(), warnings, nil
-}
-
 func (h coprHandler) initSelectResponse(err error, warnings []stmtctx.SQLWarn, counts []int64) *tipb.SelectResponse {
 	selResp := &tipb.SelectResponse{
 		Error: toPBError(err),
diff --git a/store/mockstore/mockcopr/rpc_copr.go b/store/mockstore/mockcopr/rpc_copr.go
index b883106240c8f..84ddd951d2040 100644
--- a/store/mockstore/mockcopr/rpc_copr.go
+++ b/store/mockstore/mockcopr/rpc_copr.go
@@ -43,6 +43,10 @@ func NewCoprRPCHandler() testutils.CoprRPCHandler {
 	}
 }
 
+func (mc *coprRPCHandler) HandleCopStream(ctx context.Context, reqCtx *kvrpcpb.Context, session *testutils.RPCSession, r *coprocessor.Request, timeout time.Duration) (*tikvrpc.CopStreamResponse, error) {
+	panic("CopStream API is deprecated")
+}
+
 func (mc *coprRPCHandler) HandleCmdCop(reqCtx *kvrpcpb.Context, session *testutils.RPCSession, r *coprocessor.Request) *coprocessor.Response {
 	if err := session.CheckRequestContext(reqCtx); err != nil {
 		return &coprocessor.Response{RegionError: err}
@@ -89,37 +93,6 @@ func (mc *coprRPCHandler) HandleBatchCop(ctx context.Context, reqCtx *kvrpcpb.Co
 	return batchResp, nil
 }
 
-func (mc *coprRPCHandler) HandleCopStream(ctx context.Context, reqCtx *kvrpcpb.Context, session *testutils.RPCSession, r *coprocessor.Request, timeout time.Duration) (*tikvrpc.CopStreamResponse, error) {
-	if err := session.CheckRequestContext(reqCtx); err != nil {
-		return &tikvrpc.CopStreamResponse{
-			Tikv_CoprocessorStreamClient: &mockCopStreamErrClient{Error: err},
-			Response: &coprocessor.Response{
-				RegionError: err,
-			},
-		}, nil
-	}
-	ctx1, cancel := context.WithCancel(ctx)
-	copStream, err := coprHandler{session}.handleCopStream(ctx1, r)
-	if err != nil {
-		cancel()
-		return nil, errors.Trace(err)
-	}
-
-	streamResp := &tikvrpc.CopStreamResponse{
-		Tikv_CoprocessorStreamClient: copStream,
-	}
-	streamResp.Lease.Cancel = cancel
-	streamResp.Timeout = timeout
-	mc.streamTimeout <- &streamResp.Lease
-
-	first, err := streamResp.Recv()
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	streamResp.Response = first
-	return streamResp, nil
-}
-
 func (mc *coprRPCHandler) Close() {
 	close(mc.done)
 }
diff --git a/table/BUILD.bazel b/table/BUILD.bazel
index ecabcc59a8f09..e5ad2bb8c1b12 100644
--- a/table/BUILD.bazel
+++ b/table/BUILD.bazel
@@ -44,7 +44,7 @@ go_test(
     ],
     embed = [":table"],
     flaky = True,
-    shard_count = 10,
+    shard_count = 50,
     deps = [
         "//errno",
         "//expression",
diff --git a/table/tables/tables.go b/table/tables/tables.go
index f09b44c2a824d..aab32e1f18d8f 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -1456,9 +1456,6 @@ func GetColDefaultValue(ctx sessionctx.Context, col *table.Column, defaultVals [
 	if col.GetOriginDefaultValue() == nil && mysql.HasNotNullFlag(col.GetFlag()) {
 		return colVal, errors.New("Miss column")
 	}
-	if col.State != model.StatePublic {
-		return colVal, nil
-	}
 	if defaultVals[col.Offset].IsNull() {
 		colVal, err = table.GetColOriginDefaultValue(ctx, col.ToInfo())
 		if err != nil {
diff --git a/util/chunk/BUILD.bazel b/util/chunk/BUILD.bazel
index 11cbef9d35f1a..948adc6666223 100644
--- a/util/chunk/BUILD.bazel
+++ b/util/chunk/BUILD.bazel
@@ -55,6 +55,7 @@ go_test(
         "row_container_test.go",
     ],
     embed = [":chunk"],
+    race = "on",
     deps = [
         "//config",
         "//parser/mysql",
diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go
index 4eefe434d4fd5..31ec5c309f327 100644
--- a/util/dbterror/ddl_terror.go
+++ b/util/dbterror/ddl_terror.go
@@ -34,6 +34,10 @@ var (
 	ErrCancelledDDLJob = ClassDDL.NewStd(mysql.ErrCancelledDDLJob)
 	// ErrRunMultiSchemaChanges means we run multi schema changes.
 	ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change"), nil))
+	// ErrOperateSameColumn means we change the same column multiple times in a DDL.
+	ErrOperateSameColumn = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "operate same column '%s'"), nil))
+	// ErrOperateSameIndex means we change the same index multiple times in a DDL.
+	ErrOperateSameIndex = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "operate same index '%s'"), nil))
 	// ErrWaitReorgTimeout means we wait for reorganization timeout.
 	ErrWaitReorgTimeout = ClassDDL.NewStdErr(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrWaitReorgTimeout])
 	// ErrInvalidStoreVer means invalid store version.
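Note on the ddl_terror.go hunk above: the two new dbterror errors are intended for multi-schema-change DDLs that touch the same column or index more than once. The sketch below shows the kind of duplicate check such errors back; the helper is an assumption for illustration only, and in TiDB the error would presumably be produced from ErrOperateSameColumn via the usual GenWithStackByArgs-style constructor rather than fmt.Errorf.

package main

import (
	"fmt"
	"strings"
)

// checkNoDuplicateColumn rejects a DDL that names the same column twice.
// Illustrative only: TiDB itself would return dbterror.ErrOperateSameColumn here.
func checkNoDuplicateColumn(cols []string) error {
	seen := make(map[string]struct{}, len(cols))
	for _, c := range cols {
		name := strings.ToLower(c) // column names are case-insensitive
		if _, ok := seen[name]; ok {
			return fmt.Errorf("Unsupported operate same column '%s'", c)
		}
		seen[name] = struct{}{}
	}
	return nil
}

func main() {
	fmt.Println(checkNoDuplicateColumn([]string{"a", "b", "A"})) // "A" duplicates "a"
}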