From 05beea0674d5f4100de3ae80cb81948029eb65a3 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 4 Jun 2021 16:08:29 +0800 Subject: [PATCH] lightning: support restore data into tables that contains data (#784) --- pkg/lightning/backend/tidb/tidb.go | 108 ++- pkg/lightning/checkpoints/checkpoints.go | 63 +- .../checkpoints/checkpoints_file_test.go | 5 + .../checkpoints/checkpoints_sql_test.go | 15 +- .../checkpointspb/file_checkpoints.pb.go | 195 +++-- .../checkpointspb/file_checkpoints.proto | 4 +- pkg/lightning/common/util.go | 22 +- pkg/lightning/config/config.go | 17 +- pkg/lightning/restore/meta_manager.go | 809 ++++++++++++++++++ pkg/lightning/restore/meta_manager_test.go | 244 ++++++ pkg/lightning/restore/restore.go | 499 +++++++---- pkg/lightning/restore/restore_test.go | 18 +- pkg/pdutil/pd.go | 78 +- tests/lightning_checkpoint/run.sh | 2 +- tests/lightning_checkpoint_chunks/run.sh | 2 +- .../data/error_summary.c.sql | 2 +- tests/lightning_incremental/config.toml | 0 .../data/incr-schema-create.sql | 1 + .../data/incr.auto_random-schema.sql | 5 + .../data/incr.auto_random.sql | 5 + .../data/incr.pk_auto_inc-schema.sql | 4 + .../data/incr.pk_auto_inc.sql | 5 + .../data/incr.rowid_uk_inc-schema.sql | 4 + .../data/incr.rowid_uk_inc.sql | 5 + .../data/incr.uk_auto_inc-schema.sql | 4 + .../data/incr.uk_auto_inc.sql | 5 + .../data1/incr-schema-create.sql | 1 + .../data1/incr.auto_random-schema.sql | 5 + .../data1/incr.auto_random.sql | 5 + .../data1/incr.pk_auto_inc-schema.sql | 4 + .../data1/incr.pk_auto_inc.sql | 5 + .../data1/incr.rowid_uk_inc-schema.sql | 4 + .../data1/incr.rowid_uk_inc.sql | 5 + .../data1/incr.uk_auto_inc-schema.sql | 4 + .../data1/incr.uk_auto_inc.sql | 5 + tests/lightning_incremental/run.sh | 76 ++ tests/lightning_local_backend/run.sh | 2 +- tests/lightning_tidb_rowid/run.sh | 9 +- tidb-lightning.toml | 4 + 39 files changed, 1944 insertions(+), 306 deletions(-) create mode 100644 pkg/lightning/restore/meta_manager.go create mode 100644 pkg/lightning/restore/meta_manager_test.go create mode 100644 tests/lightning_incremental/config.toml create mode 100644 tests/lightning_incremental/data/incr-schema-create.sql create mode 100644 tests/lightning_incremental/data/incr.auto_random-schema.sql create mode 100644 tests/lightning_incremental/data/incr.auto_random.sql create mode 100644 tests/lightning_incremental/data/incr.pk_auto_inc-schema.sql create mode 100644 tests/lightning_incremental/data/incr.pk_auto_inc.sql create mode 100644 tests/lightning_incremental/data/incr.rowid_uk_inc-schema.sql create mode 100644 tests/lightning_incremental/data/incr.rowid_uk_inc.sql create mode 100644 tests/lightning_incremental/data/incr.uk_auto_inc-schema.sql create mode 100644 tests/lightning_incremental/data/incr.uk_auto_inc.sql create mode 100644 tests/lightning_incremental/data1/incr-schema-create.sql create mode 100644 tests/lightning_incremental/data1/incr.auto_random-schema.sql create mode 100644 tests/lightning_incremental/data1/incr.auto_random.sql create mode 100644 tests/lightning_incremental/data1/incr.pk_auto_inc-schema.sql create mode 100644 tests/lightning_incremental/data1/incr.pk_auto_inc.sql create mode 100644 tests/lightning_incremental/data1/incr.rowid_uk_inc-schema.sql create mode 100644 tests/lightning_incremental/data1/incr.rowid_uk_inc.sql create mode 100644 tests/lightning_incremental/data1/incr.uk_auto_inc-schema.sql create mode 100644 tests/lightning_incremental/data1/incr.uk_auto_inc.sql create mode 100644 tests/lightning_incremental/run.sh diff --git 
a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index 10818e707..b39a96f34 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -504,48 +504,23 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st if rows.Err() != nil { return rows.Err() } - // for version < v4.0.0 we can use `show table next_row_id` to fetch auto id info, so about should be enough + // shard_row_id/auto random is only available after tidb v4.0.0 + // `show table next_row_id` is also not available before tidb v4.0.0 if tidbVersion.Major < 4 { return nil } + // init auto id column for each table for _, tbl := range tables { tblName := common.UniqueTable(schemaName, tbl.Name.O) - rows, e = tx.Query(fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tblName)) - if e != nil { - return e + autoIDInfos, err := FetchTableAutoIDInfos(ctx, tx, tblName) + if err != nil { + return errors.Trace(err) } - for rows.Next() { - var ( - dbName, tblName, columnName, idType string - nextID int64 - ) - columns, err := rows.Columns() - if err != nil { - return err - } - - // +--------------+------------+-------------+--------------------+----------------+ - // | DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE | - // +--------------+------------+-------------+--------------------+----------------+ - // | testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT | - // +--------------+------------+-------------+--------------------+----------------+ - - // if columns length is 4, it doesn't contains the last column `ID_TYPE`, and it will always be 'AUTO_INCREMENT' - // for v4.0.0~v4.0.2 show table t next_row_id only returns 4 columns. - if len(columns) == 4 { - err = rows.Scan(&dbName, &tblName, &columnName, &nextID) - idType = "AUTO_INCREMENT" - } else { - err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType) - } - if err != nil { - return err - } - + for _, info := range autoIDInfos { for _, col := range tbl.Columns { - if col.Name.O == columnName { - switch idType { + if col.Name.O == info.Column { + switch info.Type { case "AUTO_INCREMENT": col.Flag |= mysql.AutoIncrementFlag case "AUTO_RANDOM": @@ -557,14 +532,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st } } } - // Defer in for-loop would be costly, anyway, we don't need those rows after this turn of iteration. 
- //nolint:sqlclosecheck - if err := rows.Close(); err != nil { - return errors.Trace(err) - } - if rows.Err() != nil { - return errors.Trace(rows.Err()) - } + } return nil }) @@ -607,3 +575,59 @@ func (w *Writer) Close(ctx context.Context) error { func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, arg1 uint64, rows kv.Rows) error { return w.be.WriteRows(ctx, w.engineUUID, tableName, columnNames, arg1, rows) } + +type TableAutoIDInfo struct { + Column string + NextID int64 + Type string +} + +func FetchTableAutoIDInfos(ctx context.Context, exec common.QueryExecutor, tableName string) ([]*TableAutoIDInfo, error) { + rows, e := exec.QueryContext(ctx, fmt.Sprintf("SHOW TABLE %s NEXT_ROW_ID", tableName)) + if e != nil { + return nil, errors.Trace(e) + } + var autoIDInfos []*TableAutoIDInfo + for rows.Next() { + var ( + dbName, tblName, columnName, idType string + nextID int64 + ) + columns, err := rows.Columns() + if err != nil { + return nil, errors.Trace(err) + } + + //+--------------+------------+-------------+--------------------+----------------+ + //| DB_NAME | TABLE_NAME | COLUMN_NAME | NEXT_GLOBAL_ROW_ID | ID_TYPE | + //+--------------+------------+-------------+--------------------+----------------+ + //| testsysbench | t | _tidb_rowid | 1 | AUTO_INCREMENT | + //+--------------+------------+-------------+--------------------+----------------+ + + // if columns length is 4, it doesn't contains the last column `ID_TYPE`, and it will always be 'AUTO_INCREMENT' + // for v4.0.0~v4.0.2 show table t next_row_id only returns 4 columns. + if len(columns) == 4 { + err = rows.Scan(&dbName, &tblName, &columnName, &nextID) + idType = "AUTO_INCREMENT" + } else { + err = rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType) + } + if err != nil { + return nil, errors.Trace(err) + } + autoIDInfos = append(autoIDInfos, &TableAutoIDInfo{ + Column: columnName, + NextID: nextID, + Type: idType, + }) + } + // Defer in for-loop would be costly, anyway, we don't need those rows after this turn of iteration. + //nolint:sqlclosecheck + if err := rows.Close(); err != nil { + return nil, errors.Trace(err) + } + if rows.Err() != nil { + return nil, errors.Trace(rows.Err()) + } + return autoIDInfos, nil +} diff --git a/pkg/lightning/checkpoints/checkpoints.go b/pkg/lightning/checkpoints/checkpoints.go index 70c9053bb..d412553e5 100644 --- a/pkg/lightning/checkpoints/checkpoints.go +++ b/pkg/lightning/checkpoints/checkpoints.go @@ -23,7 +23,6 @@ import ( "math" "os" "sort" - "strings" "sync" "github.com/joho/sqltocsv" @@ -63,7 +62,7 @@ const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. CheckpointTableNameTask = "task_v2" - CheckpointTableNameTable = "table_v6" + CheckpointTableNameTable = "table_v7" CheckpointTableNameEngine = "engine_v5" CheckpointTableNameChunk = "chunk_v5" @@ -99,6 +98,9 @@ const ( table_id bigint NOT NULL DEFAULT 0, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + kv_bytes bigint unsigned NOT NULL DEFAULT 0, + kv_kvs bigint unsigned NOT NULL DEFAULT 0, + kv_checksum bigint unsigned NOT NULL DEFAULT 0, INDEX(task_id) );` CreateEngineTableTemplate = ` @@ -154,7 +156,7 @@ const ( FROM %s.%s WHERE table_name = ? 
ORDER BY engine_id, path, offset;` ReadTableRemainTemplate = ` - SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;` + SELECT status, alloc_base, table_id, kv_bytes, kv_kvs, kv_checksum FROM %s.%s WHERE table_name = ?;` ReplaceEngineTemplate = ` REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);` ReplaceChunkTemplate = ` @@ -176,7 +178,8 @@ const ( UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;` UpdateTableStatusTemplate = ` UPDATE %s.%s SET status = ? WHERE table_name = ?;` - UpdateEngineTemplate = ` + UpdateTableChecksumTemplate = `UPDATE %s.%s SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;` + UpdateEngineTemplate = ` UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);` DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;" ) @@ -278,6 +281,8 @@ type TableCheckpoint struct { AllocBase int64 Engines map[int32]*EngineCheckpoint TableID int64 + // remote checksum before restore + Checksum verify.KVChecksum } func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint { @@ -290,6 +295,7 @@ func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint { AllocBase: cp.AllocBase, Engines: engines, TableID: cp.TableID, + Checksum: cp.Checksum, } } @@ -315,11 +321,13 @@ type engineCheckpointDiff struct { } type TableCheckpointDiff struct { - hasStatus bool - hasRebase bool - status CheckpointStatus - allocBase int64 - engines map[int32]engineCheckpointDiff + hasStatus bool + hasRebase bool + hasChecksum bool + status CheckpointStatus + allocBase int64 + engines map[int32]engineCheckpointDiff + checksum verify.KVChecksum } func NewTableCheckpointDiff() *TableCheckpointDiff { @@ -438,6 +446,15 @@ func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { }) } +type TableChecksumMerger struct { + Checksum verify.KVChecksum +} + +func (m *TableChecksumMerger) MergeInto(cpd *TableCheckpointDiff) { + cpd.hasChecksum = true + cpd.checksum = m.Checksum +} + type RebaseCheckpointMerger struct { AllocBase int64 } @@ -591,10 +608,7 @@ type MySQLCheckpointsDB struct { } func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error) { - var escapedSchemaName strings.Builder - common.WriteMySQLIdentifier(&escapedSchemaName, schemaName) - schema := escapedSchemaName.String() - + schema := common.EscapeIdentifier(schemaName) sql := common.SQLWithRetry{ DB: db, Logger: log.With(zap.String("schema", schemaName)), @@ -780,12 +794,13 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab tableRow := tx.QueryRowContext(c, tableQuery, tableName) var status uint8 - if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID); err != nil { + var kvs, bytes, checksum uint64 + if err := tableRow.Scan(&status, &cp.AllocBase, &cp.TableID, &bytes, &kvs, &checksum); err != nil { if err == sql.ErrNoRows { return errors.NotFoundf("checkpoint for table %s", tableName) } - return errors.Trace(err) } + cp.Checksum = verify.MakeKVChecksum(bytes, kvs, checksum) cp.Status = CheckpointStatus(status) return nil }) @@ -849,6 +864,7 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi chunkQuery := fmt.Sprintf(UpdateChunkTemplate, cpdb.schema, CheckpointTableNameChunk) rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, cpdb.schema, CheckpointTableNameTable) tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, cpdb.schema, CheckpointTableNameTable) + tableChecksumQuery := 
fmt.Sprintf(UpdateTableChecksumTemplate, cpdb.schema, CheckpointTableNameTable) engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, cpdb.schema, CheckpointTableNameEngine) s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()} @@ -868,12 +884,16 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi return errors.Trace(e) } defer tableStatusStmt.Close() + tableChecksumStmt, e := tx.PrepareContext(c, tableChecksumQuery) + if e != nil { + return errors.Trace(e) + } + defer tableChecksumStmt.Close() engineStatusStmt, e := tx.PrepareContext(c, engineStatusQuery) if e != nil { return errors.Trace(e) } defer engineStatusStmt.Close() - for tableName, cpd := range checkpointDiffs { if cpd.hasStatus { if _, e := tableStatusStmt.ExecContext(c, cpd.status, tableName); e != nil { @@ -885,6 +905,11 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi return errors.Trace(e) } } + if cpd.hasChecksum { + if _, e := tableChecksumStmt.ExecContext(c, cpd.checksum.SumSize(), cpd.checksum.SumKVS(), cpd.checksum.Sum(), tableName); e != nil { + return errors.Trace(e) + } + } for engineID, engineDiff := range cpd.engines { if engineDiff.hasStatus { if _, e := engineStatusStmt.ExecContext(c, engineDiff.status, tableName, engineID); e != nil { @@ -1054,6 +1079,7 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC AllocBase: tableModel.AllocBase, Engines: make(map[int32]*EngineCheckpoint, len(tableModel.Engines)), TableID: tableModel.TableID, + Checksum: verify.MakeKVChecksum(tableModel.KvBytes, tableModel.KvKvs, tableModel.KvChecksum), } for engineID, engineModel := range tableModel.Engines { @@ -1152,6 +1178,11 @@ func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoin if cpd.hasRebase { tableModel.AllocBase = cpd.allocBase } + if cpd.hasChecksum { + tableModel.KvBytes = cpd.checksum.SumSize() + tableModel.KvKvs = cpd.checksum.SumKVS() + tableModel.KvChecksum = cpd.checksum.Sum() + } for engineID, engineDiff := range cpd.engines { engineModel := tableModel.Engines[engineID] if engineDiff.hasStatus { diff --git a/pkg/lightning/checkpoints/checkpoints_file_test.go b/pkg/lightning/checkpoints/checkpoints_file_test.go index 32dfc3647..a3df9c35b 100644 --- a/pkg/lightning/checkpoints/checkpoints_file_test.go +++ b/pkg/lightning/checkpoints/checkpoints_file_test.go @@ -117,6 +117,10 @@ func (s *cpFileSuite) SetUpTest(c *C) { AllocBase: 132861, } rcm.MergeInto(cpd) + cksum := checkpoints.TableChecksumMerger{ + Checksum: verification.MakeKVChecksum(4492, 686, 486070148910), + } + cksum.MergeInto(cpd) ccm := checkpoints.ChunkCheckpointMerger{ EngineID: 0, Key: checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0}, @@ -158,6 +162,7 @@ func (s *cpFileSuite) TestGet(c *C) { c.Assert(cp, DeepEquals, &checkpoints.TableCheckpoint{ Status: checkpoints.CheckpointStatusAllWritten, AllocBase: 132861, + Checksum: verification.MakeKVChecksum(4492, 686, 486070148910), Engines: map[int32]*checkpoints.EngineCheckpoint{ -1: { Status: checkpoints.CheckpointStatusLoaded, diff --git a/pkg/lightning/checkpoints/checkpoints_sql_test.go b/pkg/lightning/checkpoints/checkpoints_sql_test.go index a9d18aa0b..146d1dffa 100644 --- a/pkg/lightning/checkpoints/checkpoints_sql_test.go +++ b/pkg/lightning/checkpoints/checkpoints_sql_test.go @@ -175,6 +175,10 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) { AllocBase: 132861, } rcm.MergeInto(cpd) + cksum := checkpoints.TableChecksumMerger{ + Checksum: 
verification.MakeKVChecksum(4492, 686, 486070148910), + } + cksum.MergeInto(cpd) ccm := checkpoints.ChunkCheckpointMerger{ EngineID: 0, Key: checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0}, @@ -208,6 +212,12 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) { ExpectExec(). WithArgs(60, "`db1`.`t2`"). WillReturnResult(sqlmock.NewResult(14, 1)) + s.mock. + ExpectPrepare("UPDATE `mock-schema`\\.table_v\\d+ SET kv_bytes = .+"). + ExpectExec(). + WithArgs(4492, 686, 486070148910, "`db1`.`t2`"). + WillReturnResult(sqlmock.NewResult(15, 1)) + s.mock.ExpectCommit() s.mock.MatchExpectationsInOrder(false) @@ -245,8 +255,8 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) { ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+"). WithArgs("`db1`.`t2`"). WillReturnRows( - sqlmock.NewRows([]string{"status", "alloc_base", "table_id"}). - AddRow(60, 132861, int64(2)), + sqlmock.NewRows([]string{"status", "alloc_base", "table_id", "kv_bytes", "kv_kvs", "kv_checksum"}). + AddRow(60, 132861, int64(2), uint64(4492), uint64(686), uint64(486070148910)), ) s.mock.ExpectCommit() @@ -282,6 +292,7 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) { }}, }, }, + Checksum: verification.MakeKVChecksum(4492, 686, 486070148910), }) c.Assert(s.mock.ExpectationsWereMet(), IsNil) } diff --git a/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go b/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go index 10d9d5539..523a01fd2 100644 --- a/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go +++ b/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.pb.go @@ -109,11 +109,14 @@ func (m *TaskCheckpointModel) XXX_DiscardUnknown() { var xxx_messageInfo_TaskCheckpointModel proto.InternalMessageInfo type TableCheckpointModel struct { - Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` - Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` - AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` - Engines map[int32]*EngineCheckpointModel `protobuf:"bytes,8,rep,name=engines,proto3" json:"engines,omitempty" protobuf_key:"zigzag32,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - TableID int64 `protobuf:"varint,9,opt,name=tableID,proto3" json:"tableID,omitempty"` + Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` + Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` + AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` + Engines map[int32]*EngineCheckpointModel `protobuf:"bytes,8,rep,name=engines,proto3" json:"engines,omitempty" protobuf_key:"zigzag32,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + TableID int64 `protobuf:"varint,9,opt,name=tableID,proto3" json:"tableID,omitempty"` + KvBytes uint64 `protobuf:"varint,10,opt,name=kv_bytes,json=kvBytes,proto3" json:"kv_bytes,omitempty"` + KvKvs uint64 `protobuf:"varint,11,opt,name=kv_kvs,json=kvKvs,proto3" json:"kv_kvs,omitempty"` + KvChecksum uint64 `protobuf:"fixed64,12,opt,name=kv_checksum,json=kvChecksum,proto3" json:"kv_checksum,omitempty"` } func (m *TableCheckpointModel) Reset() { *m = TableCheckpointModel{} } @@ -255,59 +258,62 @@ func init() { } var fileDescriptor_c57c7b77a714394c = []byte{ - // 829 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x41, 0x6f, 0x1b, 0x45, - 0x14, 0xce, 0x7a, 
0x63, 0xc7, 0x1e, 0xdb, 0xa9, 0x33, 0xa4, 0x65, 0x08, 0x60, 0x19, 0x97, 0x83, - 0xa5, 0x52, 0x5b, 0x2a, 0x17, 0x54, 0x01, 0x12, 0x49, 0x2a, 0x51, 0x45, 0x15, 0xd1, 0x50, 0x38, - 0x70, 0x59, 0xcd, 0xee, 0x4c, 0xec, 0xd5, 0x7a, 0x77, 0x56, 0x33, 0xb3, 0x4b, 0xdd, 0xff, 0x80, - 0xc4, 0xcf, 0xe0, 0x4f, 0x70, 0xaf, 0x38, 0xf5, 0xc8, 0x11, 0x92, 0x3b, 0xbf, 0x01, 0xcd, 0x9b, - 0x75, 0xbc, 0xae, 0xac, 0xa8, 0xb7, 0xf7, 0xbe, 0xf7, 0xbd, 0x6f, 0xde, 0x7b, 0xfb, 0x66, 0x16, - 0x7d, 0x9b, 0x27, 0xf3, 0xd9, 0x32, 0x9e, 0x2f, 0x4c, 0x16, 0x67, 0xf3, 0x59, 0xb4, 0x10, 0x51, - 0x92, 0xcb, 0x38, 0x33, 0xba, 0x6e, 0xe7, 0xe1, 0xec, 0x2a, 0x5e, 0x8a, 0xa0, 0x06, 0x4d, 0x73, - 0x25, 0x8d, 0x3c, 0x79, 0x3c, 0x8f, 0xcd, 0xa2, 0x08, 0xa7, 0x91, 0x4c, 0x67, 0x73, 0x39, 0x97, - 0x33, 0x80, 0xc3, 0xe2, 0x0a, 0x3c, 0x70, 0xc0, 0x72, 0xf4, 0xf1, 0x7f, 0x1e, 0x1a, 0x9c, 0x6d, - 0x44, 0x5e, 0x48, 0x2e, 0x96, 0xf8, 0x1c, 0x75, 0x6b, 0xc2, 0xc4, 0x1b, 0xf9, 0x93, 0xee, 0x93, - 0xf1, 0xf4, 0x5d, 0x5e, 0x1d, 0x78, 0x96, 0x19, 0xb5, 0xa2, 0xf5, 0x34, 0xfc, 0x0d, 0xba, 0x67, - 0x98, 0x4e, 0x6a, 0x35, 0x92, 0xc6, 0xc8, 0x9b, 0x74, 0x9f, 0x1c, 0x4f, 0x5f, 0x32, 0x9d, 0x6c, - 0x92, 0x41, 0x8c, 0x1e, 0x9a, 0x2d, 0xf0, 0xe4, 0xa7, 0xad, 0xc2, 0x40, 0x1f, 0x0f, 0x90, 0x9f, - 0x88, 0x15, 0xf1, 0x46, 0xde, 0xa4, 0x43, 0xad, 0x89, 0x1f, 0xa1, 0x66, 0xc9, 0x96, 0x85, 0xa8, - 0xa4, 0xef, 0x4f, 0x5f, 0xb2, 0x70, 0x29, 0xde, 0xd5, 0x76, 0x9c, 0xa7, 0x8d, 0xaf, 0xbc, 0xf1, - 0x1f, 0x0d, 0xf4, 0xc1, 0x8e, 0xe3, 0xf1, 0x87, 0xe8, 0x00, 0xaa, 0x8d, 0x39, 0xc8, 0xfb, 0xb4, - 0x65, 0xdd, 0xe7, 0x1c, 0x7f, 0x8a, 0x90, 0x96, 0x85, 0x8a, 0x44, 0xc0, 0x63, 0x05, 0xc7, 0x74, - 0x68, 0xc7, 0x21, 0xe7, 0xb1, 0xc2, 0x04, 0x1d, 0x84, 0x2c, 0x4a, 0x44, 0xc6, 0x89, 0x0f, 0xb1, - 0xb5, 0x8b, 0x1f, 0xa2, 0x7e, 0x9c, 0xe6, 0x52, 0x19, 0xa1, 0x02, 0xc6, 0xb9, 0x22, 0xfb, 0x10, - 0xef, 0xad, 0xc1, 0xef, 0x38, 0x57, 0xf8, 0x63, 0xd4, 0x31, 0x31, 0x0f, 0x83, 0x85, 0xd4, 0x86, - 0x34, 0x81, 0xd0, 0xb6, 0xc0, 0xf7, 0x52, 0x9b, 0xdb, 0xa0, 0xe5, 0x93, 0xd6, 0xc8, 0x9b, 0x34, - 0x5d, 0xf0, 0x52, 0x2a, 0x63, 0x0b, 0xce, 0xb9, 0x13, 0x3e, 0x80, 0xbc, 0x56, 0xce, 0x41, 0x72, - 0x8c, 0xfa, 0xda, 0x1e, 0xc0, 0x83, 0xa4, 0x84, 0x9a, 0xdb, 0x10, 0xee, 0x3a, 0xf0, 0xa2, 0xb4, - 0x55, 0x3f, 0x44, 0xfd, 0xdb, 0x1d, 0x0b, 0x4a, 0xa1, 0x48, 0xc7, 0xd5, 0x76, 0x0b, 0xfe, 0x2c, - 0xd4, 0xf8, 0xb7, 0x06, 0x3a, 0xde, 0x35, 0x4e, 0x8c, 0xd1, 0xfe, 0x82, 0xe9, 0x05, 0x0c, 0xaa, - 0x47, 0xc1, 0xc6, 0x0f, 0x50, 0x4b, 0x1b, 0x66, 0x0a, 0x0d, 0x63, 0xe8, 0xd3, 0xca, 0xb3, 0xe3, - 0x63, 0xcb, 0xa5, 0x8c, 0x82, 0x90, 0x69, 0x01, 0x23, 0xf0, 0x69, 0x07, 0x90, 0x53, 0xa6, 0x05, - 0xfe, 0x1a, 0x1d, 0x88, 0x6c, 0x1e, 0x67, 0x42, 0x93, 0x76, 0xb5, 0x66, 0xbb, 0x8e, 0x9c, 0x3e, - 0x73, 0x24, 0xb7, 0x66, 0xeb, 0x14, 0x3b, 0x7c, 0x63, 0xd9, 0xcf, 0xcf, 0xa1, 0x01, 0x9f, 0xae, - 0xdd, 0x13, 0x8a, 0x7a, 0xf5, 0x94, 0xfa, 0xe6, 0x1c, 0xb9, 0xcd, 0xf9, 0x62, 0x7b, 0x73, 0x1e, - 0x54, 0x47, 0xdc, 0xb1, 0x3a, 0x7f, 0x7a, 0xe8, 0xfe, 0x4e, 0x52, 0xad, 0x79, 0x6f, 0xab, 0xf9, - 0xa7, 0xa8, 0x15, 0x2d, 0x8a, 0x2c, 0xd1, 0xa4, 0x51, 0x35, 0xb7, 0x33, 0x7f, 0x7a, 0x06, 0x24, - 0xd7, 0x5c, 0x95, 0x71, 0x72, 0x89, 0xba, 0x35, 0xf8, 0x7d, 0x56, 0x1f, 0xe8, 0x77, 0xd4, 0xff, - 0x97, 0x8f, 0x8e, 0x77, 0x71, 0xec, 0xf7, 0xcc, 0x99, 0x59, 0x54, 0xe2, 0x60, 0xdb, 0x96, 0xe4, - 0xd5, 0x95, 0x16, 0xee, 0xd2, 0xfa, 0xb4, 0xf2, 0xf0, 0x63, 0x84, 0x23, 0xb9, 0x2c, 0xd2, 0x2c, - 0xc8, 0x85, 0x4a, 0x0b, 0xc3, 0x4c, 0x2c, 0x33, 0xd2, 0x1b, 0xf9, 0x93, 0x26, 0x3d, 0x72, 0x91, - 0xcb, 0x4d, 0xc0, 0x7e, 0x7e, 0x91, 0xf1, 
0xa0, 0x92, 0x6a, 0xba, 0xcf, 0x2f, 0x32, 0xfe, 0x83, - 0x53, 0x1b, 0x20, 0x3f, 0x97, 0x1a, 0x76, 0xdb, 0xa7, 0xd6, 0xc4, 0x9f, 0xa3, 0xc3, 0x5c, 0x89, - 0x32, 0x50, 0xf2, 0xd7, 0x98, 0x07, 0x29, 0x7b, 0x05, 0xdb, 0xed, 0xd3, 0x9e, 0x45, 0xa9, 0x05, - 0x5f, 0xb0, 0x57, 0xf6, 0x66, 0x6c, 0x08, 0x6d, 0x20, 0xb4, 0x55, 0x2d, 0x98, 0x94, 0x51, 0x10, - 0xae, 0x8c, 0xd0, 0xb0, 0x17, 0xfb, 0xb4, 0x9d, 0x94, 0xd1, 0xa9, 0xf5, 0xed, 0xb5, 0xb1, 0xc1, - 0xa4, 0xd4, 0x04, 0x41, 0xa8, 0x95, 0x94, 0xd1, 0x45, 0xa9, 0xf1, 0x67, 0xa8, 0x67, 0x03, 0xf0, - 0x5a, 0xe9, 0x22, 0x25, 0xdd, 0x91, 0x37, 0x69, 0xd1, 0x6e, 0x52, 0x46, 0x67, 0x15, 0x84, 0x3f, - 0xb1, 0xf7, 0x31, 0x15, 0xda, 0xb0, 0x34, 0x27, 0xfd, 0x91, 0x37, 0x19, 0xd0, 0x0d, 0x60, 0xa7, - 0x68, 0x56, 0xb9, 0x20, 0x87, 0x70, 0x51, 0xc1, 0xc6, 0x23, 0xd4, 0x8d, 0x64, 0x9a, 0x2b, 0xa1, - 0xb5, 0x1d, 0xd3, 0x3d, 0x08, 0xd5, 0x21, 0xfc, 0x11, 0x6a, 0xdb, 0x8b, 0x19, 0xd8, 0x8f, 0x3b, - 0x70, 0x0f, 0x88, 0xf5, 0x2f, 0xc4, 0xca, 0xf6, 0x01, 0x8f, 0xbc, 0x8e, 0x5f, 0x0b, 0x72, 0xe4, - 0x9a, 0xb4, 0xc0, 0x8f, 0xf1, 0x6b, 0x71, 0xfa, 0xe8, 0xcd, 0xbf, 0xc3, 0xbd, 0x37, 0xd7, 0x43, - 0xef, 0xed, 0xf5, 0xd0, 0xfb, 0xe7, 0x7a, 0xe8, 0xfd, 0x7e, 0x33, 0xdc, 0x7b, 0x7b, 0x33, 0xdc, - 0xfb, 0xfb, 0x66, 0xb8, 0xf7, 0x4b, 0x7f, 0xeb, 0x5f, 0x11, 0xb6, 0xe0, 0xb1, 0xff, 0xf2, 0xff, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x7b, 0xd6, 0xd3, 0x15, 0x5d, 0x06, 0x00, 0x00, + // 870 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcd, 0x6e, 0xdb, 0x46, + 0x10, 0x36, 0x45, 0x8b, 0x92, 0x96, 0x92, 0x23, 0x6f, 0xed, 0x64, 0xeb, 0xb6, 0x2a, 0xab, 0xf4, + 0x20, 0x20, 0x8d, 0x04, 0xa4, 0x97, 0x22, 0x68, 0x0b, 0xd4, 0x76, 0x80, 0x06, 0x46, 0x50, 0x63, + 0x9b, 0xf6, 0xd0, 0x0b, 0xc1, 0x9f, 0xb5, 0x44, 0xac, 0xc8, 0x25, 0xb8, 0x4b, 0x36, 0xca, 0x53, + 0xf4, 0x31, 0xfa, 0x12, 0xbd, 0x07, 0x3d, 0xe5, 0xd8, 0x63, 0x6a, 0xdf, 0xfb, 0x0c, 0xc5, 0xce, + 0x52, 0x16, 0x15, 0x08, 0x46, 0x6e, 0x33, 0xdf, 0x7c, 0x3b, 0x3b, 0xfb, 0xe9, 0x1b, 0x11, 0x7d, + 0x9f, 0xf3, 0xf9, 0x6c, 0x99, 0xcc, 0x17, 0x2a, 0x4b, 0xb2, 0xf9, 0x2c, 0x5a, 0xb0, 0x88, 0xe7, + 0x22, 0xc9, 0x94, 0x6c, 0xc6, 0x79, 0x38, 0xbb, 0x4a, 0x96, 0xcc, 0x6f, 0x40, 0xd3, 0xbc, 0x10, + 0x4a, 0x9c, 0x3c, 0x9e, 0x27, 0x6a, 0x51, 0x86, 0xd3, 0x48, 0xa4, 0xb3, 0xb9, 0x98, 0x8b, 0x19, + 0xc0, 0x61, 0x79, 0x05, 0x19, 0x24, 0x10, 0x19, 0xfa, 0xf8, 0x3f, 0x0b, 0x0d, 0xcf, 0x36, 0x4d, + 0x5e, 0x88, 0x98, 0x2d, 0xf1, 0x39, 0x72, 0x1b, 0x8d, 0x89, 0xe5, 0xd9, 0x13, 0xf7, 0xc9, 0x78, + 0xfa, 0x3e, 0xaf, 0x09, 0x3c, 0xcb, 0x54, 0xb1, 0xa2, 0xcd, 0x63, 0xf8, 0x3b, 0x74, 0x4f, 0x05, + 0x92, 0x37, 0x66, 0x24, 0x2d, 0xcf, 0x9a, 0xb8, 0x4f, 0x8e, 0xa6, 0x2f, 0x03, 0xc9, 0x37, 0x87, + 0xa1, 0x19, 0x3d, 0x50, 0x5b, 0xe0, 0xc9, 0x2f, 0x5b, 0x83, 0x41, 0x7f, 0x3c, 0x44, 0x36, 0x67, + 0x2b, 0x62, 0x79, 0xd6, 0xa4, 0x47, 0x75, 0x88, 0x1f, 0xa1, 0x76, 0x15, 0x2c, 0x4b, 0x56, 0xb7, + 0x3e, 0x9e, 0xbe, 0x0c, 0xc2, 0x25, 0x7b, 0xbf, 0xb7, 0xe1, 0x3c, 0x6d, 0x7d, 0x63, 0x8d, 0xff, + 0x6c, 0xa1, 0x8f, 0x76, 0x5c, 0x8f, 0x1f, 0xa0, 0x0e, 0x4c, 0x9b, 0xc4, 0xd0, 0xde, 0xa6, 0x8e, + 0x4e, 0x9f, 0xc7, 0xf8, 0x33, 0x84, 0xa4, 0x28, 0x8b, 0x88, 0xf9, 0x71, 0x52, 0xc0, 0x35, 0x3d, + 0xda, 0x33, 0xc8, 0x79, 0x52, 0x60, 0x82, 0x3a, 0x61, 0x10, 0x71, 0x96, 0xc5, 0xc4, 0x86, 0xda, + 0x3a, 0xc5, 0x0f, 0xd1, 0x20, 0x49, 0x73, 0x51, 0x28, 0x56, 0xf8, 0x41, 0x1c, 0x17, 0x64, 0x1f, + 0xea, 0xfd, 0x35, 0xf8, 0x43, 0x1c, 0x17, 0xf8, 0x13, 0xd4, 0x53, 0x49, 0x1c, 0xfa, 0x0b, 0x21, + 0x15, 0x69, 0x03, 0xa1, 0xab, 0x81, 
0x1f, 0x85, 0x54, 0xb7, 0x45, 0xcd, 0x27, 0x8e, 0x67, 0x4d, + 0xda, 0xa6, 0x78, 0x29, 0x0a, 0xa5, 0x07, 0xce, 0x63, 0xd3, 0xb8, 0x03, 0xe7, 0x9c, 0x3c, 0x86, + 0x96, 0x63, 0x34, 0x90, 0xfa, 0x82, 0xd8, 0xe7, 0x15, 0xcc, 0xdc, 0x85, 0xb2, 0x6b, 0xc0, 0x8b, + 0x4a, 0x4f, 0xfd, 0x10, 0x0d, 0x6e, 0x3d, 0xe6, 0x57, 0xac, 0x20, 0x3d, 0x33, 0xdb, 0x2d, 0xf8, + 0x2b, 0x2b, 0xc6, 0xef, 0x5a, 0xe8, 0x68, 0x97, 0x9c, 0x18, 0xa3, 0xfd, 0x45, 0x20, 0x17, 0x20, + 0x54, 0x9f, 0x42, 0x8c, 0xef, 0x23, 0x47, 0xaa, 0x40, 0x95, 0x12, 0x64, 0x18, 0xd0, 0x3a, 0xd3, + 0xf2, 0x05, 0xcb, 0xa5, 0x88, 0xfc, 0x30, 0x90, 0x0c, 0x24, 0xb0, 0x69, 0x0f, 0x90, 0xd3, 0x40, + 0x32, 0xfc, 0x2d, 0xea, 0xb0, 0x6c, 0x9e, 0x64, 0x4c, 0x92, 0x6e, 0x6d, 0xb3, 0x5d, 0x57, 0x4e, + 0x9f, 0x19, 0x92, 0xb1, 0xd9, 0xfa, 0x88, 0x16, 0x5f, 0x69, 0xf6, 0xf3, 0x73, 0x78, 0x80, 0x4d, + 0xd7, 0x29, 0xfe, 0x18, 0x75, 0x79, 0xe5, 0x87, 0x2b, 0xc5, 0x24, 0x41, 0x9e, 0x35, 0xd9, 0xa7, + 0x1d, 0x5e, 0x9d, 0xea, 0x14, 0x1f, 0x23, 0x87, 0x57, 0x3e, 0xaf, 0x24, 0x71, 0xa1, 0xd0, 0xe6, + 0xd5, 0x45, 0x25, 0xf1, 0xe7, 0xc8, 0xe5, 0x95, 0x31, 0xab, 0x2c, 0x53, 0xd2, 0xf7, 0xac, 0x89, + 0x43, 0x11, 0xaf, 0xce, 0x6a, 0xe4, 0x84, 0xa2, 0x7e, 0x73, 0x8a, 0xa6, 0x19, 0x0f, 0x8d, 0x19, + 0xbf, 0xda, 0x36, 0xe3, 0xfd, 0x7a, 0xea, 0x3b, 0xdc, 0xf8, 0x97, 0x85, 0x8e, 0x77, 0x92, 0x1a, + 0x7a, 0x5a, 0x5b, 0x7a, 0x3e, 0x45, 0x4e, 0xb4, 0x28, 0x33, 0x2e, 0x49, 0xab, 0xd6, 0x6b, 0xe7, + 0xf9, 0xe9, 0x19, 0x90, 0x8c, 0x5e, 0xf5, 0x89, 0x93, 0x4b, 0xe4, 0x36, 0xe0, 0x0f, 0xd9, 0x26, + 0xa0, 0xdf, 0x31, 0xff, 0xdf, 0x36, 0x3a, 0xda, 0xc5, 0xd1, 0x16, 0xc9, 0x03, 0xb5, 0xa8, 0x9b, + 0x43, 0xac, 0x9f, 0x24, 0xae, 0xae, 0x24, 0x33, 0xff, 0x03, 0x36, 0xad, 0x33, 0xfc, 0x18, 0xe1, + 0x48, 0x2c, 0xcb, 0x34, 0xf3, 0x73, 0x56, 0xa4, 0xa5, 0x0a, 0x54, 0x22, 0x32, 0xd2, 0xf7, 0xec, + 0x49, 0x9b, 0x1e, 0x9a, 0xca, 0xe5, 0xa6, 0xa0, 0x1d, 0xc5, 0xb2, 0xd8, 0xaf, 0x5b, 0xb5, 0x8d, + 0xa3, 0x58, 0x16, 0xff, 0x64, 0xba, 0x0d, 0x91, 0x9d, 0x0b, 0x09, 0xeb, 0x62, 0x53, 0x1d, 0xe2, + 0x2f, 0xd1, 0x41, 0x5e, 0xb0, 0xca, 0x2f, 0xc4, 0xef, 0x49, 0xec, 0xa7, 0xc1, 0x2b, 0x58, 0x18, + 0x9b, 0xf6, 0x35, 0x4a, 0x35, 0xf8, 0x22, 0x78, 0xa5, 0x97, 0x6d, 0x43, 0xe8, 0x02, 0xa1, 0x5b, + 0x34, 0x8a, 0xbc, 0x8a, 0x6a, 0x3f, 0xf5, 0xc0, 0x36, 0x5d, 0x5e, 0x45, 0xc6, 0x50, 0x0f, 0x50, + 0x47, 0x17, 0xb5, 0xa3, 0x8c, 0xd5, 0x1c, 0x5e, 0x45, 0xda, 0x52, 0x5f, 0xa0, 0xbe, 0x2e, 0xdc, + 0x7a, 0xca, 0x05, 0x4f, 0xb9, 0xbc, 0x8a, 0xd6, 0xa6, 0xc2, 0x9f, 0xea, 0x15, 0x4f, 0x99, 0x54, + 0x41, 0x9a, 0x93, 0x81, 0x67, 0x4d, 0x86, 0x74, 0x03, 0x68, 0x15, 0xd5, 0x2a, 0x67, 0xe4, 0x00, + 0x76, 0x1f, 0x62, 0xec, 0x21, 0x37, 0x12, 0x69, 0x5e, 0x30, 0x29, 0xb5, 0x4c, 0xf7, 0xa0, 0xd4, + 0x84, 0xb4, 0xf7, 0xf5, 0xae, 0xfb, 0xfa, 0xc7, 0x1d, 0x9a, 0xff, 0x24, 0x9d, 0x5f, 0xb0, 0x95, + 0x7e, 0x07, 0x7c, 0x37, 0x64, 0xf2, 0x9a, 0x91, 0x43, 0xf3, 0x48, 0x0d, 0xfc, 0x9c, 0xbc, 0x66, + 0xa7, 0x8f, 0xde, 0xfc, 0x3b, 0xda, 0x7b, 0x73, 0x3d, 0xb2, 0xde, 0x5e, 0x8f, 0xac, 0x77, 0xd7, + 0x23, 0xeb, 0x8f, 0x9b, 0xd1, 0xde, 0xdb, 0x9b, 0xd1, 0xde, 0x3f, 0x37, 0xa3, 0xbd, 0xdf, 0x06, + 0x5b, 0x9f, 0x9f, 0xd0, 0x81, 0xef, 0xc7, 0xd7, 0xff, 0x07, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xff, + 0xf2, 0x75, 0xb0, 0x06, 0x00, 0x00, } func (m *CheckpointsModel) Marshal() (dAtA []byte, err error) { @@ -473,6 +479,22 @@ func (m *TableCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.KvChecksum != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(m.KvChecksum)) + i-- + dAtA[i] = 0x61 + 
} + if m.KvKvs != 0 { + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.KvKvs)) + i-- + dAtA[i] = 0x58 + } + if m.KvBytes != 0 { + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.KvBytes)) + i-- + dAtA[i] = 0x50 + } if m.TableID != 0 { i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.TableID)) i-- @@ -810,6 +832,15 @@ func (m *TableCheckpointModel) Size() (n int) { if m.TableID != 0 { n += 1 + sovFileCheckpoints(uint64(m.TableID)) } + if m.KvBytes != 0 { + n += 1 + sovFileCheckpoints(uint64(m.KvBytes)) + } + if m.KvKvs != 0 { + n += 1 + sovFileCheckpoints(uint64(m.KvKvs)) + } + if m.KvChecksum != 0 { + n += 9 + } return n } @@ -1675,6 +1706,54 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { break } } + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field KvBytes", wireType) + } + m.KvBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.KvBytes |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field KvKvs", wireType) + } + m.KvKvs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.KvKvs |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 12: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field KvChecksum", wireType) + } + m.KvChecksum = 0 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + m.KvChecksum = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 default: iNdEx = preIndex skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) diff --git a/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto b/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto index 261a405da..9f8708a1f 100644 --- a/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto +++ b/pkg/lightning/checkpoints/checkpointspb/file_checkpoints.proto @@ -42,6 +42,9 @@ message TableCheckpointModel { int64 alloc_base = 4; map engines = 8; int64 tableID = 9; + uint64 kv_bytes = 10; + uint64 kv_kvs = 11; + fixed64 kv_checksum = 12; } message EngineCheckpointModel { @@ -67,4 +70,3 @@ message ChunkCheckpointModel { string sort_key = 16; int64 file_size = 17; } - diff --git a/pkg/lightning/common/util.go b/pkg/lightning/common/util.go index c0b6b17c6..c0ea2622e 100644 --- a/pkg/lightning/common/util.go +++ b/pkg/lightning/common/util.go @@ -99,9 +99,21 @@ func IsEmptyDir(name string) bool { return len(entries) == 0 } +type QueryExecutor interface { + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row +} + +type DBExecutor interface { + QueryExecutor + BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) +} + // SQLWithRetry constructs a retryable transaction. 
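The QueryExecutor and DBExecutor interfaces added just above are what let SQLWithRetry (declared next) and helpers such as FetchTableAutoIDInfos accept either a *sql.DB or a *sql.Conn, and run queries inside an already-open transaction. A minimal standalone sketch of the idea, not part of the patch; the local interface copies and countRows are illustrative only:

package main

import (
	"context"
	"database/sql"
)

// Local copies of the interfaces added to pkg/lightning/common above,
// repeated here only so the sketch compiles on its own.
type QueryExecutor interface {
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

type DBExecutor interface {
	QueryExecutor
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// countRows only needs the query half of the contract, so the same helper
// works whether the caller passes a connection pool or a transaction.
func countRows(ctx context.Context, q QueryExecutor, table string) (int64, error) {
	var n int64
	err := q.QueryRowContext(ctx, "SELECT COUNT(*) FROM "+table).Scan(&n)
	return n, err
}

// Compile-time checks that the standard library types satisfy the interfaces.
var (
	_ DBExecutor    = (*sql.DB)(nil)
	_ DBExecutor    = (*sql.Conn)(nil)
	_ QueryExecutor = (*sql.Tx)(nil)
)

func main() {}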
type SQLWithRetry struct { - DB *sql.DB + // either *sql.DB or *sql.Conn + DB DBExecutor Logger log.Logger HideQueryLog bool } @@ -130,6 +142,7 @@ outside: logger.Warn(purpose+" failed but going to try again", log.ShortError(err)) continue default: + logger.Warn(purpose+" failed with no retry", log.ShortError(err)) break outside } } @@ -261,6 +274,13 @@ func UniqueTable(schema string, table string) string { return builder.String() } +// EscapeIdentifier quote and escape an sql identifier +func EscapeIdentifier(identifier string) string { + var builder strings.Builder + WriteMySQLIdentifier(&builder, identifier) + return builder.String() +} + // Writes a MySQL identifier into the string builder. // The identifier is always escaped into the form "`foo`". func WriteMySQLIdentifier(builder *strings.Builder, identifier string) { diff --git a/pkg/lightning/config/config.go b/pkg/lightning/config/config.go index 3803c05d6..c649de098 100644 --- a/pkg/lightning/config/config.go +++ b/pkg/lightning/config/config.go @@ -71,6 +71,9 @@ const ( defaultIndexSerialScanConcurrency = 20 defaultChecksumTableConcurrency = 2 + // defaultMetaSchemaName is the default database name used to store lightning metadata + defaultMetaSchemaName = "lightning_metadata" + // autoDiskQuotaLocalReservedSpeed is the estimated size increase per // millisecond per write thread the local backend may gain on all engines. // This is used to compute the maximum size overshoot between two disk quota @@ -148,11 +151,12 @@ func (cfg *Config) ToTLS() (*common.TLS, error) { } type Lightning struct { - TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"` - IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"` - RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"` - IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"` - CheckRequirements bool `toml:"check-requirements" json:"check-requirements"` + TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"` + IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"` + RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"` + IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"` + CheckRequirements bool `toml:"check-requirements" json:"check-requirements"` + MetaSchemaName string `toml:"meta-schema-name" json:"meta-schema-name"` } type PostOpLevel int @@ -656,6 +660,9 @@ func (cfg *Config) DefaultVarsForImporterAndLocalBackend() { if cfg.App.TableConcurrency == 0 { cfg.App.TableConcurrency = 6 } + if len(cfg.App.MetaSchemaName) == 0 { + cfg.App.MetaSchemaName = defaultMetaSchemaName + } if cfg.TikvImporter.RangeConcurrency == 0 { cfg.TikvImporter.RangeConcurrency = 16 } diff --git a/pkg/lightning/restore/meta_manager.go b/pkg/lightning/restore/meta_manager.go new file mode 100644 index 000000000..bbef6fa6e --- /dev/null +++ b/pkg/lightning/restore/meta_manager.go @@ -0,0 +1,809 @@ +// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0. 
+ +package restore + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/lightning/backend/tidb" + "github.com/pingcap/br/pkg/lightning/common" + "github.com/pingcap/br/pkg/lightning/log" + verify "github.com/pingcap/br/pkg/lightning/verification" + "github.com/pingcap/br/pkg/pdutil" + "github.com/pingcap/br/pkg/redact" +) + +type metaMgrBuilder interface { + Init(ctx context.Context) error + TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr + TableMetaMgr(tr *TableRestore) tableMetaMgr +} + +type dbMetaMgrBuilder struct { + db *sql.DB + taskID int64 + schema string +} + +func (b *dbMetaMgrBuilder) Init(ctx context.Context) error { + exec := common.SQLWithRetry{ + DB: b.db, + Logger: log.L(), + HideQueryLog: redact.NeedRedact(), + } + metaDBSQL := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", common.EscapeIdentifier(b.schema)) + if err := exec.Exec(ctx, "create meta schema", metaDBSQL); err != nil { + return errors.Annotate(err, "create meta schema failed") + } + taskMetaSQL := fmt.Sprintf(CreateTaskMetaTable, common.UniqueTable(b.schema, taskMetaTableName)) + if err := exec.Exec(ctx, "create meta table", taskMetaSQL); err != nil { + return errors.Annotate(err, "create task meta table failed") + } + tableMetaSQL := fmt.Sprintf(CreateTableMetadataTable, common.UniqueTable(b.schema, tableMetaTableName)) + if err := exec.Exec(ctx, "create meta table", tableMetaSQL); err != nil { + return errors.Annotate(err, "create table meta table failed") + } + return nil +} + +func (b *dbMetaMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { + return &dbTaskMetaMgr{ + session: b.db, + taskID: b.taskID, + pd: pd, + tableName: common.UniqueTable(b.schema, taskMetaTableName), + schemaName: b.schema, + } +} + +func (b *dbMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { + return &dbTableMetaMgr{ + session: b.db, + taskID: b.taskID, + tr: tr, + tableName: common.UniqueTable(b.schema, tableMetaTableName), + } +} + +type tableMetaMgr interface { + InitTableMeta(ctx context.Context) error + AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) + UpdateTableStatus(ctx context.Context, status metaStatus) error + UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error + CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum) (bool, *verify.KVChecksum, error) + FinishTable(ctx context.Context) error +} + +type dbTableMetaMgr struct { + session *sql.DB + taskID int64 + tr *TableRestore + tableName string +} + +func (m *dbTableMetaMgr) InitTableMeta(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + // avoid override existing metadata if the meta is already inserted. 
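The INSERT IGNORE statement that follows is what makes initialization idempotent: the per-table meta row is created at most once, so a restarted task or a second parallel Lightning instance never overwrites counters an earlier run already stored (assuming the meta table's unique key covers task and table, as its DDL defined elsewhere in the patch suggests). A standalone sketch of the pattern; the table and column names mirror the statement in this file but the helper itself is illustrative:

package example

import (
	"context"
	"database/sql"
	"fmt"
)

// initTableMetaOnce inserts the per-table meta row only if it does not exist
// yet; re-running it for the same task and table is a no-op thanks to
// INSERT IGNORE, so existing progress is never clobbered.
func initTableMetaOnce(ctx context.Context, db *sql.DB, metaTable string, taskID, tableID int64, tableName string) error {
	stmt := fmt.Sprintf(
		"INSERT IGNORE INTO %s (task_id, table_id, table_name, status) VALUES (?, ?, ?, ?)",
		metaTable)
	_, err := db.ExecContext(ctx, stmt, taskID, tableID, tableName, "initialized")
	return err
}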
+ stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, table_id, table_name, status) values (?, ?, ?, ?)`, m.tableName) + task := m.tr.logger.Begin(zap.DebugLevel, "init table meta") + err := exec.Exec(ctx, "init table meta", stmt, m.taskID, m.tr.tableInfo.ID, m.tr.tableName, metaStatusInitial.String()) + task.End(zap.ErrorLevel, err) + return errors.Trace(err) +} + +type metaStatus uint32 + +const ( + metaStatusInitial metaStatus = iota + metaStatusRowIDAllocated + metaStatusRestoreStarted + metaStatusRestoreFinished + metaStatusChecksuming + metaStatusChecksumSkipped + metaStatusFinished +) + +func (m metaStatus) String() string { + switch m { + case metaStatusInitial: + return "initialized" + case metaStatusRowIDAllocated: + return "allocated" + case metaStatusRestoreStarted: + return "restore" + case metaStatusRestoreFinished: + return "restore_finished" + case metaStatusChecksuming: + return "checksuming" + case metaStatusChecksumSkipped: + return "checksum_skipped" + case metaStatusFinished: + return "finish" + default: + panic(fmt.Sprintf("unexpected metaStatus value '%d'", m)) + } +} + +func parseMetaStatus(s string) (metaStatus, error) { + switch s { + case "", "initialized": + return metaStatusInitial, nil + case "allocated": + return metaStatusRowIDAllocated, nil + case "restore": + return metaStatusRestoreStarted, nil + case "restore_finished": + return metaStatusRestoreFinished, nil + case "checksuming": + return metaStatusChecksuming, nil + case "checksum_skipped": + return metaStatusChecksumSkipped, nil + case "finish": + return metaStatusFinished, nil + default: + return metaStatusInitial, errors.Errorf("invalid meta status '%s'", s) + } +} + +func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { + conn, err := m.session.Conn(ctx) + if err != nil { + return nil, 0, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + var newRowIDBase, newRowIDMax int64 + curStatus := metaStatusInitial + newStatus := metaStatusRowIDAllocated + var baseTotalKvs, baseTotalBytes, baseChecksum uint64 + err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") + if err != nil { + return nil, 0, errors.Annotate(err, "enable pessimistic transaction failed") + } + needAutoID := common.TableHasAutoRowID(m.tr.tableInfo.Core) || m.tr.tableInfo.Core.GetAutoIncrementColInfo() != nil || m.tr.tableInfo.Core.ContainsAutoRandomBits() + err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? 
FOR UPDATE", m.tableName) + rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID) + if err != nil { + return errors.Trace(err) + } + defer rows.Close() + var ( + metaTaskID, rowIDBase, rowIDMax, maxRowIDMax int64 + totalKvs, totalBytes, checksum uint64 + statusValue string + ) + for rows.Next() { + if err = rows.Scan(&metaTaskID, &rowIDBase, &rowIDMax, &totalKvs, &totalBytes, &checksum, &statusValue); err != nil { + return errors.Trace(err) + } + status, err := parseMetaStatus(statusValue) + if err != nil { + return errors.Annotatef(err, "invalid meta status '%s'", statusValue) + } + + // skip finished meta + if status >= metaStatusFinished { + continue + } + + if status == metaStatusChecksuming { + return errors.New("target table is calculating checksum, please wait unit the checksum is finished and try again.") + } + + if metaTaskID == m.taskID { + curStatus = status + baseChecksum = checksum + baseTotalKvs = totalKvs + baseTotalBytes = totalBytes + if status >= metaStatusRowIDAllocated { + if rowIDMax-rowIDBase != rawRowIDMax { + return errors.Errorf("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase) + } + newRowIDBase = rowIDBase + newRowIDMax = rowIDMax + break + } + continue + } + + // other tasks has finished this logic, we needn't do again. + if status >= metaStatusRowIDAllocated { + newStatus = metaStatusRestoreStarted + } + + if rowIDMax > maxRowIDMax { + maxRowIDMax = rowIDMax + } + } + + // no enough info are available, fetch row_id max for table + if curStatus == metaStatusInitial { + if needAutoID && maxRowIDMax == 0 { + // NOTE: currently, if a table contains auto_incremental unique key and _tidb_rowid, + // the `show table next_row_id` will returns the unique key field only. + var autoIDField string + for _, col := range m.tr.tableInfo.Core.Columns { + if mysql.HasAutoIncrementFlag(col.Flag) { + autoIDField = col.Name.L + break + } else if mysql.HasPriKeyFlag(col.Flag) && m.tr.tableInfo.Core.AutoRandomBits > 0 { + autoIDField = col.Name.L + break + } + } + if len(autoIDField) == 0 && common.TableHasAutoRowID(m.tr.tableInfo.Core) { + autoIDField = model.ExtraHandleName.L + } + if len(autoIDField) == 0 { + return errors.Errorf("table %s contains auto increment id or _tidb_rowid, but target field not found", m.tr.tableName) + } + + autoIDInfos, err := tidb.FetchTableAutoIDInfos(ctx, tx, m.tr.tableName) + if err != nil { + return errors.Trace(err) + } + found := false + for _, info := range autoIDInfos { + if strings.ToLower(info.Column) == autoIDField { + maxRowIDMax = info.NextID - 1 + found = true + break + } + } + if !found { + return errors.Errorf("can't fetch previous auto id base for table %s field '%s'", m.tr.tableName, autoIDField) + } + } + newRowIDBase = maxRowIDMax + newRowIDMax = newRowIDBase + rawRowIDMax + // table contains no data, can skip checksum + if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted { + newStatus = metaStatusRestoreStarted + } + query = fmt.Sprintf("update %s set row_id_base = ?, row_id_max = ?, status = ? where table_id = ? and task_id = ?", m.tableName) + _, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID) + if err != nil { + return errors.Trace(err) + } + + curStatus = newStatus + } + return nil + }) + if err != nil { + return nil, 0, errors.Trace(err) + } + + var checksum *verify.KVChecksum + // need to do checksum and update checksum meta since we are the first one. 
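Two small pieces of the logic above are easy to check in isolation: the row-ID window each task reserves inside the SELECT ... FOR UPDATE transaction, and the condition (used right after the transaction) that decides whether a base checksum of pre-existing data is needed. A standalone sketch, not part of the patch; function and variable names are illustrative:

package main

import "fmt"

// allocWindow mirrors the arithmetic in AllocTableRowIDs: a task's rows get
// IDs in the half-open range (base, max], starting where the largest
// reservation so far (or the table's existing auto-ID base) ends.
func allocWindow(prevMax, rawRowIDMax int64) (base, max int64) {
	return prevMax, prevMax + rawRowIDMax
}

// needBaseChecksum mirrors the condition used after the transaction:
// checksum the existing data only when the table is not empty and no base
// checksum has been recorded yet.
func needBaseChecksum(newRowIDBase int64, needAutoID bool, baseTotalKvs uint64) bool {
	return (newRowIDBase > 0 || !needAutoID) && baseTotalKvs == 0
}

func main() {
	// First task on an empty table reserves IDs 1..1000 ...
	b1, m1 := allocWindow(0, 1000)
	// ... and a second parallel task importing 500 rows continues after it.
	b2, m2 := allocWindow(m1, 500)
	fmt.Println(b1, m1, b2, m2) // 0 1000 1000 1500

	// On re-entry the stored window must still match the requested size,
	// the same invariant behind the "verify allocator base failed" error.
	fmt.Println(m2-b2 == 500) // true

	// An empty table skips the expensive base checksum; a table that
	// already holds rows does not.
	fmt.Println(needBaseChecksum(0, true, 0), needBaseChecksum(998, true, 0)) // false true
}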
+ if curStatus < metaStatusRestoreStarted { + // table contains data but haven't do checksum yet + if (newRowIDBase > 0 || !needAutoID) && baseTotalKvs == 0 { + remoteCk, err := DoChecksum(ctx, m.tr.tableInfo) + if err != nil { + return nil, 0, errors.Trace(err) + } + + if remoteCk.Checksum != baseChecksum || remoteCk.TotalKVs != baseTotalKvs || remoteCk.TotalBytes != baseTotalBytes { + ck := verify.MakeKVChecksum(remoteCk.TotalBytes, remoteCk.TotalKVs, remoteCk.Checksum) + checksum = &ck + } + + } + + if checksum != nil { + if err = m.UpdateTableBaseChecksum(ctx, checksum); err != nil { + return nil, 0, errors.Trace(err) + } + + m.tr.logger.Info("checksum before restore table", zap.Object("checksum", checksum)) + } else if err = m.UpdateTableStatus(ctx, metaStatusRestoreStarted); err != nil { + return nil, 0, errors.Trace(err) + } + } + if checksum == nil && baseTotalKvs > 0 { + ck := verify.MakeKVChecksum(baseTotalBytes, baseTotalKvs, baseChecksum) + checksum = &ck + } + log.L().Info("allocate table row_id base", zap.String("table", m.tr.tableName), + zap.Int64("row_id_base", newRowIDBase)) + if checksum != nil { + log.L().Info("checksum base", zap.Any("checksum", checksum)) + } + return checksum, newRowIDBase, nil +} + +func (m *dbTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + query := fmt.Sprintf("update %s set total_kvs_base = ?, total_bytes_base = ?, checksum_base = ?, status = ? where table_id = ? and task_id = ?", m.tableName) + + return exec.Exec(ctx, "update base checksum", query, checksum.SumKVS(), + checksum.SumSize(), checksum.Sum(), metaStatusRestoreStarted.String(), m.tr.tableInfo.ID, m.taskID) +} + +func (m *dbTableMetaMgr) UpdateTableStatus(ctx context.Context, status metaStatus) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + query := fmt.Sprintf("update %s set status = ? where table_id = ? and task_id = ?", m.tableName) + return exec.Exec(ctx, "update meta status", query, status.String(), m.tr.tableInfo.ID, m.taskID) +} + +func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum) (bool, *verify.KVChecksum, error) { + conn, err := m.session.Conn(ctx) + if err != nil { + return false, nil, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") + if err != nil { + return false, nil, errors.Annotate(err, "enable pessimistic transaction failed") + } + var ( + baseTotalKvs, baseTotalBytes, baseChecksum uint64 + taskKvs, taskBytes, taskChecksum uint64 + totalKvs, totalBytes, totalChecksum uint64 + ) + newStatus := metaStatusChecksuming + needChecksum := true + err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id, total_kvs_base, total_bytes_base, checksum_base, total_kvs, total_bytes, checksum, status from %s WHERE table_id = ? 
FOR UPDATE", m.tableName) + rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID) + if err != nil { + return errors.Annotate(err, "fetch task meta failed") + } + closed := false + defer func() { + if !closed { + rows.Close() + } + }() + var ( + taskID int64 + statusValue string + ) + for rows.Next() { + if err = rows.Scan(&taskID, &baseTotalKvs, &baseTotalBytes, &baseChecksum, &taskKvs, &taskBytes, &taskChecksum, &statusValue); err != nil { + return errors.Trace(err) + } + status, err := parseMetaStatus(statusValue) + if err != nil { + return errors.Annotatef(err, "invalid meta status '%s'", statusValue) + } + + // skip finished meta + if status >= metaStatusFinished { + continue + } + + if taskID == m.taskID { + if status >= metaStatusChecksuming { + newStatus = status + needChecksum = status == metaStatusChecksuming + return nil + } + + continue + } + + if status < metaStatusChecksuming { + newStatus = metaStatusChecksumSkipped + needChecksum = false + break + } else if status == metaStatusChecksuming { + return errors.New("another task is checksuming, there must be something wrong!") + } + + totalBytes += baseTotalBytes + totalKvs += baseTotalKvs + totalChecksum ^= baseChecksum + + totalBytes += taskBytes + totalKvs += taskKvs + totalChecksum ^= taskChecksum + } + rows.Close() + closed = true + + query = fmt.Sprintf("update %s set total_kvs = ?, total_bytes = ?, checksum = ?, status = ? where table_id = ? and task_id = ?", m.tableName) + _, err = tx.ExecContext(ctx, query, checksum.SumKVS(), checksum.SumSize(), checksum.Sum(), newStatus.String(), m.tr.tableInfo.ID, m.taskID) + return errors.Annotate(err, "update local checksum failed") + }) + if err != nil { + return false, nil, err + } + + var remoteChecksum *verify.KVChecksum + if needChecksum { + ck := verify.MakeKVChecksum(totalBytes, totalKvs, totalChecksum) + remoteChecksum = &ck + } + log.L().Info("check table checksum", zap.String("table", m.tr.tableName), + zap.Bool("checksum", needChecksum), zap.String("new_status", newStatus.String())) + return needChecksum, remoteChecksum, nil +} + +func (m *dbTableMetaMgr) FinishTable(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: m.tr.logger, + } + query := fmt.Sprintf("DELETE FROM %s where table_id = ? 
and (status = 'checksuming' or status = 'checksum_skipped')", m.tableName) + return exec.Exec(ctx, "clean up metas", query, m.tr.tableInfo.ID) +} + +type taskMetaMgr interface { + InitTask(ctx context.Context) error + CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) + CheckAndFinishRestore(ctx context.Context) (bool, error) + Cleanup(ctx context.Context) error + CleanupAllMetas(ctx context.Context) error +} + +type dbTaskMetaMgr struct { + session *sql.DB + taskID int64 + pd *pdutil.PdController + // unique name of task meta table + tableName string + schemaName string +} + +type taskMetaStatus uint32 + +const ( + taskMetaStatusInitial taskMetaStatus = iota + taskMetaStatusScheduleSet + taskMetaStatusSwitchSkipped + taskMetaStatusSwitchBack +) + +func (m taskMetaStatus) String() string { + switch m { + case taskMetaStatusInitial: + return "initialized" + case taskMetaStatusScheduleSet: + return "schedule_set" + case taskMetaStatusSwitchSkipped: + return "skip_switch" + case taskMetaStatusSwitchBack: + return "switched" + default: + panic(fmt.Sprintf("unexpected metaStatus value '%d'", m)) + } +} + +func parseTaskMetaStatus(s string) (taskMetaStatus, error) { + switch s { + case "", "initialized": + return taskMetaStatusInitial, nil + case "schedule_set": + return taskMetaStatusScheduleSet, nil + case "skip_switch": + return taskMetaStatusSwitchSkipped, nil + case "switched": + return taskMetaStatusSwitchBack, nil + default: + return taskMetaStatusInitial, errors.Errorf("invalid meta status '%s'", s) + } +} + +type storedCfgs struct { + PauseCfg pdutil.ClusterConfig `json:"paused"` + RestoreCfg pdutil.ClusterConfig `json:"restore"` +} + +func (m *dbTaskMetaMgr) InitTask(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + // avoid override existing metadata if the meta is already inserted. 
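The storedCfgs struct defined above is what coordinates PD scheduler pausing across parallel instances: the first task to start removes the schedulers and stores both the removed and the original config as JSON in the task meta table's pd_cfgs column, and every later task reuses that row instead of asking PD again (see CheckAndPausePdSchedulers below). A standalone sketch of that round trip; clusterConfig is only a stand-in for pdutil.ClusterConfig, whose real fields are not shown in this hunk, and the scheduler values are made up for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

// clusterConfig is a stand-in for pdutil.ClusterConfig.
type clusterConfig map[string]interface{}

// storedCfgs mirrors the struct above: what was paused, and what to restore
// when the last task finishes.
type storedCfgs struct {
	PauseCfg   clusterConfig `json:"paused"`
	RestoreCfg clusterConfig `json:"restore"`
}

func main() {
	// The first task records the scheduler state it changed...
	stored := storedCfgs{
		PauseCfg:   clusterConfig{"schedulers": []string{"balance-leader-scheduler"}},
		RestoreCfg: clusterConfig{"schedulers": []string{"balance-leader-scheduler", "balance-region-scheduler"}},
	}
	raw, err := json.Marshal(&stored)
	if err != nil {
		panic(err)
	}

	// ...and later tasks decode the same pd_cfgs value instead of querying PD.
	var loaded storedCfgs
	if err := json.Unmarshal(raw, &loaded); err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
	fmt.Println(loaded.RestoreCfg["schedulers"])
}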
+ stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, status) values (?, ?)`, m.tableName) + err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String()) + return errors.Trace(err) +} + +func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { + pauseCtx, cancel := context.WithCancel(ctx) + conn, err := m.session.Conn(ctx) + if err != nil { + return nil, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") + if err != nil { + return nil, errors.Annotate(err, "enable pessimistic transaction failed") + } + + needSwitch := true + paused := false + var pausedCfg storedCfgs + err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status from %s FOR UPDATE", m.tableName) + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return errors.Annotate(err, "fetch task meta failed") + } + closed := false + defer func() { + if !closed { + rows.Close() + } + }() + var ( + taskID int64 + cfg string + statusValue string + ) + var cfgStr string + for rows.Next() { + if err = rows.Scan(&taskID, &cfg, &statusValue); err != nil { + return errors.Trace(err) + } + status, err := parseTaskMetaStatus(statusValue) + if err != nil { + return errors.Annotatef(err, "invalid task meta status '%s'", statusValue) + } + + if status == taskMetaStatusInitial { + continue + } + + if taskID == m.taskID { + if status >= taskMetaStatusSwitchSkipped { + needSwitch = false + return nil + } + } + + if cfg != "" { + cfgStr = cfg + break + } + } + if err = rows.Close(); err != nil { + return errors.Trace(err) + } + closed = true + + if cfgStr != "" { + err = json.Unmarshal([]byte(cfgStr), &pausedCfg) + return errors.Trace(err) + } + + orig, removed, err := m.pd.RemoveSchedulersWithOrigin(pauseCtx) + if err != nil { + return errors.Trace(err) + } + paused = true + + pausedCfg = storedCfgs{PauseCfg: removed, RestoreCfg: orig} + jsonByts, err := json.Marshal(&pausedCfg) + if err != nil { + return errors.Trace(err) + } + + query = fmt.Sprintf("update %s set pd_cfgs = ?, status = ? 
where task_id = ?", m.tableName) + _, err = tx.ExecContext(ctx, query, string(jsonByts), taskMetaStatusScheduleSet.String(), m.taskID) + + return errors.Annotate(err, "update task pd configs failed") + }) + if err != nil { + return nil, err + } + + if !needSwitch { + return nil, nil + } + + if !paused { + if err = m.pd.RemoveSchedulersWithCfg(pauseCtx, pausedCfg.PauseCfg); err != nil { + return nil, err + } + } + + cancelFunc := m.pd.MakeUndoFunctionByConfig(pausedCfg.RestoreCfg) + + return func(ctx context.Context) error { + // close the periodic task ctx + cancel() + return cancelFunc(ctx) + }, nil +} + +func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { + conn, err := m.session.Conn(ctx) + if err != nil { + return false, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") + if err != nil { + return false, errors.Annotate(err, "enable pessimistic transaction failed") + } + + switchBack := true + err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id, status from %s FOR UPDATE", m.tableName) + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return errors.Annotate(err, "fetch task meta failed") + } + closed := false + defer func() { + if !closed { + rows.Close() + } + }() + var ( + taskID int64 + statusValue string + ) + newStatus := taskMetaStatusSwitchBack + for rows.Next() { + if err = rows.Scan(&taskID, &statusValue); err != nil { + return errors.Trace(err) + } + status, err := parseTaskMetaStatus(statusValue) + if err != nil { + return errors.Annotatef(err, "invalid task meta status '%s'", statusValue) + } + + if taskID == m.taskID { + continue + } + + if status < taskMetaStatusSwitchSkipped { + newStatus = taskMetaStatusSwitchSkipped + switchBack = false + break + } + } + if err = rows.Close(); err != nil { + return errors.Trace(err) + } + closed = true + + query = fmt.Sprintf("update %s set status = ? where task_id = ?", m.tableName) + _, err = tx.ExecContext(ctx, query, newStatus.String(), m.taskID) + + return errors.Trace(err) + }) + + return switchBack, err +} + +func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + // avoid override existing metadata if the meta is already inserted. + stmt := fmt.Sprintf("DROP TABLE %s;", m.tableName) + if err := exec.Exec(ctx, "cleanup task meta tables", stmt); err != nil { + return errors.Trace(err) + } + return nil +} + +func (m *dbTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + + // check if all tables are finished + query := fmt.Sprintf("SELECT COUNT(*) from %s", common.UniqueTable(m.schemaName, tableMetaTableName)) + var cnt int + if err := exec.QueryRow(ctx, "fetch table meta row count", query, &cnt); err != nil { + return errors.Trace(err) + } + if cnt > 0 { + log.L().Warn("there are unfinished table in table meta table, cleanup skipped.") + return nil + } + + // avoid override existing metadata if the meta is already inserted. 
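CheckAndFinishRestore above implements the "last one out restores the schedulers" rule: a task switches PD back only if every other task has at least reached the skip-switch state. The decision reduces to a scan over the other tasks' statuses; a standalone sketch with illustrative names:

package main

import "fmt"

// Ordering mirrors the taskMetaStatus constants above.
const (
	statusInitial = iota
	statusScheduleSet
	statusSwitchSkipped
	statusSwitchBack
)

// shouldSwitchBack reports whether this task is the last one that still needs
// the schedulers paused, and is therefore the one that restores them.
func shouldSwitchBack(otherTaskStatuses []int) bool {
	for _, st := range otherTaskStatuses {
		if st < statusSwitchSkipped {
			return false // another task is still importing
		}
	}
	return true
}

func main() {
	fmt.Println(shouldSwitchBack([]int{statusScheduleSet}))                     // false: keep schedulers paused
	fmt.Println(shouldSwitchBack([]int{statusSwitchSkipped, statusSwitchBack})) // true: we are the last one
	fmt.Println(shouldSwitchBack(nil))                                          // true: single-task import
}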
+ stmt := fmt.Sprintf("DROP DATABASE %s;", common.EscapeIdentifier(m.schemaName)) + if err := exec.Exec(ctx, "cleanup task meta tables", stmt); err != nil { + return errors.Trace(err) + } + return nil +} + +type noopMetaMgrBuilder struct{} + +func (b noopMetaMgrBuilder) Init(ctx context.Context) error { + return nil +} + +func (b noopMetaMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { + return noopTaskMetaMgr{} +} + +func (b noopMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { + return noopTableMetaMgr{} +} + +type noopTaskMetaMgr struct{} + +func (m noopTaskMetaMgr) InitTask(ctx context.Context) error { + return nil +} + +func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { + return func(ctx context.Context) error { + return nil + }, nil +} + +func (m noopTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { + return false, nil +} + +func (m noopTaskMetaMgr) Cleanup(ctx context.Context) error { + return nil +} + +func (m noopTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { + return nil +} + +type noopTableMetaMgr struct{} + +func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { + return nil +} + +func (m noopTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64) (*verify.KVChecksum, int64, error) { + return nil, 0, nil +} + +func (m noopTableMetaMgr) UpdateTableStatus(ctx context.Context, status metaStatus) error { + return nil +} + +func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum *verify.KVChecksum) error { + return nil +} + +func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum) (bool, *verify.KVChecksum, error) { + return false, nil, nil +} + +func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { + return nil +} diff --git a/pkg/lightning/restore/meta_manager_test.go b/pkg/lightning/restore/meta_manager_test.go new file mode 100644 index 000000000..bf2fcba38 --- /dev/null +++ b/pkg/lightning/restore/meta_manager_test.go @@ -0,0 +1,244 @@ +// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0. + +package restore + +import ( + "context" + "database/sql" + "database/sql/driver" + + "github.com/DATA-DOG/go-sqlmock" + . 
"github.com/pingcap/check" + "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/ddl" + tmock "github.com/pingcap/tidb/util/mock" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/lightning/checkpoints" + "github.com/pingcap/br/pkg/lightning/common" + "github.com/pingcap/br/pkg/lightning/log" + "github.com/pingcap/br/pkg/lightning/verification" +) + +var _ = Suite(&metaMgrSuite{}) + +type metaMgrSuite struct { + dbHandle *sql.DB + mockDB sqlmock.Sqlmock + tr *TableRestore + mgr *dbTableMetaMgr + checksumMgr *testChecksumMgr +} + +func (s *metaMgrSuite) SetUpSuite(c *C) { + p := parser.New() + se := tmock.NewContext() + + node, err := p.ParseOneStmt("CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", "utf8mb4", "utf8mb4_bin") + c.Assert(err, IsNil) + tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), int64(1)) + c.Assert(err, IsNil) + tableInfo.State = model.StatePublic + + schema := "test" + tb := "t1" + ti := &checkpoints.TidbTableInfo{ + ID: tableInfo.ID, + DB: schema, + Name: tb, + Core: tableInfo, + } + + tableName := common.UniqueTable(schema, tb) + logger := log.With(zap.String("table", tableName)) + s.tr = &TableRestore{ + tableName: tableName, + tableInfo: ti, + logger: logger, + } +} + +func (s *metaMgrSuite) SetUpTest(c *C) { + db, m, err := sqlmock.New() + c.Assert(err, IsNil) + + s.mgr = &dbTableMetaMgr{ + session: db, + taskID: 1, + tr: s.tr, + tableName: common.UniqueTable("test", tableMetaTableName), + } + s.mockDB = m + s.checksumMgr = &testChecksumMgr{} +} + +func (s *metaMgrSuite) TearDownTest(c *C) { + c.Assert(s.mockDB.ExpectationsWereMet(), IsNil) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsSingleTable(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(0), int64(0), uint64(0), uint64(0), uint64(0), "initialized"}, + } + nextID := int64(1) + updateArgs := []driver.Value{int64(0), int64(10), "restore", int64(1), int64(1)} + s.prepareMock(rows, &nextID, updateArgs, nil, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + c.Assert(rowIDBase, Equals, int64(0)) + c.Assert(ck, IsNil) + c.Assert(s.checksumMgr.callCnt, Equals, 0) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsSingleTableAutoIDNot0(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(0), int64(0), uint64(0), uint64(0), uint64(0), "initialized"}, + } + nextID := int64(999) + updateArgs := []driver.Value{int64(998), int64(1008), "allocated", int64(1), int64(1)} + newStatus := "restore" + s.prepareMock(rows, &nextID, updateArgs, nil, &newStatus) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + c.Assert(rowIDBase, Equals, int64(998)) + c.Assert(ck, IsNil) + c.Assert(s.checksumMgr.callCnt, Equals, 1) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsSingleTableContainsData(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(0), int64(0), uint64(0), uint64(0), uint64(0), "initialized"}, + } + nextID := int64(999) + checksum := verification.MakeKVChecksum(1, 2, 3) + updateArgs := []driver.Value{int64(998), int64(1008), "allocated", int64(1), int64(1)} + s.prepareMock(rows, &nextID, updateArgs, &checksum, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + 
c.Assert(rowIDBase, Equals, int64(998)) + c.Assert(ck, DeepEquals, &checksum) + c.Assert(s.checksumMgr.callCnt, Equals, 1) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsAllocated(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(998), int64(1008), uint64(0), uint64(0), uint64(0), metaStatusRowIDAllocated.String()}, + } + checksum := verification.MakeKVChecksum(2, 1, 3) + s.prepareMock(rows, nil, nil, &checksum, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + c.Assert(rowIDBase, Equals, int64(998)) + c.Assert(ck, DeepEquals, &checksum) + c.Assert(s.checksumMgr.callCnt, Equals, 1) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsFinished(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(998), int64(1008), uint64(1), uint64(2), uint64(3), metaStatusRestoreStarted.String()}, + } + checksum := verification.MakeKVChecksum(2, 1, 3) + s.prepareMock(rows, nil, nil, nil, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + c.Assert(rowIDBase, Equals, int64(998)) + c.Assert(ck, DeepEquals, &checksum) + c.Assert(s.checksumMgr.callCnt, Equals, 0) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsMultiTasksInit(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(0), int64(0), uint64(0), uint64(0), uint64(0), "initialized"}, + {int64(2), int64(0), int64(0), uint64(0), uint64(0), uint64(0), "initialized"}, + } + nextID := int64(1) + updateArgs := []driver.Value{int64(0), int64(10), "restore", int64(1), int64(1)} + s.prepareMock(rows, &nextID, updateArgs, nil, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + c.Assert(rowIDBase, Equals, int64(0)) + c.Assert(ck, IsNil) + c.Assert(s.checksumMgr.callCnt, Equals, 0) +} + +func (s *metaMgrSuite) TestAllocTableRowIDsMultiTasksAllocated(c *C) { + ctx := context.WithValue(context.Background(), &checksumManagerKey, s.checksumMgr) + + rows := [][]driver.Value{ + {int64(1), int64(0), int64(0), uint64(0), uint64(0), uint64(0), metaStatusInitial.String()}, + {int64(2), int64(0), int64(100), uint64(0), uint64(0), uint64(0), metaStatusRowIDAllocated.String()}, + } + updateArgs := []driver.Value{int64(100), int64(110), "restore", int64(1), int64(1)} + s.prepareMock(rows, nil, updateArgs, nil, nil) + + ck, rowIDBase, err := s.mgr.AllocTableRowIDs(ctx, 10) + c.Assert(err, IsNil) + c.Assert(rowIDBase, Equals, int64(100)) + c.Assert(ck, IsNil) + c.Assert(s.checksumMgr.callCnt, Equals, 0) +} + +func (s *metaMgrSuite) prepareMock(rowsVal [][]driver.Value, nextRowID *int64, updateArgs []driver.Value, checksum *verification.KVChecksum, updateStatus *string) { + s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';"). + WillReturnResult(sqlmock.NewResult(int64(0), int64(0))) + + s.mockDB.ExpectBegin() + + rows := sqlmock.NewRows([]string{"task_id", "row_id_base", "row_id_max", "total_kvs_base", "total_bytes_base", "checksum_base", "status"}) + for _, r := range rowsVal { + rows = rows.AddRow(r...) + } + s.mockDB.ExpectQuery("\\QSELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E"). + WithArgs(int64(1)). 
+ WillReturnRows(rows) + if nextRowID != nil { + s.mockDB.ExpectQuery("SHOW TABLE `test`.`t1` NEXT_ROW_ID"). + WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}). + AddRow("test", "t1", "_tidb_rowid", *nextRowID, "AUTO_INCREMENT")) + } + + if len(updateArgs) > 0 { + s.mockDB.ExpectExec("\\Qupdate `test`.`table_meta` set row_id_base = ?, row_id_max = ?, status = ? where table_id = ? and task_id = ?\\E"). + WithArgs(updateArgs...). + WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) + } + + s.mockDB.ExpectCommit() + + if checksum != nil { + s.mockDB.ExpectExec("\\Qupdate `test`.`table_meta` set total_kvs_base = ?, total_bytes_base = ?, checksum_base = ?, status = ? where table_id = ? and task_id = ?\\E"). + WithArgs(checksum.SumKVS(), checksum.SumSize(), checksum.Sum(), metaStatusRestoreStarted.String(), int64(1), int64(1)). + WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) + s.checksumMgr.checksum = RemoteChecksum{ + TotalBytes: checksum.SumSize(), + TotalKVs: checksum.SumKVS(), + Checksum: checksum.Sum(), + } + } + + if updateStatus != nil { + s.mockDB.ExpectExec("\\Qupdate `test`.`table_meta` set status = ? where table_id = ? and task_id = ?\\E"). + WithArgs(*updateStatus, int64(1), int64(1)). + WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) + } +} diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index 5f15fd862..b83963f62 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -59,6 +59,7 @@ import ( "github.com/pingcap/br/pkg/pdutil" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/utils" + "github.com/pingcap/br/pkg/version" "github.com/pingcap/br/pkg/version/build" ) @@ -81,6 +82,32 @@ const ( ) const ( + taskMetaTableName = "task_meta" + tableMetaTableName = "table_meta" + // CreateTableMetadataTable stores the per-table sub jobs information used by TiDB Lightning + CreateTableMetadataTable = `CREATE TABLE IF NOT EXISTS %s ( + task_id BIGINT(20) UNSIGNED, + table_id BIGINT(64) NOT NULL, + table_name VARCHAR(64) NOT NULL, + row_id_base BIGINT(20) NOT NULL DEFAULT 0, + row_id_max BIGINT(20) NOT NULL DEFAULT 0, + total_kvs_base BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + total_bytes_base BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + checksum_base BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + total_kvs BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + total_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + checksum BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + status VARCHAR(32) NOT NULL, + PRIMARY KEY (table_id, task_id) + );` + // CreateTaskMetaTable stores the pre-lightning metadata used by TiDB Lightning + CreateTaskMetaTable = `CREATE TABLE IF NOT EXISTS %s ( + task_id BIGINT(20) UNSIGNED NOT NULL, + pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '', + status VARCHAR(32) NOT NULL, + PRIMARY KEY (task_id) + );` + compactionLowerThreshold = 512 * units.MiB compactionUpperThreshold = 32 * units.GiB ) @@ -173,6 +200,7 @@ type Controller struct { closedEngineLimit *worker.Pool store storage.ExternalStorage + metaMgrBuilder metaMgrBuilder diskQuotaLock sync.RWMutex diskQuotaState int32 @@ -217,6 +245,10 @@ func NewRestoreControllerWithPauser( if err := verifyCheckpoint(cfg, taskCp); err != nil { return nil, errors.Trace(err) } + // reuse task id to reuse task meta correctly. 
+ if taskCp != nil { + cfg.TaskID = taskCp.TaskID + } var backend backend.Backend switch cfg.TikvImporter.Backend { @@ -272,6 +304,23 @@ func NewRestoreControllerWithPauser( ts = oracle.ComposeTS(physical, logical) } + var metaBuilder metaMgrBuilder + switch cfg.TikvImporter.Backend { + case config.BackendLocal, config.BackendImporter: + // TODO: support Lightning via SQL + db, err := g.GetDB() + if err != nil { + return nil, errors.Trace(err) + } + metaBuilder = &dbMetaMgrBuilder{ + db: db, + taskID: cfg.TaskID, + schema: cfg.App.MetaSchemaName, + } + default: + metaBuilder = noopMetaMgrBuilder{} + } + rc := &Controller{ cfg: cfg, dbMetas: dbMetas, @@ -291,8 +340,9 @@ func NewRestoreControllerWithPauser( saveCpCh: make(chan saveCp), closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), - store: s, - ts: ts, + store: s, + ts: ts, + metaMgrBuilder: metaBuilder, } return rc, nil @@ -639,31 +689,6 @@ func (rc *Controller) restoreSchema(ctx context.Context) error { } rc.dbInfos = dbInfos - if rc.cfg.TikvImporter.Backend != config.BackendTiDB { - for _, dbMeta := range rc.dbMetas { - for _, tableMeta := range dbMeta.Tables { - tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name) - - // if checkpoint enable and not missing, we skip the check table empty progress. - if rc.cfg.Checkpoint.Enable { - _, err := rc.checkpointsDB.Get(ctx, tableName) - switch { - case err == nil: - continue - case errors.IsNotFound(err): - default: - return err - } - } - - err := rc.checkTableEmpty(ctx, tableName) - if err != nil { - return err - } - } - } - } - // Load new checkpoints err = rc.checkpointsDB.Initialize(ctx, rc.cfg, dbInfos) if err != nil { @@ -925,144 +950,173 @@ func (rc *Controller) listenCheckpointUpdates() { rc.checkpointsWg.Done() } -func (rc *Controller) runPeriodicActions(ctx context.Context, stop <-chan struct{}) { +// buildRunPeriodicActionAndCancelFunc build the runPeriodicAction func and a cancel func +func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, stop <-chan struct{}) (func(), func(bool)) { + cancelFuncs := make([]func(bool), 0) + closeFuncs := make([]func(), 0) // a nil channel blocks forever. // if the cron duration is zero we use the nil channel to skip the action. var logProgressChan <-chan time.Time if rc.cfg.Cron.LogProgress.Duration > 0 { logProgressTicker := time.NewTicker(rc.cfg.Cron.LogProgress.Duration) - defer logProgressTicker.Stop() + closeFuncs = append(closeFuncs, func() { + logProgressTicker.Stop() + }) logProgressChan = logProgressTicker.C } glueProgressTicker := time.NewTicker(3 * time.Second) - defer glueProgressTicker.Stop() + closeFuncs = append(closeFuncs, func() { + glueProgressTicker.Stop() + }) var switchModeChan <-chan time.Time // tidb backend don't need to switch tikv to import mode if rc.cfg.TikvImporter.Backend != config.BackendTiDB && rc.cfg.Cron.SwitchMode.Duration > 0 { switchModeTicker := time.NewTicker(rc.cfg.Cron.SwitchMode.Duration) - defer switchModeTicker.Stop() + cancelFuncs = append(cancelFuncs, func(bool) { switchModeTicker.Stop() }) + cancelFuncs = append(cancelFuncs, func(do bool) { + if do { + log.L().Info("switch to normal mode") + if err := rc.switchToNormalMode(ctx); err != nil { + log.L().Warn("switch tikv to normal mode failed", zap.Error(err)) + } + } + }) switchModeChan = switchModeTicker.C - - rc.switchToImportMode(ctx) } var checkQuotaChan <-chan time.Time // only local storage has disk quota concern. 
if rc.cfg.TikvImporter.Backend == config.BackendLocal && rc.cfg.Cron.CheckDiskQuota.Duration > 0 { checkQuotaTicker := time.NewTicker(rc.cfg.Cron.CheckDiskQuota.Duration) - defer checkQuotaTicker.Stop() + cancelFuncs = append(cancelFuncs, func(bool) { checkQuotaTicker.Stop() }) checkQuotaChan = checkQuotaTicker.C } - start := time.Now() - for { - select { - case <-ctx.Done(): - log.L().Warn("stopping periodic actions", log.ShortError(ctx.Err())) - return - case <-stop: - log.L().Info("everything imported, stopping periodic actions") - return - - case <-switchModeChan: - // periodically switch to import mode, as requested by TiKV 3.0 - rc.switchToImportMode(ctx) - - case <-logProgressChan: - // log the current progress periodically, so OPS will know that we're still working - nanoseconds := float64(time.Since(start).Nanoseconds()) - // the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate - // before the last table start, so use the bigger of the two should be a workaround - estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated)) - pending := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending)) - if estimated < pending { - estimated = pending - } - finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished)) - totalTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess)) - completedTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStateCompleted, metric.TableResultSuccess)) - bytesRead := metric.ReadHistogramSum(metric.RowReadBytesHistogram) - engineEstimated := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess)) - enginePending := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStatePending, metric.TableResultSuccess)) - if engineEstimated < enginePending { - engineEstimated = enginePending - } - engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess)) - bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten)) - bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported)) - - var state string - var remaining zap.Field - switch { - case finished >= estimated: - if engineFinished < engineEstimated { - state = "importing" - } else { - state = "post-processing" - } - case finished > 0: - state = "writing" - default: - state = "preparing" - } - - // since we can't accurately estimate the extra time cost by import after all writing are finished, - // so here we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as the total - // progress. 
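
[Editor's note, not part of the patch] The weighting mentioned in the comment above — estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 — is easy to sanity-check in isolation. The small sketch below mirrors the arithmetic used by the progress logger in this hunk; the helper name and the sample numbers are illustrative only.

package main

import (
	"fmt"
	"math"
)

// progressEstimate mirrors the weighting described in the hunk above: writing
// progress contributes 80% of the reported total, importing progress 20%.
// finished/estimated are chunk counts; bytesWritten/bytesImported are bytes.
func progressEstimate(finished, estimated, bytesWritten, bytesImported float64) float64 {
	if finished <= 0 {
		return 0
	}
	writePercent := math.Min(finished/estimated, 1.0)
	importPercent := 1.0
	if bytesWritten > 0 {
		// project the total bytes from the fraction already written,
		// then measure how much of that projection has been imported.
		totalBytes := bytesWritten / writePercent
		importPercent = math.Min(bytesImported/totalBytes, 1.0)
	}
	return writePercent*0.8 + importPercent*0.2
}

func main() {
	// half the chunks written and a quarter of the projected bytes imported:
	// 0.5*0.8 + 0.25*0.2 = 0.45, so 45% is reported.
	fmt.Printf("%.0f%%\n", progressEstimate(50, 100, 1<<30, 512<<20)*100)
}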
- remaining = zap.Skip() - totalPercent := 0.0 - if finished > 0 { - writePercent := math.Min(finished/estimated, 1.0) - importPercent := 1.0 - if bytesWritten > 0 { - totalBytes := bytesWritten / writePercent - importPercent = math.Min(bytesImported/totalBytes, 1.0) - } - totalPercent = writePercent*0.8 + importPercent*0.2 - if totalPercent < 1.0 { - remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds - remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second)) + return func() { + defer func() { + for _, f := range closeFuncs { + f() } + }() + // tidb backend don't need to switch tikv to import mode + if rc.cfg.TikvImporter.Backend != config.BackendTiDB && rc.cfg.Cron.SwitchMode.Duration > 0 { + rc.switchToImportMode(ctx) } + start := time.Now() + for { + select { + case <-ctx.Done(): + log.L().Warn("stopping periodic actions", log.ShortError(ctx.Err())) + return + case <-stop: + log.L().Info("everything imported, stopping periodic actions") + return - formatPercent := func(finish, estimate float64) string { - speed := "" - if estimated > 0 { - speed = fmt.Sprintf(" (%.1f%%)", finish/estimate*100) - } - return speed - } + case <-switchModeChan: + // periodically switch to import mode, as requested by TiKV 3.0 + rc.switchToImportMode(ctx) + + case <-logProgressChan: + // log the current progress periodically, so OPS will know that we're still working + nanoseconds := float64(time.Since(start).Nanoseconds()) + // the estimated chunk is not accurate(likely under estimated), but the actual count is not accurate + // before the last table start, so use the bigger of the two should be a workaround + estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated)) + pending := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending)) + if estimated < pending { + estimated = pending + } + finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished)) + totalTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess)) + completedTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStateCompleted, metric.TableResultSuccess)) + bytesRead := metric.ReadHistogramSum(metric.RowReadBytesHistogram) + engineEstimated := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStateEstimated, metric.TableResultSuccess)) + enginePending := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.ChunkStatePending, metric.TableResultSuccess)) + if engineEstimated < enginePending { + engineEstimated = enginePending + } + engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues(metric.TableStateImported, metric.TableResultSuccess)) + bytesWritten := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateWritten)) + bytesImported := metric.ReadCounter(metric.BytesCounter.WithLabelValues(metric.TableStateImported)) - // avoid output bytes speed if there are no unfinished chunks - chunkSpeed := zap.Skip() - if bytesRead > 0 { - chunkSpeed = zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds)) - } + var state string + var remaining zap.Field + switch { + case finished >= estimated: + if engineFinished < engineEstimated { + state = "importing" + } else { + state = "post-processing" + } + case finished > 0: + state = "writing" + default: + state = "preparing" + } - // Note: a speed of 28 MiB/s roughly 
corresponds to 100 GiB/hour. - log.L().Info("progress", - zap.String("total", fmt.Sprintf("%.1f%%", totalPercent*100)), - // zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)), - zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))), - zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))), - zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))), - chunkSpeed, - zap.String("state", state), - remaining, - ) + // since we can't accurately estimate the extra time cost by import after all writing are finished, + // so here we use estimatedWritingProgress * 0.8 + estimatedImportingProgress * 0.2 as the total + // progress. + remaining = zap.Skip() + totalPercent := 0.0 + if finished > 0 { + writePercent := math.Min(finished/estimated, 1.0) + importPercent := 1.0 + if bytesWritten > 0 { + totalBytes := bytesWritten / writePercent + importPercent = math.Min(bytesImported/totalBytes, 1.0) + } + totalPercent = writePercent*0.8 + importPercent*0.2 + if totalPercent < 1.0 { + remainNanoseconds := (1.0 - totalPercent) / totalPercent * nanoseconds + remaining = zap.Duration("remaining", time.Duration(remainNanoseconds).Round(time.Second)) + } + } - case <-checkQuotaChan: - // verify the total space occupied by sorted-kv-dir is below the quota, - // otherwise we perform an emergency import. - rc.enforceDiskQuota(ctx) + formatPercent := func(finish, estimate float64) string { + speed := "" + if estimated > 0 { + speed = fmt.Sprintf(" (%.1f%%)", finish/estimate*100) + } + return speed + } + + // avoid output bytes speed if there are no unfinished chunks + chunkSpeed := zap.Skip() + if bytesRead > 0 { + chunkSpeed = zap.Float64("speed(MiB/s)", bytesRead/(1048576e-9*nanoseconds)) + } - case <-glueProgressTicker.C: - finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished)) - rc.tidbGlue.Record(glue.RecordFinishedChunk, uint64(finished)) + // Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour. + log.L().Info("progress", + zap.String("total", fmt.Sprintf("%.1f%%", totalPercent*100)), + // zap.String("files", fmt.Sprintf("%.0f/%.0f (%.1f%%)", finished, estimated, finished/estimated*100)), + zap.String("tables", fmt.Sprintf("%.0f/%.0f%s", completedTables, totalTables, formatPercent(completedTables, totalTables))), + zap.String("chunks", fmt.Sprintf("%.0f/%.0f%s", finished, estimated, formatPercent(finished, estimated))), + zap.String("engines", fmt.Sprintf("%.f/%.f%s", engineFinished, engineEstimated, formatPercent(engineFinished, engineEstimated))), + chunkSpeed, + zap.String("state", state), + remaining, + ) + + case <-checkQuotaChan: + // verify the total space occupied by sorted-kv-dir is below the quota, + // otherwise we perform an emergency import. 
+ rc.enforceDiskQuota(ctx) + + case <-glueProgressTicker.C: + finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished)) + rc.tidbGlue.Record(glue.RecordFinishedChunk, uint64(finished)) + } + } + }, func(do bool) { + log.L().Info("cancel periodic actions", zap.Bool("do", do)) + for _, f := range cancelFuncs { + f(do) + } } - } } var checksumManagerKey struct{} @@ -1070,10 +1124,19 @@ var checksumManagerKey struct{} func (rc *Controller) restoreTables(ctx context.Context) error { logTask := log.L().Begin(zap.InfoLevel, "restore all tables data") + if err := rc.metaMgrBuilder.Init(ctx); err != nil { + return err + } + // for local backend, we should disable some pd scheduler and change some settings, to // make split region and ingest sst more stable // because importer backend is mostly use for v3.x cluster which doesn't support these api, // so we also don't do this for import backend + finishSchedulers := func() {} + // if one lightning failed abnormally, and can't determine whether it needs to switch back, + // we do not do switch back automatically + cleanupFunc := func() {} + switchBack := false if rc.cfg.TikvImporter.Backend == config.BackendLocal { // disable some pd schedulers pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, @@ -1081,20 +1144,56 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if err != nil { return errors.Trace(err) } + + mgr := rc.metaMgrBuilder.TaskMetaMgr(pdController) + if err = mgr.InitTask(ctx); err != nil { + return err + } + logTask.Info("removing PD leader®ion schedulers") - restoreFn, e := pdController.RemoveSchedulers(ctx) - defer func() { - // use context.Background to make sure this restore function can still be executed even if ctx is canceled - if restoreE := restoreFn(context.Background()); restoreE != nil { - logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) - return + + restoreFn, err := mgr.CheckAndPausePdSchedulers(ctx) + finishSchedulers = func() { + if restoreFn != nil { + // use context.Background to make sure this restore function can still be executed even if ctx is canceled + restoreCtx := context.Background() + needSwitchBack, err := mgr.CheckAndFinishRestore(restoreCtx) + if err != nil { + logTask.Warn("check restore pd schedulers failed", zap.Error(err)) + return + } + switchBack = needSwitchBack + if needSwitchBack { + if restoreE := restoreFn(restoreCtx); restoreE != nil { + logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + } + // clean up task metas + if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil { + logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) + } + // cleanup table meta and schema db if needed. 
+ cleanupFunc = func() { + if e := mgr.CleanupAllMetas(restoreCtx); err != nil { + logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) + } + } + } + + logTask.Info("add back PD leader®ion schedulers") } - logTask.Info("add back PD leader®ion schedulers") - }() - if e != nil { + + pdController.Close() + } + + if err != nil { return errors.Trace(err) } } + defer func() { + if switchBack { + cleanupFunc() + } + }() type task struct { tr *TableRestore @@ -1111,7 +1210,18 @@ func (rc *Controller) restoreTables(ctx context.Context) error { var restoreErr common.OnceError stopPeriodicActions := make(chan struct{}) - go rc.runPeriodicActions(ctx, stopPeriodicActions) + + periodicActions, cancelFunc := rc.buildRunPeriodicActionAndCancelFunc(ctx, stopPeriodicActions) + go periodicActions() + finishFuncCalled := false + defer func() { + if !finishFuncCalled { + finishSchedulers() + cancelFunc(switchBack) + finishFuncCalled = true + } + }() + defer close(stopPeriodicActions) taskCh := make(chan task, rc.cfg.App.IndexConcurrency) @@ -1258,17 +1368,24 @@ func (rc *Controller) restoreTables(ctx context.Context) error { default: } + // stop periodic tasks for restore table such as pd schedulers and switch-mode tasks. + // this can help make cluster switching back to normal state more quickly. + // finishSchedulers() + // cancelFunc(switchBack) + // finishFuncCalled = true + close(postProcessTaskChan) // otherwise, we should run all tasks in the post-process task chan for i := 0; i < rc.cfg.App.TableConcurrency; i++ { wg.Add(1) go func() { + defer wg.Done() for task := range postProcessTaskChan { + metaMgr := rc.metaMgrBuilder.TableMetaMgr(task.tr) // force all the remain post-process tasks to be executed - _, err := task.tr.postProcess(ctx2, rc, task.cp, true) + _, err = task.tr.postProcess(ctx2, rc, task.cp, true, metaMgr) restoreErr.Set(err) } - wg.Done() }() } wg.Wait() @@ -1291,6 +1408,7 @@ func (tr *TableRestore) restoreTable( default: } + metaMgr := rc.metaMgrBuilder.TableMetaMgr(tr) // no need to do anything if the chunks are already populated if len(cp.Engines) > 0 { tr.logger.Info("reusing engines and files info from checkpoint", @@ -1298,9 +1416,55 @@ func (tr *TableRestore) restoreTable( zap.Int("filesCnt", cp.CountChunks()), ) } else if cp.Status < checkpoints.CheckpointStatusAllWritten { + versionStr, err := rc.tidbGlue.GetSQLExecutor().ObtainStringWithLog( + ctx, "SELECT version()", "fetch tidb version", log.L()) + if err != nil { + return false, errors.Trace(err) + } + + tidbVersion, err := version.ExtractTiDBVersion(versionStr) + if err != nil { + return false, errors.Trace(err) + } + if err := tr.populateChunks(ctx, rc, cp); err != nil { return false, errors.Trace(err) } + + // fetch the max chunk row_id max value as the global max row_id + rowIDMax := int64(0) + for _, engine := range cp.Engines { + if len(engine.Chunks) > 0 && engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax > rowIDMax { + rowIDMax = engine.Chunks[len(engine.Chunks)-1].Chunk.RowIDMax + } + } + + // "show table next_row_id" is only available after v4.0.0 + if tidbVersion.Major >= 4 && (rc.cfg.TikvImporter.Backend == config.BackendLocal || rc.cfg.TikvImporter.Backend == config.BackendImporter) { + // first, insert a new-line into meta table + if err = metaMgr.InitTableMeta(ctx); err != nil { + return false, err + } + + checksum, rowIDBase, err := metaMgr.AllocTableRowIDs(ctx, rowIDMax) + if err != nil { + return false, err + } + tr.RebaseChunkRowIDs(cp, rowIDBase) + + if 
checksum != nil { + if cp.Checksum != *checksum { + cp.Checksum = *checksum + rc.saveCpCh <- saveCp{ + tableName: tr.tableName, + merger: &checkpoints.TableChecksumMerger{ + Checksum: cp.Checksum, + }, + } + } + tr.logger.Info("checksum before restore table", zap.Object("checksum", &cp.Checksum)) + } + } if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, tr.tableName, cp.Engines); err != nil { return false, errors.Trace(err) } @@ -1332,8 +1496,13 @@ func (tr *TableRestore) restoreTable( return false, errors.Trace(err) } + err = metaMgr.UpdateTableStatus(ctx, metaStatusRestoreFinished) + if err != nil { + return false, errors.Trace(err) + } + // 3. Post-process. With the last parameter set to false, we can allow delay analyze execute latter - return tr.postProcess(ctx, rc, cp, false /* force-analyze */) + return tr.postProcess(ctx, rc, cp, false /* force-analyze */, metaMgr) } // estimate SST files compression threshold by total row file size @@ -1489,10 +1658,6 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp return } - failpoint.Inject("FailBeforeDataEngineImported", func() { - panic("forcing failure due to FailBeforeDataEngineImported") - }) - dataWorker := rc.closedEngineLimit.Apply() defer rc.closedEngineLimit.Recycle(dataWorker) if err := tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp); err != nil { @@ -1764,6 +1929,7 @@ func (tr *TableRestore) postProcess( rc *Controller, cp *checkpoints.TableCheckpoint, forcePostProcess bool, + metaMgr tableMetaMgr, ) (bool, error) { // there are no data in this table, no need to do post process // this is important for tables that are just the dump table of views @@ -1817,8 +1983,21 @@ func (tr *TableRestore) postProcess( } else { if forcePostProcess || !rc.cfg.PostRestore.PostProcessAtLast { tr.logger.Info("local checksum", zap.Object("checksum", &localChecksum)) - err := tr.compareChecksum(ctx, localChecksum) + needChecksum, baseTotalChecksum, err := metaMgr.CheckAndUpdateLocalChecksum(ctx, &localChecksum) + if err != nil { + return false, err + } + if !needChecksum { + return false, nil + } + if cp.Checksum.SumKVS() > 0 || baseTotalChecksum.SumKVS() > 0 { + localChecksum.Add(&cp.Checksum) + localChecksum.Add(baseTotalChecksum) + tr.logger.Info("merged local checksum", zap.Object("checksum", &localChecksum)) + } + + err = tr.compareChecksum(ctx, localChecksum) // with post restore level 'optional', we will skip checksum error if rc.cfg.PostRestore.Checksum == config.OpLevelOptional { if err != nil { @@ -1826,10 +2005,15 @@ func (tr *TableRestore) postProcess( err = nil } } + if err == nil { + err = metaMgr.FinishTable(ctx) + } + rc.saveStatusCheckpoint(tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusChecksummed) if err != nil { return false, errors.Trace(err) } + cp.Status = checkpoints.CheckpointStatusChecksummed } else { finished = false @@ -2036,6 +2220,7 @@ func (rc *Controller) setGlobalVariables(ctx context.Context) error { // we should enable/disable new collation here since in server mode, tidb config // may be different in different tasks collate.SetNewCollationEnabledForTest(enabled) + return nil } @@ -2220,6 +2405,18 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * return err } +func (t *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { + if rowIDBase == 0 { + return + } + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + chunk.Chunk.PrevRowIDMax += rowIDBase + 
chunk.Chunk.RowIDMax += rowIDBase + } + } +} + // initializeColumns computes the "column permutation" for an INSERT INTO // statement. Suppose a table has columns (a, b, c, d) in canonical order, and // we execute `INSERT INTO (d, b, a) VALUES ...`, we will need to remap the diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index c1730654e..49e3a1d4a 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -863,6 +863,7 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics(c *C) { c.Assert(err, IsNil) cpDB := checkpoints.NewNullCheckpointsDB() + g := mock.NewMockGlue(controller) rc := &Controller{ cfg: cfg, dbMetas: []*mydump.MDDatabaseMeta{ @@ -882,17 +883,22 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics(c *C) { saveCpCh: chptCh, pauser: DeliverPauser, backend: noop.NewNoopBackend(), - tidbGlue: mock.NewMockGlue(controller), + tidbGlue: g, errorSummaries: makeErrorSummaries(log.L()), tls: tls, checkpointsDB: cpDB, closedEngineLimit: worker.NewPool(ctx, 1, "closed_engine"), store: s.store, + metaMgrBuilder: noopMetaMgrBuilder{}, } go func() { for range chptCh { } }() + exec := mock.NewMockSQLExecutor(controller) + g.EXPECT().GetSQLExecutor().Return(exec).AnyTimes() + exec.EXPECT().ObtainStringWithLog(gomock.Any(), "SELECT version()", gomock.Any(), gomock.Any()). + Return("5.7.25-TiDB-v5.0.1", nil).AnyTimes() web.BroadcastInitProgress(rc.dbMetas) @@ -1486,3 +1492,13 @@ func (s *restoreSchemaSuite) TestRestoreSchemaContextCancel(c *C) { c.Assert(err, NotNil) c.Assert(err, Equals, childCtx.Err()) } + +type testChecksumMgr struct { + checksum RemoteChecksum + callCnt int +} + +func (t *testChecksumMgr) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { + t.callCnt++ + return &t.checksum, nil +} diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 104c6e998..63276804b 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -75,13 +75,13 @@ func constConfigGeneratorBuilder(val interface{}) pauseConfigGenerator { } } -// clusterConfig represents a set of scheduler whose config have been modified +// ClusterConfig represents a set of scheduler whose config have been modified // along with their original config. -type clusterConfig struct { +type ClusterConfig struct { // Enable PD schedulers before restore - scheduler []string + Schedulers []string `json:"schedulers"` // Original scheudle configuration - scheduleCfg map[string]interface{} + ScheduleCfg map[string]interface{} `json:"schedule_cfg"` } type pauseSchedulerBody struct { @@ -527,14 +527,14 @@ func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interf return p.doUpdatePDScheduleConfig(ctx, cfg, post, prefix) } -func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error { - if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil { +func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg ClusterConfig) error { + if err := pd.ResumeSchedulers(ctx, clusterCfg.Schedulers); err != nil { return errors.Annotate(err, "fail to add PD schedulers") } - log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg)) + log.Info("restoring config", zap.Any("config", clusterCfg.ScheduleCfg)) mergeCfg := make(map[string]interface{}) for cfgKey := range expectPDCfg { - value := clusterCfg.scheduleCfg[cfgKey] + value := clusterCfg.ScheduleCfg[cfgKey] if value == nil { // Ignore non-exist config. 
continue @@ -554,7 +554,8 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster return nil } -func (p *PdController) makeUndoFunctionByConfig(config clusterConfig) UndoFunc { +// MakeUndoFunctionByConfig return an UndoFunc based on specified ClusterConfig +func (p *PdController) MakeUndoFunctionByConfig(config ClusterConfig) UndoFunc { restore := func(ctx context.Context) error { return restoreSchedulers(ctx, p, config) } @@ -563,22 +564,38 @@ func (p *PdController) makeUndoFunctionByConfig(config clusterConfig) UndoFunc { // RemoveSchedulers removes the schedulers that may slow down BR speed. func (p *PdController) RemoveSchedulers(ctx context.Context) (undo UndoFunc, err error) { + undo = Nop + + origin, _, err1 := p.RemoveSchedulersWithOrigin(ctx) + if err1 != nil { + err = err1 + return + } + + undo = p.MakeUndoFunctionByConfig(ClusterConfig{Schedulers: origin.Schedulers, ScheduleCfg: origin.ScheduleCfg}) + return undo, errors.Trace(err) +} + +// RemoveSchedulersWithOrigin pause and remove br related schedule configs and return the origin and modified configs +func (p *PdController) RemoveSchedulersWithOrigin(ctx context.Context) (ClusterConfig, ClusterConfig, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("PdController.RemoveSchedulers", opentracing.ChildOf(span.Context())) defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } - undo = Nop + originCfg := ClusterConfig{} + removedCfg := ClusterConfig{} stores, err := p.pdClient.GetAllStores(ctx) if err != nil { - return + return originCfg, removedCfg, err } scheduleCfg, err := p.GetPDScheduleConfig(ctx) if err != nil { - return + return originCfg, removedCfg, err } - disablePDCfg := make(map[string]interface{}) + disablePDCfg := make(map[string]interface{}, len(expectPDCfg)) + originPDCfg := make(map[string]interface{}, len(expectPDCfg)) for cfgKey, cfgValFunc := range expectPDCfg { value, ok := scheduleCfg[cfgKey] if !ok { @@ -586,14 +603,17 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo UndoFunc, err continue } disablePDCfg[cfgKey] = cfgValFunc(len(stores), value) + originPDCfg[cfgKey] = value } - undo = p.makeUndoFunctionByConfig(clusterConfig{scheduleCfg: scheduleCfg}) + originCfg.ScheduleCfg = originPDCfg + removedCfg.ScheduleCfg = disablePDCfg + log.Debug("saved PD config", zap.Any("config", scheduleCfg)) // Remove default PD scheduler that may affect restore process. 
existSchedulers, err := p.ListSchedulers(ctx) if err != nil { - return + return originCfg, removedCfg, err } needRemoveSchedulers := make([]string, 0, len(existSchedulers)) for _, s := range existSchedulers { @@ -602,7 +622,30 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo UndoFunc, err } } + removedSchedulers, err := p.doRemoveSchedulersWith(ctx, needRemoveSchedulers, disablePDCfg) + if err != nil { + return originCfg, removedCfg, err + } + + originCfg.Schedulers = removedSchedulers + removedCfg.Schedulers = removedSchedulers + + return originCfg, removedCfg, nil +} + +// RemoveSchedulersWithCfg removes pd schedulers and configs with specified ClusterConfig +func (p *PdController) RemoveSchedulersWithCfg(ctx context.Context, removeCfg ClusterConfig) error { + _, err := p.doRemoveSchedulersWith(ctx, removeCfg.Schedulers, removeCfg.ScheduleCfg) + return err +} + +func (p *PdController) doRemoveSchedulersWith( + ctx context.Context, + needRemoveSchedulers []string, + disablePDCfg map[string]interface{}, +) ([]string, error) { var removedSchedulers []string + var err error if p.isPauseConfigEnabled() { // after 4.0.8 we can set these config with TTL removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, disablePDCfg, pdRequest) @@ -611,12 +654,11 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo UndoFunc, err // which doesn't have temporary config setting. err = p.doUpdatePDScheduleConfig(ctx, disablePDCfg, pdRequest) if err != nil { - return + return nil, err } removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, nil, pdRequest) } - undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg}) - return undo, errors.Trace(err) + return removedSchedulers, err } // Close close the connection to pd. 
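
[Editor's note, not part of the patch] The clusterConfig type is exported and given JSON tags because the task meta manager now persists the paused/original scheduler settings in the task_meta.pd_cfgs column, so that a later (or concurrent) lightning task can restore them via RemoveSchedulersWithCfg / MakeUndoFunctionByConfig. A minimal, self-contained sketch of that round-trip follows; the storedPDCfgs struct and its field tags are assumptions for illustration, not the patch's storedCfgs definition.

package main

import (
	"encoding/json"
	"fmt"
)

// ClusterConfig mirrors pdutil.ClusterConfig as exported by this patch.
type ClusterConfig struct {
	Schedulers  []string               `json:"schedulers"`
	ScheduleCfg map[string]interface{} `json:"schedule_cfg"`
}

// storedPDCfgs is a stand-in for the value serialized into task_meta.pd_cfgs;
// the names and tags here are hypothetical.
type storedPDCfgs struct {
	PauseCfg   ClusterConfig `json:"pause"`
	RestoreCfg ClusterConfig `json:"restore"`
}

func main() {
	cfgs := storedPDCfgs{
		PauseCfg:   ClusterConfig{Schedulers: []string{"balance-leader-scheduler"}, ScheduleCfg: map[string]interface{}{"max-merge-region-keys": 0}},
		RestoreCfg: ClusterConfig{Schedulers: []string{"balance-leader-scheduler"}, ScheduleCfg: map[string]interface{}{"max-merge-region-keys": 200000}},
	}
	raw, err := json.Marshal(&cfgs)
	if err != nil {
		panic(err)
	}
	// this JSON string is the kind of payload stored in the pd_cfgs column ...
	fmt.Println(string(raw))

	// ... and a later task unmarshals it before re-applying or restoring configs.
	var loaded storedPDCfgs
	if err := json.Unmarshal(raw, &loaded); err != nil {
		panic(err)
	}
	fmt.Println(loaded.RestoreCfg.Schedulers)
}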
diff --git a/tests/lightning_checkpoint/run.sh b/tests/lightning_checkpoint/run.sh index d2289b414..f4bbede37 100755 --- a/tests/lightning_checkpoint/run.sh +++ b/tests/lightning_checkpoint/run.sh @@ -110,7 +110,7 @@ for BACKEND in importer local; do run_lightning -d "$DBPATH" --backend $BACKEND --enable-checkpoint=1 run_sql "$PARTIAL_IMPORT_QUERY" check_contains "s: $(( (1000 * $CHUNK_COUNT + 1001) * $CHUNK_COUNT * $TABLE_COUNT ))" - run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cppk.1357924680.bak`.table_v6 WHERE status >= 200' + run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cppk.1357924680.bak`.table_v7 WHERE status >= 200' check_contains "count(*): $TABLE_COUNT" # Ensure there is no dangling open engines diff --git a/tests/lightning_checkpoint_chunks/run.sh b/tests/lightning_checkpoint_chunks/run.sh index f7b7cb92e..d06adfd9b 100755 --- a/tests/lightning_checkpoint_chunks/run.sh +++ b/tests/lightning_checkpoint_chunks/run.sh @@ -32,7 +32,7 @@ verify_checkpoint_noop() { run_sql 'SELECT count(i), sum(i) FROM cpch_tsr.tbl;' check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))" check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))" - run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cpch.1234567890.bak`.table_v6 WHERE status >= 200' + run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cpch.1234567890.bak`.table_v7 WHERE status >= 200' check_contains "count(*): 1" } diff --git a/tests/lightning_error_summary/data/error_summary.c.sql b/tests/lightning_error_summary/data/error_summary.c.sql index be11c04ab..4ed9e54a4 100644 --- a/tests/lightning_error_summary/data/error_summary.c.sql +++ b/tests/lightning_error_summary/data/error_summary.c.sql @@ -1 +1 @@ -INSERT INTO c VALUES (10, 100), (1000, 10000); +INSERT INTO c VALUES (3, 100), (1000, 10000); diff --git a/tests/lightning_incremental/config.toml b/tests/lightning_incremental/config.toml new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lightning_incremental/data/incr-schema-create.sql b/tests/lightning_incremental/data/incr-schema-create.sql new file mode 100644 index 000000000..624892540 --- /dev/null +++ b/tests/lightning_incremental/data/incr-schema-create.sql @@ -0,0 +1 @@ +create database `incr`; diff --git a/tests/lightning_incremental/data/incr.auto_random-schema.sql b/tests/lightning_incremental/data/incr.auto_random-schema.sql new file mode 100644 index 000000000..028c7c9d9 --- /dev/null +++ b/tests/lightning_incremental/data/incr.auto_random-schema.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +CREATE TABLE `auto_random` ( + `id` bigint primary key clustered auto_random, + v varchar(255) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data/incr.auto_random.sql b/tests/lightning_incremental/data/incr.auto_random.sql new file mode 100644 index 000000000..d4357822b --- /dev/null +++ b/tests/lightning_incremental/data/incr.auto_random.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `auto_random` (`v`) VALUES +("a"), +("b"), +("c"); diff --git a/tests/lightning_incremental/data/incr.pk_auto_inc-schema.sql b/tests/lightning_incremental/data/incr.pk_auto_inc-schema.sql new file mode 100644 index 000000000..52e876978 --- /dev/null +++ b/tests/lightning_incremental/data/incr.pk_auto_inc-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE `auto_random` ( + `id` bigint PRIMARY KEY AUTO_INCREMENT, + v varchar(255) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 
COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data/incr.pk_auto_inc.sql b/tests/lightning_incremental/data/incr.pk_auto_inc.sql new file mode 100644 index 000000000..ac85444a5 --- /dev/null +++ b/tests/lightning_incremental/data/incr.pk_auto_inc.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `pk_auto_inc` (`v`) VALUES +("a"), +("b"), +("c"); diff --git a/tests/lightning_incremental/data/incr.rowid_uk_inc-schema.sql b/tests/lightning_incremental/data/incr.rowid_uk_inc-schema.sql new file mode 100644 index 000000000..c1ace8ba9 --- /dev/null +++ b/tests/lightning_incremental/data/incr.rowid_uk_inc-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE `rowid_uk_inc` ( + `id` bigint UNIQUE KEY AUTO_INCREMENT, + v varchar(16) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data/incr.rowid_uk_inc.sql b/tests/lightning_incremental/data/incr.rowid_uk_inc.sql new file mode 100644 index 000000000..b90acb9b1 --- /dev/null +++ b/tests/lightning_incremental/data/incr.rowid_uk_inc.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `rowid_uk_inc` (`v`) VALUES +('a'), +('b'), +('c'); diff --git a/tests/lightning_incremental/data/incr.uk_auto_inc-schema.sql b/tests/lightning_incremental/data/incr.uk_auto_inc-schema.sql new file mode 100644 index 000000000..3901d7ed3 --- /dev/null +++ b/tests/lightning_incremental/data/incr.uk_auto_inc-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE `uk_auto_inc` ( + `pk` int PRIMARY KEY, + `id` bigint UNIQUE KEY AUTO_INCREMENT +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data/incr.uk_auto_inc.sql b/tests/lightning_incremental/data/incr.uk_auto_inc.sql new file mode 100644 index 000000000..4b1e7b134 --- /dev/null +++ b/tests/lightning_incremental/data/incr.uk_auto_inc.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `uk_auto_inc` (`pk`) VALUES +(1), +(2), +(3); diff --git a/tests/lightning_incremental/data1/incr-schema-create.sql b/tests/lightning_incremental/data1/incr-schema-create.sql new file mode 100644 index 000000000..624892540 --- /dev/null +++ b/tests/lightning_incremental/data1/incr-schema-create.sql @@ -0,0 +1 @@ +create database `incr`; diff --git a/tests/lightning_incremental/data1/incr.auto_random-schema.sql b/tests/lightning_incremental/data1/incr.auto_random-schema.sql new file mode 100644 index 000000000..028c7c9d9 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.auto_random-schema.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +CREATE TABLE `auto_random` ( + `id` bigint primary key clustered auto_random, + v varchar(255) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data1/incr.auto_random.sql b/tests/lightning_incremental/data1/incr.auto_random.sql new file mode 100644 index 000000000..7e89d09b5 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.auto_random.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `auto_random` (`v`) VALUES +("d"), +("e"), +("f"); diff --git a/tests/lightning_incremental/data1/incr.pk_auto_inc-schema.sql b/tests/lightning_incremental/data1/incr.pk_auto_inc-schema.sql new file mode 100644 index 000000000..52e876978 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.pk_auto_inc-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE `auto_random` ( + `id` bigint PRIMARY KEY AUTO_INCREMENT, + v varchar(255) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git 
a/tests/lightning_incremental/data1/incr.pk_auto_inc.sql b/tests/lightning_incremental/data1/incr.pk_auto_inc.sql new file mode 100644 index 000000000..5a0ab087d --- /dev/null +++ b/tests/lightning_incremental/data1/incr.pk_auto_inc.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `pk_auto_inc` (`v`) VALUES +("d"), +("e"), +("f"); diff --git a/tests/lightning_incremental/data1/incr.rowid_uk_inc-schema.sql b/tests/lightning_incremental/data1/incr.rowid_uk_inc-schema.sql new file mode 100644 index 000000000..c9bc49801 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.rowid_uk_inc-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE `uk_auto_inc` ( + `id` bigint UNIQUE KEY AUTO_INCREMENT, + v varchar(16) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data1/incr.rowid_uk_inc.sql b/tests/lightning_incremental/data1/incr.rowid_uk_inc.sql new file mode 100644 index 000000000..f4ab9a5a7 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.rowid_uk_inc.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `rowid_uk_inc` (`v`) VALUES +("d"), +("e"), +("f"); diff --git a/tests/lightning_incremental/data1/incr.uk_auto_inc-schema.sql b/tests/lightning_incremental/data1/incr.uk_auto_inc-schema.sql new file mode 100644 index 000000000..3901d7ed3 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.uk_auto_inc-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE `uk_auto_inc` ( + `pk` int PRIMARY KEY, + `id` bigint UNIQUE KEY AUTO_INCREMENT +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; diff --git a/tests/lightning_incremental/data1/incr.uk_auto_inc.sql b/tests/lightning_incremental/data1/incr.uk_auto_inc.sql new file mode 100644 index 000000000..31d87c135 --- /dev/null +++ b/tests/lightning_incremental/data1/incr.uk_auto_inc.sql @@ -0,0 +1,5 @@ +/*!40103 SET TIME_ZONE='+00:00' */; +INSERT INTO `uk_auto_inc` (`pk`) VALUES +(4), +(5), +(6); diff --git a/tests/lightning_incremental/run.sh b/tests/lightning_incremental/run.sh new file mode 100644 index 000000000..bf8ccde57 --- /dev/null +++ b/tests/lightning_incremental/run.sh @@ -0,0 +1,76 @@ +#!/bin/sh +# +# Copyright 2020 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +set -eu + +check_cluster_version 4 0 0 "incremental restore" || exit 0 + +DB_NAME=incr + +for backend in importer local; do + run_sql "DROP DATABASE IF EXISTS incr;" + run_lightning --backend $backend + + for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do + run_sql "SELECT count(*) from incr.$tbl" + check_contains "count(*): 3" + done + + for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do + if [ "$tbl" = "auto_random" ]; then + run_sql "SELECT id & b'000001111111111111111111111111111111111111111111111111111111111' as inc FROM incr.$tbl" + else + run_sql "SELECT id as inc FROM incr.$tbl" + fi + check_contains 'inc: 1' + check_contains 'inc: 2' + check_contains 'inc: 3' + done + + for tbl in pk_auto_inc rowid_uk_inc; do + run_sql "SELECT group_concat(v) from incr.$tbl group by 'all';" + check_contains "group_concat(v): a,b,c" + done + + run_sql "SELECT sum(pk) from incr.uk_auto_inc;" + check_contains "sum(pk): 6" + + # incrementally import all data in data1 + run_lightning --backend $backend -d "tests/$TEST_NAME/data1" + + for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do + run_sql "SELECT count(*) from incr.$tbl" + check_contains "count(*): 6" + done + + for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do + if [ "$tbl" = "auto_random" ]; then + run_sql "SELECT id & b'000001111111111111111111111111111111111111111111111111111111111' as inc FROM incr.$tbl" + else + run_sql "SELECT id as inc FROM incr.$tbl" + fi + check_contains 'inc: 4' + check_contains 'inc: 5' + check_contains 'inc: 6' + done + + for tbl in pk_auto_inc rowid_uk_inc; do + run_sql "SELECT group_concat(v) from incr.$tbl group by 'all';" + check_contains "group_concat(v): a,b,c,d,e,f" + done + + run_sql "SELECT sum(pk) from incr.uk_auto_inc;" + check_contains "sum(pk): 21" +done diff --git a/tests/lightning_local_backend/run.sh b/tests/lightning_local_backend/run.sh index e43fd04fe..cfecd3e72 100755 --- a/tests/lightning_local_backend/run.sh +++ b/tests/lightning_local_backend/run.sh @@ -58,7 +58,7 @@ run_sql 'DROP DATABASE cpeng;' rm -f "/tmp/tidb_lightning_checkpoint_local_backend_test.pb" set +e -export GO_FAILPOINTS='github.com/pingcap/br/pkg/lightning/restore/FailBeforeDataEngineImported=return' +export GO_FAILPOINTS='github.com/pingcap/br/pkg/lightning/restore/FailIfStatusBecomes=return(90);' for i in $(seq "$ENGINE_COUNT"); do echo "******** Importing Table Now (step $i/$ENGINE_COUNT) ********" run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml" diff --git a/tests/lightning_tidb_rowid/run.sh b/tests/lightning_tidb_rowid/run.sh index 4397c2679..395c21978 100755 --- a/tests/lightning_tidb_rowid/run.sh +++ b/tests/lightning_tidb_rowid/run.sh @@ -57,13 +57,8 @@ for BACKEND in local importer tidb; do run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase' check_contains 'count(*): 1' - if [ "$BACKEND" == 'tidb' ]; then - check_contains 'min(_tidb_rowid): 70000' - check_contains 'max(_tidb_rowid): 70000' - else - check_contains 'min(_tidb_rowid): 1' - check_contains 'max(_tidb_rowid): 1' - fi + check_contains 'min(_tidb_rowid): 70000' + check_contains 'max(_tidb_rowid): 70000' run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")' run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"' check_contains '_tidb_rowid > 70000: 1' diff --git a/tidb-lightning.toml b/tidb-lightning.toml index 68482b4fb..c019a1265 100644 --- a/tidb-lightning.toml +++ 
b/tidb-lightning.toml @@ -32,6 +32,10 @@ table-concurrency = 6 # adjusted according to monitoring. # Ref: https://en.wikipedia.org/wiki/Disk_buffer#Read-ahead/read-behind # io-concurrency = 5 +# meta-schema-name is (database name) to store lightning task and table metadata. +# the meta schema and tables is store in target tidb cluster. +# this config is only used in "local" and "importer" backend. +# meta-schema-name = "lightning_metadata" # logging level = "info"
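
[Editor's note, not part of the patch] The new meta-schema-name option only selects the database that hosts the two meta tables introduced by this change (task_meta and table_meta); the fully qualified names used in the SQL above are built with common.UniqueTable. A rough, self-contained sketch of that naming under the default suggested in tidb-lightning.toml; the escaping helpers below approximate common.EscapeIdentifier / common.UniqueTable and are not the patch's code.

package main

import (
	"fmt"
	"strings"
)

const (
	taskMetaTableName  = "task_meta"
	tableMetaTableName = "table_meta"
)

// escapeIdentifier approximates common.EscapeIdentifier: wrap the name in
// backticks and double any embedded backtick.
func escapeIdentifier(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// uniqueTable approximates common.UniqueTable: `schema`.`table`.
func uniqueTable(schema, table string) string {
	return escapeIdentifier(schema) + "." + escapeIdentifier(table)
}

func main() {
	schema := "lightning_metadata" // the value suggested by the new config comment
	fmt.Println(uniqueTable(schema, taskMetaTableName))  // `lightning_metadata`.`task_meta`
	fmt.Println(uniqueTable(schema, tableMetaTableName)) // `lightning_metadata`.`table_meta`
}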