diff --git a/pkg/lightning/backend/kv/sql2kv.go b/pkg/lightning/backend/kv/sql2kv.go index 2f66834c2..6466149d3 100644 --- a/pkg/lightning/backend/kv/sql2kv.go +++ b/pkg/lightning/backend/kv/sql2kv.go @@ -406,6 +406,14 @@ func (kvcodec *tableKVEncoder) Encode( return kvPairs(pairs), nil } +func (kvs kvPairs) Size() uint64 { + size := uint64(0) + for _, kv := range kvs { + size += uint64(len(kv.Key) + len(kv.Val)) + } + return size +} + func (kvs kvPairs) ClassifyAndAppend( data *Rows, dataChecksum *verification.KVChecksum, diff --git a/pkg/lightning/backend/kv/types.go b/pkg/lightning/backend/kv/types.go index 299f4a8cb..4ebf65f90 100644 --- a/pkg/lightning/backend/kv/types.go +++ b/pkg/lightning/backend/kv/types.go @@ -35,6 +35,9 @@ type Row interface { indices *Rows, indexChecksum *verification.KVChecksum, ) + + // Size represents the total kv size of this Row. + Size() uint64 } // Rows represents a collection of encoded rows. diff --git a/pkg/lightning/backend/noop/noop.go b/pkg/lightning/backend/noop/noop.go index 00e344b84..b026f4212 100644 --- a/pkg/lightning/backend/noop/noop.go +++ b/pkg/lightning/backend/noop/noop.go @@ -154,6 +154,10 @@ func (e noopEncoder) Encode(log.Logger, []types.Datum, int64, []int) (kv.Row, er type noopRow struct{} +func (r noopRow) Size() uint64 { + return 0 +} + func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) { } diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index 91ff5e00f..86ad569c5 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -93,6 +93,10 @@ func NewTiDBBackend(db *sql.DB, onDuplicate string) backend.Backend { return backend.MakeBackend(&tidbBackend{db: db, onDuplicate: onDuplicate}) } +func (row tidbRow) Size() uint64 { + return uint64(len(row)) +} + func (row tidbRow) ClassifyAndAppend(data *kv.Rows, checksum *verification.KVChecksum, _ *kv.Rows, _ *verification.KVChecksum) { rows := (*data).(tidbRows) // Cannot do `rows := data.(*tidbRows); *rows = append(*rows, row)`. diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 93a244c48..f18dd851f 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -2519,6 +2519,7 @@ func (cr *chunkRestore) encodeLoop( canDeliver := false kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt) var newOffset, rowID int64 + var kvSize uint64 outLoop: for !canDeliver { readDurStart := time.Now() @@ -2554,8 +2555,16 @@ func (cr *chunkRestore) encodeLoop( return } kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID}) - if len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset { + kvSize += kvs.Size() + failpoint.Inject("mock-kv-size", func(val failpoint.Value) { + kvSize += uint64(val.(int)) + }) + // pebble cannot allow > 4.0G kv in one batch. + // we will meet pebble panic when import sql file and each kv has the size larger than 4G / maxKvPairsCnt. + // so add this check. + if kvSize >= minDeliverBytes || len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset { canDeliver = true + kvSize = 0 } } encodeTotalDur += encodeDur diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index 61bce987b..c5249a9f1 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -1150,6 +1150,57 @@ func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) { c.Assert(kvsCh, HasLen, 0) } +func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) { + ctx := context.Background() + kvsCh := make(chan []deliveredKVs, 4) + deliverCompleteCh := make(chan deliverResult) + kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{ + SQLMode: s.cfg.TiDB.SQLMode, + Timestamp: 1234567898, + }) + c.Assert(err, IsNil) + + dir := c.MkDir() + fileName := "db.limit.000.csv" + err = ioutil.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3\r\n4,5,6\r\n7,8,9\r"), 0o644) + c.Assert(err, IsNil) + + store, err := storage.NewLocalStorage(dir) + c.Assert(err, IsNil) + cfg := config.NewConfig() + + reader, err := store.Open(ctx, fileName) + c.Assert(err, IsNil) + w := worker.NewPool(ctx, 1, "io") + p := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, false) + s.cr.parser = p + + rc := &Controller{pauser: DeliverPauser, cfg: cfg} + c.Assert(failpoint.Enable( + "github.com/pingcap/br/pkg/lightning/restore/mock-kv-size", "return(110000000)"), IsNil) + _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) + + // we have 3 kvs total. after the failpoint injected. + // we will send one kv each time. + count := 0 + for { + kvs, ok := <-kvsCh + if !ok { + break + } + count += 1 + if count <= 3 { + c.Assert(kvs, HasLen, 1) + } + if count == 4 { + // we will send empty kvs before encodeLoop exists + // so, we can receive 4 batch and 1 is empty + c.Assert(kvs, HasLen, 0) + break + } + } +} + func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) { ctx := context.Background() kvsCh := make(chan []deliveredKVs) diff --git a/tests/lightning_checkpoint_parquet/run.sh b/tests/lightning_checkpoint_parquet/run.sh index 9f1004095..31666bd3b 100755 --- a/tests/lightning_checkpoint_parquet/run.sh +++ b/tests/lightning_checkpoint_parquet/run.sh @@ -41,9 +41,9 @@ set +e run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null set -e run_sql 'SELECT count(*), sum(iVal) FROM `cppq_tsr`.tbl' -check_contains "count(*): 32" -# sum(0..31) -check_contains "sum(iVal): 496" +check_contains "count(*): 1" +# sum(0) +check_contains "sum(iVal): 0" # check chunk offset and update checkpoint current row id to a higher value so that # if parse read from start, the generated rows will be different