Skip to content

Commit

Permalink
*: show backfill type in admin show ddl jobs (#38733)
Browse files Browse the repository at this point in the history
ref #35983
  • Loading branch information
tangenta authored Oct 28, 2022
1 parent ba2e2c9 commit 6d6e9c4
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 54 deletions.
4 changes: 4 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,10 @@ func pickBackfillType(w *worker, job *model.Job) model.ReorgType {

// canUseIngest indicates whether it can use ingest way to backfill index.
func canUseIngest(w *worker) bool {
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
if len(ingest.LitBackCtxMgr.Keys()) > 0 {
return false
}
ctx, err := w.sessPool.get()
if err != nil {
return false
Expand Down
12 changes: 11 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
req.AppendInt64(0, job.ID)
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, job.Type.String())
req.AppendString(3, job.Type.String()+showAddIdxReorgTp(job))
req.AppendString(4, job.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
Expand Down Expand Up @@ -595,6 +595,16 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
}
}

func showAddIdxReorgTp(job *model.Job) string {
if job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey {
tp := job.ReorgMeta.ReorgTp.String()
if len(tp) > 0 {
return " /* " + tp + " */"
}
}
return ""
}

func ts2Time(timestamp uint64, loc *time.Location) types.Time {
duration := time.Duration(math.Pow10(9-types.DefaultFsp)) * time.Nanosecond
t := model.TSConvert2Time(timestamp)
Expand Down
13 changes: 13 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,19 @@ func (tp ReorgType) NeedMergeProcess() bool {
return tp == ReorgTypeLitMerge || tp == ReorgTypeTxnMerge
}

// String implements fmt.Stringer interface.
func (tp ReorgType) String() string {
switch tp {
case ReorgTypeTxn:
return "txn"
case ReorgTypeLitMerge:
return "ingest"
case ReorgTypeTxnMerge:
return "txn-merge"
}
return ""
}

// TimeZoneLocation represents a single time zone.
type TimeZoneLocation struct {
Name string `json:"name"`
Expand Down
99 changes: 99 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package addindextest_test

import (
"fmt"
"strings"
"sync"
"testing"

"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
)

func TestAddIndexIngestMemoryUsage(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec("create table t (a int, b int, c int);")
var sb strings.Builder
sb.WriteString("insert into t values ")
size := 100
for i := 0; i < size; i++ {
sb.WriteString(fmt.Sprintf("(%d, %d, %d)", i, i, i))
if i != size-1 {
sb.WriteString(",")
}
}
sb.WriteString(";")
tk.MustExec(sb.String())
require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage())
tk.MustExec("alter table t add index idx(a);")
tk.MustExec("alter table t add unique index idx1(b);")
tk.MustExec("admin check table t;")
require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage())
}

func TestAddIndexIngestLimitOneBackend(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use addindexlit;")
tk2.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk2.MustExec("create table t2 (a int, b int);")
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

// Mock there is a running ingest job.
ingest.LitBackCtxMgr.Store(65535, &ingest.BackendContext{})
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
tk.MustExec("alter table t add index idx(a);")
wg.Done()
}()
go func() {
tk2.MustExec("alter table t2 add index idx_b(b);")
wg.Done()
}()
wg.Wait()
rows := tk.MustQuery("admin show ddl jobs 2;").Rows()
require.Len(t, rows, 2)
require.False(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest"))
require.False(t, strings.Contains(rows[1][3].(string) /* job_type */, "ingest"))
require.Equal(t, rows[0][7].(string) /* row_count */, "3")
require.Equal(t, rows[1][7].(string) /* row_count */, "3")

// Remove the running ingest job.
ingest.LitBackCtxMgr.Delete(65535)
tk.MustExec("alter table t add index idx_a(a);")
rows = tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
require.True(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest"))
require.Equal(t, rows[0][7].(string) /* row_count */, "3")
}
53 changes: 0 additions & 53 deletions tests/realtikvtest/addindextest/memory_test.go

This file was deleted.

0 comments on commit 6d6e9c4

Please sign in to comment.