Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: clean up the log in backfill procedure #39246

Merged
merged 3 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 28 additions & 56 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package ddl

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -167,8 +166,8 @@ func (r *reorgBackfillTask) excludedEndKey() kv.Key {

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
startKey := hex.EncodeToString(r.startKey)
endKey := hex.EncodeToString(r.endKey)
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
Expand Down Expand Up @@ -277,10 +276,10 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
logutil.BgLogger().Info("[ddl] backfill worker back fill index",
zap.Int("workerID", w.id),
zap.Int("addedCount", result.addedCount),
zap.Int("scanCount", result.scanCount),
zap.String("nextHandle", tryDecodeToHandleString(taskCtx.nextKey)),
zap.Int("worker ID", w.id),
zap.Int("added count", result.addedCount),
zap.Int("scan count", result.scanCount),
zap.String("next key", hex.EncodeToString(taskCtx.nextKey)),
zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds()))
lastLogTime = time.Now()
}
Expand All @@ -292,12 +291,12 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
}
logutil.BgLogger().Info("[ddl] backfill worker finish task",
zap.Stringer("type", w.tp),
zap.Int("workerID", w.id),
zap.Int("worker ID", w.id),
zap.String("task", task.String()),
zap.Int("addedCount", result.addedCount),
zap.Int("scanCount", result.scanCount),
zap.String("nextHandle", tryDecodeToHandleString(result.nextKey)),
zap.String("takeTime", time.Since(startTime).String()))
zap.Int("added count", result.addedCount),
zap.Int("scan count", result.scanCount),
zap.String("next key", hex.EncodeToString(result.nextKey)),
zap.String("take time", time.Since(startTime).String()))
if ResultCounterForTest != nil && result.err == nil {
ResultCounterForTest.Add(1)
}
Expand Down Expand Up @@ -363,8 +362,8 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) {
logutil.BgLogger().Info("[ddl] split table range from PD",
zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("endHandle", tryDecodeToHandleString(endKey)))
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("end key", hex.EncodeToString(endKey)))
kvRange := kv.KeyRange{StartKey: startKey, EndKey: endKey}
s, ok := store.(tikv.Storage)
if !ok {
Expand Down Expand Up @@ -456,14 +455,14 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("nextHandle", tryDecodeToHandleString(nextKey)),
zap.Int64("batchAddedCount", taskAddedCount),
zap.String("taskFailedError", err.Error()),
zap.String("takeTime", elapsedTime.String()),
zap.ByteString("element type", reorgInfo.currElement.TypeKey),
zap.Int64("element ID", reorgInfo.currElement.ID),
zap.Int64("total added count", *totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("next key", hex.EncodeToString(nextKey)),
zap.Int64("batch added count", taskAddedCount),
zap.String("task failed error", err.Error()),
zap.String("take time", elapsedTime.String()),
zap.NamedError("updateHandleError", err1))
return errors.Trace(err)
}
Expand All @@ -472,43 +471,16 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("nextHandle", tryDecodeToHandleString(nextKey)),
zap.Int64("batchAddedCount", taskAddedCount),
zap.String("takeTime", elapsedTime.String()))
zap.ByteString("element type", reorgInfo.currElement.TypeKey),
zap.Int64("element ID", reorgInfo.currElement.ID),
zap.Int64("total added count", *totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("next key", hex.EncodeToString(nextKey)),
zap.Int64("batch added count", taskAddedCount),
zap.String("take time", elapsedTime.String()))
return nil
}

// tryDecodeToHandleString renders key as a best-effort, human-readable handle
// string for diagnostics.
//
// Resolution order:
//  1. If tablecodec.DecodeRowKey accepts key, the decoded handle's String()
//     is returned.
//  2. If key does not contain the "_r" marker, the whole key is returned
//     formatted as "key: %x".
//  3. If the bytes following "_r" end with a zero byte, key minus its last
//     byte is decoded again; on success the handle's String() is returned
//     with a ".next" suffix.
//  4. Otherwise the bytes following "_r" are returned formatted with %x.
//
// A panic raised while decoding is recovered and logged at Warn level; since
// the result is unnamed, the function returns the empty string in that case.
func tryDecodeToHandleString(key kv.Key) string {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		logutil.BgLogger().Warn("tryDecodeToHandleString panic",
			zap.Any("recover()", r),
			zap.Binary("key", key))
	}()

	// Fast path: the key is a well-formed row key.
	if h, err := tablecodec.DecodeRowKey(key); err == nil {
		return h.String()
	}

	// Without the record marker there is nothing to extract; dump the full key.
	markerPos := bytes.Index(key, []byte("_r"))
	if markerPos < 0 {
		return fmt.Sprintf("key: %x", key)
	}

	// Everything after the two-byte "_r" marker.
	tail := key[markerPos+2:]

	// A trailing zero byte may have been appended to an otherwise valid row
	// key, so retry the decode with that last byte stripped.
	if n := len(tail); n > 0 && tail[n-1] == 0 {
		if h, err := tablecodec.DecodeRowKey(key[:len(key)-1]); err == nil {
			return h.String() + ".next"
		}
	}
	return fmt.Sprintf("%x", tail)
}

// handleRangeTasks sends tasks to workers, and returns the remaining kvRanges that are not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
Expand Down
11 changes: 6 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math/bits"
"strings"
Expand Down Expand Up @@ -1116,11 +1117,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
// Write the reorg info to store so the whole reorganize process can recover from panic.
err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
logutil.BgLogger().Info("[ddl] update column and indexes",
zap.Int64("jobID", reorgInfo.Job.ID),
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
zap.Int64("elementID", reorgInfo.currElement.ID),
zap.String("startHandle", tryDecodeToHandleString(reorgInfo.StartKey)),
zap.String("endHandle", tryDecodeToHandleString(reorgInfo.EndKey)))
zap.Int64("job ID", reorgInfo.Job.ID),
zap.ByteString("element type", reorgInfo.currElement.TypeKey),
zap.Int64("element ID", reorgInfo.currElement.ID),
zap.String("start key", hex.EncodeToString(reorgInfo.StartKey)),
zap.String("end key", hex.EncodeToString(reorgInfo.EndKey)))
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 4 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,10 +1845,10 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r

// Write the reorg info to store so the whole reorganize process can recover from panic.
err = reorg.UpdateReorgMeta(reorg.StartKey, w.sessPool)
logutil.BgLogger().Info("[ddl] job update reorgInfo", zap.Int64("jobID", reorg.Job.ID),
zap.ByteString("elementType", reorg.currElement.TypeKey), zap.Int64("elementID", reorg.currElement.ID),
zap.Int64("partitionTableID", pid), zap.String("startHandle", tryDecodeToHandleString(start)),
zap.String("endHandle", tryDecodeToHandleString(end)), zap.Error(err))
logutil.BgLogger().Info("[ddl] job update reorg info", zap.Int64("jobID", reorg.Job.ID),
zap.ByteString("element type", reorg.currElement.TypeKey), zap.Int64("element ID", reorg.currElement.ID),
zap.Int64("partition table ID", pid), zap.String("start key", hex.EncodeToString(start)),
zap.String("end key", hex.EncodeToString(end)), zap.Error(err))
return false, errors.Trace(err)
}

Expand Down
24 changes: 13 additions & 11 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
err := rh.UpdateDDLReorgStartHandle(job, currentElement, doneKey)

logutil.BgLogger().Info("[ddl] run reorg job wait timeout",
zap.Duration("waitTime", waitTimeout),
zap.ByteString("elementType", currentElement.TypeKey),
zap.Int64("elementID", currentElement.ID),
zap.Int64("totalAddedRowCount", rowCount),
zap.String("doneKey", tryDecodeToHandleString(doneKey)),
zap.Duration("wait time", waitTimeout),
zap.ByteString("element type", currentElement.TypeKey),
zap.Int64("element ID", currentElement.ID),
zap.Int64("total added row count", rowCount),
zap.String("done key", hex.EncodeToString(doneKey)),
zap.Error(err))
// If timeout, we will return, check the owner and retry to wait job done again.
return dbterror.ErrWaitReorgTimeout
Expand Down Expand Up @@ -559,10 +559,12 @@ func getTableRange(ctx *JobContext, d *ddlCtx, tbl table.PhysicalTable, snapshot
endHandleKey = tablecodec.EncodeRecordKey(tbl.RecordPrefix(), maxHandle)
}
if isEmptyTable || endHandleKey.Cmp(startHandleKey) < 0 {
logutil.BgLogger().Info("[ddl] get table range, endHandle < startHandle", zap.String("table", fmt.Sprintf("%v", tbl.Meta())),
logutil.BgLogger().Info("[ddl] get noop table range",
zap.String("table", fmt.Sprintf("%v", tbl.Meta())),
zap.Int64("table/partition ID", tbl.GetPhysicalID()),
zap.String("endHandle", tryDecodeToHandleString(endHandleKey)),
zap.String("startHandle", tryDecodeToHandleString(startHandleKey)))
zap.String("start key", hex.EncodeToString(startHandleKey)),
zap.String("end key", hex.EncodeToString(endHandleKey)),
zap.Bool("is empty table", isEmptyTable))
endHandleKey = startHandleKey
}
return
Expand Down Expand Up @@ -706,9 +708,9 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo
return nil, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] job get table range",
zap.Int64("jobID", job.ID), zap.Int64("physicalTableID", pid),
zap.String("startHandle", tryDecodeToHandleString(start)),
zap.String("endHandle", tryDecodeToHandleString(end)))
zap.Int64("job ID", job.ID), zap.Int64("physical table ID", pid),
zap.String("start key", hex.EncodeToString(start)),
zap.String("end key", hex.EncodeToString(end)))

err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0])
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion ddl/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package ddl

import (
"encoding/hex"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx/variable"
)
Expand Down Expand Up @@ -79,7 +81,7 @@ func (d *ddl) Stats(vars *variable.SessionVars) (map[string]interface{}, error)
m[ddlJobSchemaID] = job.SchemaID
m[ddlJobTableID] = job.TableID
m[ddlJobSnapshotVer] = job.SnapshotVer
m[ddlJobReorgHandle] = tryDecodeToHandleString(ddlInfo.ReorgHandle)
m[ddlJobReorgHandle] = hex.EncodeToString(ddlInfo.ReorgHandle)
m[ddlJobArgs] = job.Args
return m, nil
}
8 changes: 7 additions & 1 deletion ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test

import (
"context"
"encoding/hex"
"fmt"
"strconv"
"testing"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -89,7 +91,11 @@ func TestDDLStatsInfo(t *testing.T) {
varMap, err := d.Stats(nil)
wg.Done()
require.NoError(t, err)
require.Equal(t, "1", varMap[ddlJobReorgHandle])
key, err := hex.DecodeString(varMap[ddlJobReorgHandle].(string))
require.NoError(t, err)
_, h, err := tablecodec.DecodeRecordKey(key)
require.NoError(t, err)
require.Equal(t, h.IntValue(), int64(1))
}
}
}
Expand Down