Merge branch 'master' into fix-opaque-compare
xiongjiwei authored Aug 24, 2022
2 parents 9f3b6ca + 64c30c0 commit 0f4c693
Showing 27 changed files with 1,097 additions and 75 deletions.
4 changes: 4 additions & 0 deletions Makefile
@@ -460,3 +460,7 @@ bazel_statisticstest: failpoint-enable bazel_ci_prepare
bazel_txntest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/txntest/...

+bazel_addindextest: failpoint-enable bazel_ci_prepare
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+		-- //tests/realtikvtest/addindextest/...
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
@@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
+    shard_count = 50,
deps = [
"//config",
"//domain",
3 changes: 3 additions & 0 deletions build/nogo_config.json
@@ -154,6 +154,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"executor/aggregate.go": "executor/aggregate.go",
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/backfilling.go": "ddl/backfilling.go",
@@ -289,6 +290,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"executor/aggregate.go": "executor/aggregate.go",
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/backfilling.go": "ddl/backfilling.go",
@@ -649,6 +651,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"executor/aggregate.go": "executor/aggregate.go",
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/": "enable to ddl",
1 change: 1 addition & 0 deletions executor/BUILD.bazel
@@ -154,6 +154,7 @@ go_library(
"//util/admin",
"//util/bitmap",
"//util/breakpoint",
"//util/channel",
"//util/chunk",
"//util/codec",
"//util/collate",
40 changes: 19 additions & 21 deletions executor/aggregate.go
@@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/channel"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/disk"
@@ -231,7 +232,7 @@ type HashAggIntermData struct {
}

// getPartialResultBatch fetches a batch of partial results from HashAggIntermData.
-func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
+func (d *HashAggIntermData) getPartialResultBatch(_ *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, _ []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
keyStart := d.cursor
for ; d.cursor < len(d.groupKeys) && len(prs) < maxChunkSize; d.cursor++ {
prs = append(prs, d.partialResultMap[d.groupKeys[d.cursor]])
@@ -275,15 +276,12 @@ func (e *HashAggExec) Close() error {
}
close(e.finishCh)
	for _, ch := range e.partialOutputChs {
-		for range ch {
-		}
+		channel.Clear(ch)
	}
	for _, ch := range e.partialInputChs {
-		for range ch {
-		}
+		channel.Clear(ch)
	}
-	for range e.finalOutputCh {
-	}
+	channel.Clear(e.finalOutputCh)
e.executed = false
if e.memTracker != nil {
e.memTracker.ReplaceBytesUsed(0)
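
The three hand-rolled drain loops above collapse into calls to a new //util/channel package (its BUILD entry and import appear earlier in this diff, but the package source is not among the files shown). A minimal sketch of what such a helper could look like, assuming it simply generalizes the for-range drain it replaces:

package channel

// Clear discards every element of ch until the sender closes it.
// This mirrors the `for range ch {}` loops it replaces, so callers
// must still guarantee that the channel is eventually closed.
func Clear[T any](ch chan T) {
	for range ch {
	}
}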
@@ -295,7 +293,7 @@ func (e *HashAggExec) Close() error {
// Open implements the Executor Open interface.
func (e *HashAggExec) Open(ctx context.Context) error {
failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
}
})
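
The failpoint guards in this file switch from an unchecked val.(bool), which panics if the injected value is ever not a bool, to the comma-ok form, which simply yields false. A standalone illustration of the difference:

package main

import "fmt"

func main() {
	var val interface{} = "not a bool"

	// Comma-ok assertion: b becomes the zero value (false) when the
	// dynamic type is not bool; no panic.
	b, _ := val.(bool)
	fmt.Println(b) // false

	// By contrast, the unchecked form val.(bool) would panic here:
	// "interface conversion: interface {} is string, not bool".
}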
@@ -352,7 +350,7 @@ func closeBaseExecutor(b *baseExecutor) {
}
}

-func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
+func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
sessionVars := e.ctx.GetSessionVars()
finalConcurrency := sessionVars.HashAggFinalConcurrency()
partialConcurrency := sessionVars.HashAggPartialConcurrency()
@@ -486,7 +484,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG
}
if w.stats != nil {
w.stats.ExecTime += int64(time.Since(execStart))
-		w.stats.TaskNum += 1
+		w.stats.TaskNum++
}
// The intermData can be promised to be not empty if reaching here,
// so we set needShuffle to be true.
@@ -503,7 +501,7 @@ func getGroupKeyMemUsage(groupKey [][]byte) int64 {
return mem
}

-func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
+func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, _ int) (err error) {
memSize := getGroupKeyMemUsage(w.groupKey)
w.groupKey, err = getGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
failpoint.Inject("ConsumeRandomPanic", nil)
@@ -532,7 +530,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s

// shuffleIntermData shuffles the intermediate data of partial workers to corresponded final workers.
// We only support parallel execution for single-machine, so process of encode and decode can be skipped.
-func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) {
+func (w *HashAggPartialWorker) shuffleIntermData(_ *stmtctx.StatementContext, finalConcurrency int) {
groupKeysSlice := make([][]string, finalConcurrency)
for groupKey := range w.partialResultsMap {
finalWorkerIdx := int(murmur3.Sum32([]byte(groupKey))) % finalConcurrency
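
shuffleIntermData routes each group key to a final worker by hashing the key with murmur3 and reducing it modulo the number of final workers, so identical keys from different partial workers always meet at the same final worker. A self-contained sketch of that routing (illustrative names; it assumes the spaolacci/murmur3 package, which exposes the Sum32 call used here, though the repository's actual import path may differ):

package main

import (
	"fmt"

	"github.com/spaolacci/murmur3"
)

// routeKeys buckets group keys across finalConcurrency workers the same
// way shuffleIntermData does: murmur3 hash of the key, modulo the count.
func routeKeys(groupKeys []string, finalConcurrency int) [][]string {
	buckets := make([][]string, finalConcurrency)
	for _, key := range groupKeys {
		idx := int(murmur3.Sum32([]byte(key))) % finalConcurrency
		buckets[idx] = append(buckets[idx], key)
	}
	return buckets
}

func main() {
	fmt.Println(routeKeys([]string{"k1", "k2", "k3", "k4"}, 2))
}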
@@ -605,7 +603,7 @@ func getGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte,
return groupKey, nil
}

-func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
+func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
n := len(groupKey)
partialResults := make([][]aggfuncs.PartialResult, n)
allMemDelta := int64(0)
@@ -706,7 +704,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err err
}
if w.stats != nil {
w.stats.ExecTime += int64(time.Since(execStart))
-			w.stats.TaskNum += 1
+			w.stats.TaskNum++
}
}
}
@@ -906,7 +904,7 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
}

failpoint.Inject("parallelHashAggError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
failpoint.Return(errors.New("HashAggExec.parallelExec error"))
}
})
@@ -1011,7 +1009,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
}

failpoint.Inject("unparallelHashAggError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
failpoint.Return(errors.New("HashAggExec.unparallelExec error"))
}
})
@@ -1170,7 +1168,7 @@ func (w *AggWorkerStat) Clone() *AggWorkerStat {
}
}

-func (e *HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) {
+func (*HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) {
var totalTime, totalWait, totalExec, totalTaskNum int64
for _, w := range workerStats {
totalTime += w.WorkerTime
Expand Down Expand Up @@ -1231,7 +1229,7 @@ func (e *HashAggRuntimeStats) Merge(other execdetails.RuntimeStats) {
}

// Tp implements the RuntimeStats interface.
-func (e *HashAggRuntimeStats) Tp() int {
+func (*HashAggRuntimeStats) Tp() int {
return execdetails.TpHashAggRuntimeStat
}

@@ -1263,7 +1261,7 @@ type StreamAggExec struct {
// Open implements the Executor Open interface.
func (e *StreamAggExec) Open(ctx context.Context) error {
failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
}
})
@@ -1950,9 +1948,9 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
}

// GetPriority get the priority of the Action
-func (a *AggSpillDiskAction) GetPriority() int64 {
+func (*AggSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
-func (a *AggSpillDiskAction) SetLogHook(hook func(uint64)) {}
+func (*AggSpillDiskAction) SetLogHook(_ func(uint64)) {}
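
Most of the remaining churn in this file follows one lint-driven pattern: the commit adds executor/aggregate.go to the nogo only_files lists above, and receivers or parameters a method never reads are renamed to the blank identifier _ (or the receiver name is dropped entirely) to satisfy those checks. A minimal illustration of the convention:

package main

type action struct{}

// GetPriority reads neither its receiver nor any parameter,
// so the receiver needs no name at all.
func (*action) GetPriority() int64 { return 0 }

// SetLogHook must keep its parameter to satisfy an interface,
// but blanking the name marks it as intentionally unused.
func (*action) SetLogHook(_ func(uint64)) {}

func main() {
	var a action
	_ = a.GetPriority()
	a.SetLogHook(nil)
}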
2 changes: 1 addition & 1 deletion executor/executor_pkg_test.go
@@ -510,7 +510,7 @@ func TestSortSpillDisk(t *testing.T) {
err = exec.Close()
require.NoError(t, err)

-	ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 24000)
+	ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, 28000)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
require.NoError(t, err)
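
The spill test's tracker limit rises from 24000 to 28000 bytes, presumably because memory accounting changed elsewhere in this merge and the executor under test now consumes slightly more before spilling. A sketch of how such a tracker is used, assuming the util/memory API as it exists in this repository:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/memory"
)

func main() {
	// Label -1 marks an anonymous tracker; 28000 bytes is the limit at
	// which an attached action (such as a spill-to-disk action) fires.
	tracker := memory.NewTracker(-1, 28000)
	tracker.Consume(24000)
	fmt.Println(tracker.BytesConsumed()) // 24000: still under the limit
	tracker.Consume(5000)                // crossing 28000 triggers any attached action
	fmt.Println(tracker.BytesConsumed()) // 29000
}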
4 changes: 4 additions & 0 deletions executor/executor_test.go
@@ -149,8 +149,12 @@ func TestPlanReplayer(t *testing.T) {

tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
tk.MustExec("create definer=`root`@`127.0.0.1` view v1 as select * from t1")
tk.MustExec("create definer=`root`@`127.0.0.1` view v2 as select * from v1")
tk.MustExec("plan replayer dump explain with tmp as (select a from t1 group by t1.a) select * from tmp, t2 where t2.a=tmp.a;")
tk.MustExec("plan replayer dump explain select * from t1 where t1.a > (with cte1 as (select 1) select count(1) from cte1);")
tk.MustExec("plan replayer dump explain select * from v1")
tk.MustExec("plan replayer dump explain select * from v2")
}

func TestShow(t *testing.T) {
