Skip to content

Commit

Permalink
Merge #75969
Browse files Browse the repository at this point in the history
75969: backupccl: breakup the txn that inserts stats during cluster restore r=stevendanna a=adityamaru

We have seen instances of restores with hundreds of tables getting
stuck on inserting the backed up table stats into the system.table_stats
table on the restoring cluster. Previously, we would issue insert
statements for each table stat row in a single, long-running txn. If this
txn were to be retried a few times, we would observe intent buildup
on the system.table_stats ranges. Once these intents exceeded the
`max_intent_bytes` on the cluster, every subsequent txn retry would fall
back to the much more expensive ranged intent resolution. The only
remedy at this point would be to delete the BACKUP-STATISTICS file from
the bucket where the backup resides, and restore the tables with no
stats, relying on the AUTO STATS job to rebuild them gradually.

This change "batches" the insertion of the table stats to prevent the
above situation.

Fixes: #69207

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Feb 12, 2022
2 parents 212ddab + 568c463 commit 260be01
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 14 deletions.
87 changes: 87 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6152,6 +6152,93 @@ func TestBackupHandlesDroppedTypeStatsCollection(t *testing.T) {
sqlDB.Exec(t, `BACKUP foo TO $1`, dest)
}

// TestBatchedInsertStats is a test for the `insertStats` method used in a
// cluster restore to restore backed up statistics.
func TestBatchedInsertStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1
tc, sqlDB, _, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts,
InitManualReplication)
defer cleanupFn()
ctx := context.Background()
s := tc.Server(0)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
registry := s.JobRegistry().(*jobs.Registry)
mkJob := func(t *testing.T) *jobs.Job {
id := registry.MakeJobID()
job, err := registry.CreateJobWithTxn(ctx, jobs.Record{
// Job does not accept an empty Details field, so arbitrarily provide
// RestoreDetails.
Details: jobspb.RestoreDetails{},
Progress: jobspb.RestoreProgress{},
}, id, nil /* txn */)
require.NoError(t, err)
return job
}

generateTableStatistics := func(numStats int) []*stats.TableStatisticProto {
tableStats := make([]*stats.TableStatisticProto, 0, numStats)
for i := 0; i < numStats; i++ {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE foo%d (id INT)`, i))
var tableID descpb.ID
sqlDB.QueryRow(t, fmt.Sprintf(
`SELECT id FROM system.namespace WHERE name = 'foo%d'`, i)).Scan(&tableID)
tableStats = append(tableStats, &stats.TableStatisticProto{
TableID: tableID,
ColumnIDs: []descpb.ColumnID{1},
RowCount: 10,
DistinctCount: 0,
NullCount: 0,
})
}
return tableStats
}

for i, test := range []struct {
name string
numTableStats int
}{
{
name: "empty-stats",
numTableStats: 0,
},
{
name: "less-than-batch-size",
numTableStats: 5,
},
{
name: "equal-to-batch-size",
numTableStats: 10,
},
{
name: "greater-than-batch-size",
numTableStats: 15,
},
} {
t.Run(test.name, func(t *testing.T) {
dbName := fmt.Sprintf("foo%d", i)
defer sqlDB.Exec(t, fmt.Sprintf(`DROP DATABASE %s`, dbName))
sqlDB.Exec(t, fmt.Sprintf("CREATE DATABASE %s", dbName))
sqlDB.Exec(t, fmt.Sprintf("USE %s", dbName))
stats := generateTableStatistics(test.numTableStats)

// Clear the stats.
sqlDB.Exec(t, `DELETE FROM system.table_statistics WHERE true`)
job := mkJob(t)
require.NoError(t, insertStats(ctx, job, &execCfg, stats))
// If there are no stats to insert, we exit early without updating the
// job.
if test.numTableStats != 0 {
require.True(t, job.Details().(jobspb.RestoreDetails).StatsInserted)
}
res := sqlDB.QueryStr(t, `SELECT * FROM system.table_statistics`)
require.Len(t, res, test.numTableStats)
})
}
}

func TestBackupRestoreCorruptedStatsIgnored(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
48 changes: 34 additions & 14 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ import (
"github.com/gogo/protobuf/types"
)

// restoreStatsInsertBatchSize is an arbitrarily chosen value of the number of
// tables we process in a single txn when restoring their table statistics.
var restoreStatsInsertBatchSize = 10

func processTableForMultiRegion(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, table catalog.TableDescriptor,
) error {
Expand Down Expand Up @@ -1806,24 +1810,40 @@ func insertStats(
return nil
}

if latestStats == nil {
return nil
}
// We could be restoring hundreds of tables, so insert the new stats in
// batches instead of all in a single, long-running txn. This prevents intent
// buildup in the face of txn retries.
for {
if len(latestStats) == 0 {
return nil
}

err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := stats.InsertNewStats(ctx, execCfg.Settings, execCfg.InternalExecutor, txn, latestStats); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
if len(latestStats) < restoreStatsInsertBatchSize {
restoreStatsInsertBatchSize = len(latestStats)
}
details.StatsInserted = true
if err := job.SetDetails(ctx, txn, details); err != nil {
return errors.Wrapf(err, "updating job marking stats insertion complete")

if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := stats.InsertNewStats(ctx, execCfg.Settings, execCfg.InternalExecutor, txn,
latestStats[:restoreStatsInsertBatchSize]); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
}

// If this is the last batch, mark the stats insertion complete.
if restoreStatsInsertBatchSize == len(latestStats) {
details.StatsInserted = true
if err := job.SetDetails(ctx, txn, details); err != nil {
return errors.Wrapf(err, "updating job marking stats insertion complete")
}
}

return nil
}); err != nil {
return err
}
return nil
})
if err != nil {
return err

// Truncate the stats that we have inserted in the txn above.
latestStats = latestStats[restoreStatsInsertBatchSize:]
}
return nil
}

// publishDescriptors updates the RESTORED descriptors' status from OFFLINE to
Expand Down

0 comments on commit 260be01

Please sign in to comment.