compacted restore: fix the wrong initial configurations #58050

Merged · 5 commits · Dec 10, 2024
5 changes: 4 additions & 1 deletion br/pkg/checkpoint/checkpoint_test.go
@@ -628,14 +628,17 @@ func TestCheckpointCompactedRestoreRunner(t *testing.T) {
respCount++
}

exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
require.True(t, exists)

_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, checker)
require.NoError(t, err)
require.Equal(t, 3, respCount)

err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
require.NoError(t, err)

exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
exists = checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
require.False(t, exists)
exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName))
require.False(t, exists)
4 changes: 3 additions & 1 deletion br/pkg/checkpoint/restore.go
@@ -170,8 +170,10 @@ func ExistsSstRestoreCheckpoint(
dom *domain.Domain,
dbName string,
) bool {
// we only check the existence of the checkpoint data table
// because the checkpoint metadata is not used for restore
return dom.InfoSchema().
TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointMetaTableName))
TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointDataTableName))
}

func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error {
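The one-line fix above changes which table ExistsSstRestoreCheckpoint probes: the data table is what a resumed restore replays, so its presence is what marks a resumable run. Below is a minimal sketch of the contract the predicate now provides; the concrete wiring is the InitCheckpointMetadataForCompactedSstRestore change in client.go further down, and the helper and names here are illustrative, not BR APIs.

```go
package main

import "fmt"

// existsCheckpointData stands in for the fixed ExistsSstRestoreCheckpoint:
// it reports whether the checkpoint data table exists, since that table
// (not the metadata table) records the progress a resumed restore replays.
func existsCheckpointData(dataTablePresent bool) bool {
	return dataTablePresent
}

func main() {
	for _, dataTablePresent := range []bool{true, false} {
		if existsCheckpointData(dataTablePresent) {
			fmt.Println("resume: load persisted progress and skip finished files")
		} else {
			fmt.Println("first run: create the checkpoint database and metadata")
		}
	}
}
```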
51 changes: 36 additions & 15 deletions br/pkg/restore/log_client/client.go
@@ -152,6 +152,7 @@ func NewSstRestoreManager(
storeCount uint,
createCheckpointSessionFn func() (glue.Session, error),
) (*SstRestoreManager, error) {
var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
// This poolSize is similar to full restore, as both workflows are comparable.
// The poolSize should be greater than concurrencyPerStore multiplied by the number of stores.
poolSize := concurrencyPerStore * 32 * storeCount
@@ -166,14 +167,12 @@
return nil, errors.Trace(err)
}
if se != nil {
checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
if err != nil {
return nil, errors.Trace(err)
}
s.checkpointRunner = checkpointRunner
}
// TODO implement checkpoint
s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, nil)
s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, checkpointRunner)
return s, nil
}
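The substantive change in this hunk is scoping: checkpointRunner is now declared before the `if se != nil` block and assigned with `=` rather than `:=`, so the runner created inside the block is the one handed to NewSimpleSstRestorer instead of the hard-coded nil the removed TODO left behind. A self-contained sketch of the pattern follows; the type and function names are illustrative, not BR APIs.

```go
package main

import (
	"errors"
	"fmt"
)

type runner struct{}

func newRunner(enabled bool) (*runner, error) {
	if !enabled {
		return nil, errors.New("checkpoint session unavailable")
	}
	return &runner{}, nil
}

// build mirrors the fixed NewSstRestoreManager: declare first, assign inside.
func build(enabled bool) (*runner, error) {
	var r *runner // hoisted declaration, as the diff now does for checkpointRunner
	if enabled {
		created, err := newRunner(enabled)
		if err != nil {
			return nil, err
		}
		r = created // plain assignment keeps the value visible after the block
	}
	// With ":=" inside the if-block, a new block-scoped variable would have
	// shadowed r, and this return would always hand back nil, which is what
	// previously left the compacted-SST restorer without a checkpoint runner.
	return r, nil
}

func main() {
	r, err := build(true)
	fmt.Println(r != nil, err) // true <nil>
}
```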

@@ -182,14 +181,13 @@ type LogClient struct {
logRestoreManager *LogRestoreManager
sstRestoreManager *SstRestoreManager

cipher *backuppb.CipherInfo
pdClient pd.Client
pdHTTPClient pdhttp.Client
clusterID uint64
dom *domain.Domain
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters
concurrencyPerStore uint
cipher *backuppb.CipherInfo
pdClient pd.Client
pdHTTPClient pdhttp.Client
clusterID uint64
dom *domain.Domain
tlsConf *tls.Config
keepaliveConf keepalive.ClientParameters

rawKVClient *rawkv.RawKVBatchClient
storage storage.ExternalStorage
@@ -263,6 +261,7 @@ func (rc *LogClient) RestoreCompactedSstFiles(
// Collect all items from the iterator in advance to avoid blocking during restoration.
// This approach ensures that we have all necessary data ready for processing,
// preventing any potential delays caused by waiting for the iterator to yield more items.
start := time.Now()
for r := compactionsIter.TryNext(ctx); !r.Finished; r = compactionsIter.TryNext(ctx) {
if r.Err != nil {
return r.Err
@@ -295,6 +294,13 @@ }
}
}()

log.Info("[Compacted SST Restore] Start to restore SST files",
zap.Int("sst-file-count", len(backupFileSets)), zap.Duration("iterate-take", time.Since(start)))
start = time.Now()
defer func() {
log.Info("[Compacted SST Restore] Restore SST files finished", zap.Duration("restore-take", time.Since(start)))
}()
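Two small instrumentation changes land here: the iterator is still drained up front (per the existing comment), and the new start/defer pair logs how long the drain and the restore phase each take. Here is a runnable sketch of both patterns, with a toy iterator standing in for compactionsIter; the field and variable names are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// result mirrors the shape of the TryNext result used in the diff: an item,
// an error, or a "finished" signal (field names here are illustrative).
type result struct {
	item     string
	err      error
	finished bool
}

func main() {
	backlog := []string{"compaction-001.sst", "compaction-002.sst"}
	next := func() result { // stand-in for compactionsIter.TryNext(ctx)
		if len(backlog) == 0 {
			return result{finished: true}
		}
		item := backlog[0]
		backlog = backlog[1:]
		return result{item: item}
	}

	// Phase 1: drain the iterator up front so the restore loop below never
	// blocks waiting for the producer, as the existing comment explains.
	start := time.Now()
	var files []string
	for r := next(); !r.finished; r = next() {
		if r.err != nil {
			panic(r.err)
		}
		files = append(files, r.item)
	}
	fmt.Printf("collected %d files in %s\n", len(files), time.Since(start))

	// Phase 2: the same start/defer pairing brackets the restore work, which
	// is what the two new log.Info calls add around the actual restoration.
	start = time.Now()
	defer func() { fmt.Printf("restore finished in %s\n", time.Since(start)) }()
	for range files {
		time.Sleep(time.Millisecond) // placeholder for restoring one file set
	}
}
```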

// To optimize performance and minimize cross-region downloads,
// we are currently opting for a single restore approach instead of batch restoration.
// This decision is similar to the handling of raw and txn restores,
@@ -422,7 +428,7 @@ func (rc *LogClient) InitClients(

opt := snapclient.NewSnapFileImporterOptions(
rc.cipher, metaClient, importCli, backend,
snapclient.RewriteModeKeyspace, stores, rc.concurrencyPerStore, createCallBacks, closeCallBacks,
snapclient.RewriteModeKeyspace, stores, concurrencyPerStore, createCallBacks, closeCallBacks,
)
snapFileImporter, err := snapclient.NewSnapFileImporter(
ctx, rc.dom.Store().GetCodec().GetAPIVersion(), snapclient.TiDBCompcated, opt)
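This hunk swaps rc.concurrencyPerStore for the concurrencyPerStore parameter, and the matching struct field is removed from LogClient above, so the importer options receive the value the caller actually passed rather than a field that may never have been populated. Together with the new zero-value guard in NewSnapFileImporter (see the import.go change below), a misconfigured importer now fails at construction time. A sketch of the pitfall, with illustrative names:

```go
package main

import (
	"errors"
	"fmt"
)

type logClient struct {
	// Stand-in for the removed LogClient field: if nothing assigns it before
	// InitClients runs, its zero value silently flows into the importer.
	concurrencyPerStore uint
}

// newImporter mirrors the new guard in NewSnapFileImporter.
func newImporter(concurrencyPerStore uint) error {
	if concurrencyPerStore == 0 {
		return errors.New("concurrencyPerStore must be greater than 0")
	}
	return nil
}

func main() {
	rc := logClient{}
	fmt.Println(newImporter(rc.concurrencyPerStore)) // old wiring: zero value, rejected
	fmt.Println(newImporter(128))                    // new wiring: parameter passed through
}
```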
@@ -442,9 +448,24 @@ func (rc *LogClient) InitClients(
func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore(
ctx context.Context,
) (map[string]struct{}, error) {
// get sst checkpoint to skip repeated files
sstCheckpointSets := make(map[string]struct{})
// TODO initial checkpoint

if checkpoint.ExistsSstRestoreCheckpoint(ctx, rc.dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName) {
// we need to load the checkpoint data for the following restore
execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()
_, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.CustomSSTRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) {
sstCheckpointSets[v.Name] = struct{}{}
})
if err != nil {
return nil, errors.Trace(err)
}
} else {
// initialize the checkpoint metadata since it is the first time to restore.
err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.unsafeSession, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
if err != nil {
return nil, errors.Trace(err)
}
}
return sstCheckpointSets, nil
}
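The function now has two real branches: replay persisted progress into sstCheckpointSets when a checkpoint exists, or seed the checkpoint metadata on a first run. The returned set is presumably used by the caller to skip files that were already restored; the following is a small self-contained sketch of that assumed consumption, with made-up file names.

```go
package main

import "fmt"

func main() {
	// Keys correspond to the RestoreValueType.Name values loaded from the
	// checkpoint data table; these literals are illustrative.
	sstCheckpointSets := map[string]struct{}{
		"compaction-001.sst": {},
	}

	pending := []string{"compaction-001.sst", "compaction-002.sst"}

	// Assumed caller behaviour (not code in this PR): filter out anything a
	// previous run already restored and checkpointed.
	var toRestore []string
	for _, name := range pending {
		if _, done := sstCheckpointSets[name]; done {
			continue
		}
		toRestore = append(toRestore, name)
	}
	fmt.Println(toRestore) // [compaction-002.sst]
}
```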

1 change: 0 additions & 1 deletion br/pkg/restore/restorer.go
@@ -359,7 +359,6 @@ func (p *PipelineRestorerWrapper[T]) WithSplit(ctx context.Context, i iter.TryNe

// Check if the accumulated items meet the criteria for splitting.
if strategy.ShouldSplit() {
log.Info("Trying to start region split with accumulations")
startTime := time.Now()

// Execute the split operation on the accumulated items.
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
],
embed = [":snap_client"],
flaky = True,
shard_count = 18,
shard_count = 19,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
3 changes: 3 additions & 0 deletions br/pkg/restore/snap_client/import.go
@@ -213,6 +213,9 @@ func NewSnapFileImporter(
kvMode KvMode,
options *SnapFileImporterOptions,
) (*SnapFileImporter, error) {
if options.concurrencyPerStore == 0 {
return nil, errors.New("concurrencyPerStore must be greater than 0")
}
fileImporter := &SnapFileImporter{
apiVersion: apiVersion,
kvMode: kvMode,
7 changes: 7 additions & 0 deletions br/pkg/restore/snap_client/import_test.go
@@ -155,6 +155,13 @@ func (client *fakeImporterClient) MultiIngest(
return &import_sstpb.IngestResponse{}, nil
}

func TestUnproperConfigSnapImporter(t *testing.T) {
ctx := context.Background()
opt := snapclient.NewSnapFileImporterOptionsForTest(nil, nil, nil, snapclient.RewriteModeKeyspace, 0)
_, err := snapclient.NewSnapFileImporter(ctx, kvrpcpb.APIVersion_V1, snapclient.TiDBFull, opt)
require.Error(t, err)
}

func TestSnapImporter(t *testing.T) {
ctx := context.Background()
splitClient := split.NewFakeSplitClient()
1 change: 1 addition & 0 deletions br/tests/br_pitr/run.sh
@@ -107,6 +107,7 @@ fi
# PITR restore
echo "run pitr"
run_sql "DROP DATABASE __TiDB_BR_Temporary_Log_Restore_Checkpoint;"
run_sql "DROP DATABASE __TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint;"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1

check_result