diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index e6b75eb3a73f8..1696195d4abe4 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -53,6 +53,7 @@ go_test( embed = [":kv"], flaky = True, race = "on", + shard_count = 19, deps = [ "//br/pkg/lightning/common", "//br/pkg/lightning/log", diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 92d54d723e9d0..1ff8390f6c564 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -45,6 +45,10 @@ import ( "golang.org/x/exp/slices" ) +const ( + maxLogLength = 512 * 1024 +) + var ExtraHandleColumnInfo = model.NewExtraHandleColInfo() type genCol struct { @@ -223,6 +227,7 @@ var kindStr = [...]string{ // MarshalLogArray implements the zapcore.ArrayMarshaler interface func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error { for _, datum := range row { + var totalLength = 0 kind := datum.Kind() var str string var err error @@ -239,6 +244,14 @@ func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error return err } } + if len(str) > maxLogLength { + str = str[0:1024] + " (truncated)" + } + totalLength += len(str) + if totalLength >= maxLogLength { + encoder.AppendString("The row has been truncated, and the log has exited early.") + return nil + } if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { enc.AddString("kind", kindStr[kind]) enc.AddString("val", redact.String(str)) @@ -250,7 +263,7 @@ func (row RowArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error return nil } -func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *model.ColumnInfo, err error) error { +func LogKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *model.ColumnInfo, err error) error { var original types.Datum if 0 <= j && j < len(row) { original = row[j] @@ -265,9 +278,16 @@ func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *mo log.ShortError(err), ) - logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()), - zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O), - zap.Int("columnID", j+1)) + if len(original.GetString()) >= maxLogLength { + originalPrefix := original.GetString()[0:1024] + " (truncated)" + logger.Error("failed to convert kv value", logutil.RedactAny("origVal", originalPrefix), + zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O), + zap.Int("columnID", j+1)) + } else { + logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()), + zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O), + zap.Int("columnID", j+1)) + } return errors.Annotatef( err, "failed to cast value as %s for column `%s` (#%d)", &colInfo.FieldType, colInfo.Name.O, j+1, @@ -381,7 +401,7 @@ func (kvcodec *tableKVEncoder) Encode( } value, err = kvcodec.getActualDatum(rowID, i, theDatum) if err != nil { - return nil, logKVConvertFailed(logger, row, j, col.ToInfo(), err) + return nil, LogKVConvertFailed(logger, row, j, col.ToInfo(), err) } record = append(record, value) @@ -412,7 +432,7 @@ func (kvcodec *tableKVEncoder) Encode( value, err = types.NewIntDatum(rowID), nil } if err != nil { - return nil, logKVConvertFailed(logger, row, j, ExtraHandleColumnInfo, err) + return nil, LogKVConvertFailed(logger, row, j, ExtraHandleColumnInfo, err) } record = append(record, value) alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType) diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index c08f63919ef77..d4ab801721921 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -17,7 +17,10 @@ package kv_test import ( "errors" "fmt" + "os" + "path/filepath" "reflect" + "strings" "testing" lkv "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -718,3 +721,37 @@ func BenchmarkSQL2KV(b *testing.B) { require.Equal(b, l, 2) } } + +func TestLogKVConvertFailed(t *testing.T) { + tempPath := filepath.Join(t.TempDir(), "/temp.txt") + logCfg := &log.Config{File: tempPath, FileMaxSize: 1} + err := log.InitLogger(logCfg, "info") + require.NoError(t, err) + + modelName := model.NewCIStr("c1") + modelState := model.StatePublic + modelFieldType := *types.NewFieldType(mysql.TypeTiny) + c1 := &model.ColumnInfo{ID: 1, Name: modelName, State: modelState, Offset: 0, FieldType: modelFieldType} + cols := []*model.ColumnInfo{c1} + tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} + _, err = tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + require.NoError(t, err) + + var newString strings.Builder + for i := 0; i < 100000; i++ { + newString.WriteString("test_test_test_test_") + } + newDatum := types.NewStringDatum(newString.String()) + rows := []types.Datum{} + for i := 0; i <= 10; i++ { + rows = append(rows, newDatum) + } + err = lkv.LogKVConvertFailed(log.L(), rows, 6, c1, err) + require.NoError(t, err) + + var content []byte + content, err = os.ReadFile(tempPath) + require.NoError(t, err) + require.LessOrEqual(t, 500, len(string(content))) + require.NotContains(t, content, "exceeds maximum file size") +}