Skip to content

Commit

Permalink
Merge branch 'master' into paging-unistore
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Jun 10, 2022
2 parents 712780f + fb67ee4 commit b2221dc
Show file tree
Hide file tree
Showing 40 changed files with 386 additions and 597 deletions.
1 change: 1 addition & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ build:release --config=ci
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --remote_cache=http://172.16.4.3:8084/tidb
test:ci --verbose_failures
test:ci --test_timeout=180
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --remote_cache=http://172.16.4.3:8084/tidb
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,13 @@ bazel_prepare:
bazel_test: failpoint-enable bazel_ci_prepare
bazel --output_user_root=/home/jenkins/.tidb/tmp test --config=ci \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/lightning/log:log_test
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test


bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/lightning/log:log_test
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test

bazel_build: bazel_ci_prepare
mkdir -p bin
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
"filter_test.go",
"log_test.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/log_test",
deps = [
":log",
"@com_github_stretchr_testify//require",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,6 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//keepalive",
],
)
12 changes: 11 additions & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,17 @@ func RunStreamRestore(
g glue.Glue,
cmdName string,
cfg *RestoreConfig,
) error {
) (err error) {
startTime := time.Now()
defer func() {
dur := time.Since(startTime)
if err != nil {
summary.Log(cmdName+" failed summary", zap.Error(err))
} else {
summary.Log(cmdName+" success summary", zap.Duration("total-take", dur),
zap.Uint64("restore-from", cfg.StartTS), zap.Uint64("restore-to", cfg.RestoreTS))
}
}()
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()

Expand Down
34 changes: 32 additions & 2 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,49 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
atomicutil "go.uber.org/atomic"
)

type testCancelJob struct {
sql string
ok bool
cancelState model.SchemaState
cancelState interface{} // model.SchemaState | []model.SchemaState
onJobBefore bool
onJobUpdate bool
prepareSQL []string
}

type subStates = []model.SchemaState

func testMatchCancelState(t *testing.T, job *model.Job, cancelState interface{}, sql string) bool {
switch v := cancelState.(type) {
case model.SchemaState:
if job.Type == model.ActionMultiSchemaChange {
msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v)
assert.Failf(t, msg, "use []model.SchemaState as cancel states instead")
return false
}
return job.SchemaState == v
case subStates: // For multi-schema change sub-jobs.
if job.MultiSchemaInfo == nil {
msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v)
assert.Failf(t, msg, "use model.SchemaState as the cancel state instead")
return false
}
assert.Equal(t, len(job.MultiSchemaInfo.SubJobs), len(v), sql)
for i, subJobSchemaState := range v {
if job.MultiSchemaInfo.SubJobs[i].SchemaState != subJobSchemaState {
return false
}
}
return true
default:
return false
}
}

var allTestCase = []testCancelJob{
// Add index.
{"create unique index c3_index on t_partition (c1)", true, model.StateWriteReorganization, true, true, nil},
Expand Down Expand Up @@ -246,7 +276,7 @@ func TestCancel(t *testing.T) {
cancelWhenReorgNotStart := false

hookFunc := func(job *model.Job) {
if job.SchemaState == allTestCase[i.Load()].cancelState && !cancel {
if testMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel {
if !cancelWhenReorgNotStart && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
return
}
Expand Down
11 changes: 7 additions & 4 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -63,7 +64,6 @@ func TestColumnAdd(t *testing.T) {
var jobID int64
tc.OnJobUpdatedExported = func(job *model.Job) {
jobID = job.ID
require.NoError(t, dom.Reload())
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
switch job.SchemaState {
Expand Down Expand Up @@ -96,11 +96,14 @@ func TestColumnAdd(t *testing.T) {
}
}
tc.OnJobUpdatedExported = func(job *model.Job) {
if job.NotStarted() {
return
}
jobID = job.ID
tbl := external.GetTableByName(t, internal, "test", "t")
if job.SchemaState != model.StatePublic {
for _, col := range tbl.Cols() {
require.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped")
assert.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped")
}
}
}
Expand Down Expand Up @@ -224,7 +227,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t
return errors.Trace(err)
}
err = checkResult(ctx, writeOnlyTable, writeOnlyTable.WritableCols(), [][]string{
{"1", "2", "<nil>"},
{"1", "2", "3"},
{"2", "3", "3"},
})
if err != nil {
Expand All @@ -236,7 +239,7 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t
return errors.Trace(err)
}
got := fmt.Sprintf("%v", row)
expect := fmt.Sprintf("%v", []types.Datum{types.NewDatum(1), types.NewDatum(2), types.NewDatum(nil)})
expect := fmt.Sprintf("%v", []types.Datum{types.NewDatum(1), types.NewDatum(2), types.NewDatum(3)})
if got != expect {
return errors.Errorf("expect %v, got %v", expect, got)
}
Expand Down
1 change: 1 addition & 0 deletions ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestMain(m *testing.M) {

autoid.SetStep(5000)
ddl.ReorgWaitTimeout = 30 * time.Millisecond
ddl.RunInGoTest = true
ddl.SetBatchInsertDeleteRangeSize(2)

config.UpdateGlobal(func(conf *config.Config) {
Expand Down
13 changes: 0 additions & 13 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
hook.(func(*kv.Request))(kvReq)
}

kvReq.Streaming = false
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
Expand Down Expand Up @@ -111,20 +110,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
}

// kvReq.MemTracker is used to trace and control memory usage in DistSQL layer;
// for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it;
// for selectResult, we just use the kvReq.MemTracker prepared for co-processor
// instead of creating a new one for simplification.
if kvReq.Streaming {
return &streamResult{
label: "dag-stream",
sqlType: label,
resp: resp,
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: sctx,
feedback: fb,
}, nil
}
return &selectResult{
label: "dag",
resp: resp,
Expand Down
6 changes: 0 additions & 6 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,6 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
return builder
}

// SetStreaming sets "Streaming" flag for "kv.Request".
func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
builder.Request.Streaming = streaming
return builder
}

// SetPaging sets "Paging" flag for "kv.Request".
func (builder *RequestBuilder) SetPaging(paging bool) *RequestBuilder {
builder.Request.Paging = paging
Expand Down
8 changes: 0 additions & 8 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func TestRequestBuilder1(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down Expand Up @@ -352,7 +351,6 @@ func TestRequestBuilder2(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down Expand Up @@ -400,7 +398,6 @@ func TestRequestBuilder3(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down Expand Up @@ -432,7 +429,6 @@ func TestRequestBuilder4(t *testing.T) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetStreaming(true).
SetFromSessionVars(variable.NewSessionVars()).
Build()
require.NoError(t, err)
Expand All @@ -447,7 +443,6 @@ func TestRequestBuilder4(t *testing.T) {
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
Streaming: true,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
Expand Down Expand Up @@ -493,7 +488,6 @@ func TestRequestBuilder5(t *testing.T) {
IsolationLevel: kv.RC,
Priority: 1,
NotFillCache: true,
Streaming: false,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
Expand Down Expand Up @@ -523,7 +517,6 @@ func TestRequestBuilder6(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: true,
Streaming: false,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
Expand Down Expand Up @@ -559,7 +552,6 @@ func TestRequestBuilder7(t *testing.T) {
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
Streaming: false,
ReplicaRead: replicaRead.replicaReadType,
ReadReplicaScope: kv.GlobalReplicaScope,
}
Expand Down
1 change: 0 additions & 1 deletion distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ var (

var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*streamResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
)

Expand Down
Loading

0 comments on commit b2221dc

Please sign in to comment.