diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index 006ff7ea876ef..d9ad7497ca579 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -48,6 +48,7 @@ go_test( name = "kv_test", timeout = "short", srcs = [ + "base_test.go", "kv2sql_test.go", "session_internal_test.go", "session_test.go", @@ -56,7 +57,7 @@ go_test( embed = [":kv"], flaky = True, race = "on", - shard_count = 18, + shard_count = 19, deps = [ "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go index a9185bc48166a..dece47edc9788 100644 --- a/br/pkg/lightning/backend/kv/base.go +++ b/br/pkg/lightning/backend/kv/base.go @@ -37,6 +37,10 @@ import ( "go.uber.org/zap/zapcore" ) +const ( + maxLogLength = 512 * 1024 +) + // ExtraHandleColumnInfo is the column info of extra handle column. var ExtraHandleColumnInfo = model.NewExtraHandleColInfo() @@ -77,6 +81,7 @@ var kindStr = [...]string{ // MarshalLogArray implements the zapcore.ArrayMarshaler interface func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + var totalLength = 0 for _, datum := range row { kind := datum.Kind() var str string @@ -94,6 +99,14 @@ func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) erro return err } } + if len(str) > maxLogLength { + str = str[0:1024] + " (truncated)" + } + totalLength += len(str) + if totalLength >= maxLogLength { + encoder.AppendString("The row has been truncated, and the log has exited early.") + return nil + } if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { enc.AddString("kind", kindStr[kind]) enc.AddString("val", redact.String(str)) @@ -307,9 +320,16 @@ func (e *BaseKVEncoder) LogKVConvertFailed(row []types.Datum, j int, colInfo *mo log.ShortError(err), ) - e.logger.Error("failed to convert kv value", 
logutil.RedactAny("origVal", original.GetValue()), - zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O), - zap.Int("columnID", j+1)) + if len(original.GetString()) >= maxLogLength { + originalPrefix := original.GetString()[0:1024] + " (truncated)" + e.logger.Error("failed to convert kv value", logutil.RedactAny("origVal", originalPrefix), + zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O), + zap.Int("columnID", j+1)) + } else { + e.logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()), + zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O), + zap.Int("columnID", j+1)) + } return errors.Annotatef( err, "failed to cast value as %s for column `%s` (#%d)", &colInfo.FieldType, colInfo.Name.O, j+1, diff --git a/br/pkg/lightning/backend/kv/base_test.go b/br/pkg/lightning/backend/kv/base_test.go new file mode 100644 index 0000000000000..d02956e4016f0 --- /dev/null +++ b/br/pkg/lightning/backend/kv/base_test.go @@ -0,0 +1,75 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package kv
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
+	"github.com/pingcap/tidb/br/pkg/lightning/log"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/table/tables"
+	"github.com/pingcap/tidb/types"
+	"github.com/stretchr/testify/require"
+)
+
+// TestLogKVConvertFailed checks that logging a conversion failure for an
+// oversized value writes a bounded amount of log data.
+func TestLogKVConvertFailed(t *testing.T) {
+	tempPath := filepath.Join(t.TempDir(), "temp.txt")
+	logCfg := &log.Config{File: tempPath, FileMaxSize: 1}
+	err := log.InitLogger(logCfg, "info")
+	require.NoError(t, err)
+
+	modelName := model.NewCIStr("c1")
+	modelState := model.StatePublic
+	modelFieldType := *types.NewFieldType(mysql.TypeTiny)
+	c1 := &model.ColumnInfo{ID: 1, Name: modelName, State: modelState, Offset: 0, FieldType: modelFieldType}
+	cols := []*model.ColumnInfo{c1}
+	tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
+	var tbl table.Table
+	tbl, err = tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
+	require.NoError(t, err)
+
+	baseKVEncoder, err := NewBaseKVEncoder(&encode.EncodingConfig{
+		Table: tbl,
+		SessionOptions: encode.SessionOptions{
+			SQLMode:   mysql.ModeStrictAllTables,
+			Timestamp: 1234567890,
+		},
+		Logger: log.L(),
+	})
+	require.NoError(t, err)
+
+	// Each datum is about 2 MB, well above the 512 KiB maxLogLength cap.
+	newDatum := types.NewStringDatum(strings.Repeat("test_test_test_test_", 100000))
+	rows := make([]types.Datum, 0, 11)
+	for i := 0; i <= 10; i++ {
+		rows = append(rows, newDatum)
+	}
+	err = baseKVEncoder.LogKVConvertFailed(rows, 6, c1, err)
+	require.NoError(t, err)
+
+	content, err := os.ReadFile(tempPath)
+	require.NoError(t, err)
+	require.LessOrEqual(t, 500, len(content))
+	// testify treats a []byte as a list of elements, so the substring check needs a string.
+	require.NotContains(t, string(content), "exceeds maximum file size")
+}