diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index e7b2275cbf3b..3fede7e2af15 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -6152,6 +6152,93 @@ func TestBackupHandlesDroppedTypeStatsCollection(t *testing.T) { sqlDB.Exec(t, `BACKUP foo TO $1`, dest) } +// TestBatchedInsertStats is a test for the `insertStats` method used in a +// cluster restore to restore backed up statistics. +func TestBatchedInsertStats(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const numAccounts = 1 + tc, sqlDB, _, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, + InitManualReplication) + defer cleanupFn() + ctx := context.Background() + s := tc.Server(0) + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + registry := s.JobRegistry().(*jobs.Registry) + mkJob := func(t *testing.T) *jobs.Job { + id := registry.MakeJobID() + job, err := registry.CreateJobWithTxn(ctx, jobs.Record{ + // Job does not accept an empty Details field, so arbitrarily provide + // RestoreDetails. + Details: jobspb.RestoreDetails{}, + Progress: jobspb.RestoreProgress{}, + }, id, nil /* txn */) + require.NoError(t, err) + return job + } + + generateTableStatistics := func(numStats int) []*stats.TableStatisticProto { + tableStats := make([]*stats.TableStatisticProto, 0, numStats) + for i := 0; i < numStats; i++ { + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE foo%d (id INT)`, i)) + var tableID descpb.ID + sqlDB.QueryRow(t, fmt.Sprintf( + `SELECT id FROM system.namespace WHERE name = 'foo%d'`, i)).Scan(&tableID) + tableStats = append(tableStats, &stats.TableStatisticProto{ + TableID: tableID, + ColumnIDs: []descpb.ColumnID{1}, + RowCount: 10, + DistinctCount: 0, + NullCount: 0, + }) + } + return tableStats + } + + for i, test := range []struct { + name string + numTableStats int + }{ + { + name: "empty-stats", + numTableStats: 0, + }, + { + name: "less-than-batch-size", + numTableStats: 5, + }, + { + name: "equal-to-batch-size", + numTableStats: 10, + }, + { + name: "greater-than-batch-size", + numTableStats: 15, + }, + } { + t.Run(test.name, func(t *testing.T) { + dbName := fmt.Sprintf("foo%d", i) + defer sqlDB.Exec(t, fmt.Sprintf(`DROP DATABASE %s`, dbName)) + sqlDB.Exec(t, fmt.Sprintf("CREATE DATABASE %s", dbName)) + sqlDB.Exec(t, fmt.Sprintf("USE %s", dbName)) + stats := generateTableStatistics(test.numTableStats) + + // Clear the stats. + sqlDB.Exec(t, `DELETE FROM system.table_statistics WHERE true`) + job := mkJob(t) + require.NoError(t, insertStats(ctx, job, &execCfg, stats)) + // If there are no stats to insert, we exit early without updating the + // job. + if test.numTableStats != 0 { + require.True(t, job.Details().(jobspb.RestoreDetails).StatsInserted) + } + res := sqlDB.QueryStr(t, `SELECT * FROM system.table_statistics`) + require.Len(t, res, test.numTableStats) + }) + } +} + func TestBackupRestoreCorruptedStatsIgnored(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 7caa5753b4ab..509049a7e784 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -65,6 +65,10 @@ import ( "github.com/gogo/protobuf/types" ) +// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of +// tables we process in a single txn when restoring their table statistics. +var restoreStatsInsertBatchSize = 10 + func processTableForMultiRegion( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, table catalog.TableDescriptor, ) error { @@ -1806,24 +1810,40 @@ func insertStats( return nil } - if latestStats == nil { - return nil - } + // We could be restoring hundreds of tables, so insert the new stats in + // batches instead of all in a single, long-running txn. This prevents intent + // buildup in the face of txn retries. + for { + if len(latestStats) == 0 { + return nil + } - err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := stats.InsertNewStats(ctx, execCfg.Settings, execCfg.InternalExecutor, txn, latestStats); err != nil { - return errors.Wrapf(err, "inserting stats from backup") + if len(latestStats) < restoreStatsInsertBatchSize { + restoreStatsInsertBatchSize = len(latestStats) } - details.StatsInserted = true - if err := job.SetDetails(ctx, txn, details); err != nil { - return errors.Wrapf(err, "updating job marking stats insertion complete") + + if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := stats.InsertNewStats(ctx, execCfg.Settings, execCfg.InternalExecutor, txn, + latestStats[:restoreStatsInsertBatchSize]); err != nil { + return errors.Wrapf(err, "inserting stats from backup") + } + + // If this is the last batch, mark the stats insertion complete. + if restoreStatsInsertBatchSize == len(latestStats) { + details.StatsInserted = true + if err := job.SetDetails(ctx, txn, details); err != nil { + return errors.Wrapf(err, "updating job marking stats insertion complete") + } + } + + return nil + }); err != nil { + return err } - return nil - }) - if err != nil { - return err + + // Truncate the stats that we have inserted in the txn above. + latestStats = latestStats[restoreStatsInsertBatchSize:] } - return nil } // publishDescriptors updates the RESTORED descriptors' status from OFFLINE to