
Merge #83083 #83151
83083: sql: ensure more mutexes get properly unlocked in case of panic r=rafiss,ajwerner a=knz

Fixes #83078.
Informs #83080. 
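
The fix follows a simple pattern, visible in the diffs below: wrap each critical
section in a closure and release the mutex with `defer`, so the lock is dropped
even when the protected code panics. The sketch below is a minimal, self-contained
illustration of that pattern; the `counter` type and `riskyUpdate` function are
hypothetical names, not code from this repository.

```go
// Minimal sketch of the locking pattern applied in this PR: the critical
// section lives in a closure so that `defer mu.Unlock()` runs even if the
// protected code panics. Names here are illustrative only.
package main

import (
	"fmt"
	"sync"
)

type counter struct {
	mu sync.Mutex
	n  int
}

// riskyUpdate applies f to the counter under the lock. Because the lock and
// the deferred unlock live inside the closure, a panic in f still releases
// the mutex as the closure unwinds, so later callers do not deadlock.
func (c *counter) riskyUpdate(f func(int) int) {
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.n = f(c.n) // f may panic; the deferred Unlock still runs.
	}()
}

func main() {
	c := &counter{}
	func() {
		defer func() { recover() }() // swallow the panic for the demo
		c.riskyUpdate(func(int) int { panic("boom") })
	}()
	// Without the deferred unlock above, this call would block forever.
	c.riskyUpdate(func(n int) int { return n + 1 })
	fmt.Println(c.n) // 1
}
```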

83151: backupccl: make checkpoint interval configurable r=adityamaru,benbardin a=stevendanna

This PR makes the interval between checkpoints configurable and also
excludes the processing time of the checkpoint itself from that
interval.

The goal of this change is to help address issues we've seen in large
clusters, which we currently believe can be attributed to the backup
process slowing down substantially once it takes a minute or longer to
marshal, compress, and write the progress checkpoint.
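
The checkpoint's own processing time is excluded by recording the timestamp of
the last checkpoint before the (potentially slow) write rather than after it, as
the `backup_job.go` hunk below shows. The following is a simplified sketch using
only the standard library; `maybeCheckpoint` and its parameters are hypothetical
stand-ins for the actual backup loop, which uses CockroachDB's `timeutil` and
cluster settings.

```go
// Simplified sketch of the timing change: starting the clock for the next
// interval before writing means a slow checkpoint write does not push back
// the next checkpoint.
package main

import (
	"fmt"
	"time"
)

func maybeCheckpoint(lastCheckpoint *time.Time, interval time.Duration, write func() error) {
	if time.Since(*lastCheckpoint) <= interval {
		return
	}
	// Record the checkpoint time before writing, so the write's own duration
	// is excluded from the wait until the following checkpoint.
	*lastCheckpoint = time.Now()
	if err := write(); err != nil {
		fmt.Println("unable to checkpoint:", err)
	}
}

func main() {
	last := time.Now().Add(-2 * time.Minute)
	maybeCheckpoint(&last, time.Minute, func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for a slow checkpoint write
		return nil
	})
	fmt.Println("next checkpoint due at:", last.Add(time.Minute))
}
```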

Release note (ops change): A new setting
`bulkio.backup.checkpoint_interval` controls the minimum interval
between writes of progress checkpoints to external storage.
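
As a usage sketch only (not part of this change), the setting can be adjusted
like any other cluster setting, for example from a Go client. The connection
string and the five-minute value below are illustrative; raising the interval
writes checkpoints less often, reducing checkpoint overhead on very large
backups.

```go
// Hypothetical example of raising the new setting from a Go client using
// database/sql and the lib/pq driver against a locally running, insecure
// CockroachDB node.
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Progress checkpoints will now be written at most once every five
	// minutes instead of the default minimum of one minute.
	if _, err := db.Exec(
		"SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '5m'",
	); err != nil {
		log.Fatal(err)
	}
}
```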

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
3 people committed Jun 22, 2022
3 parents: b8e5b23 + f5cd2cc + 76f5a09 · commit 76cbd88
Showing 18 changed files with 364 additions and 341 deletions.
26 changes: 10 additions & 16 deletions pkg/ccl/backupccl/backup_job.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -57,18 +58,11 @@ import (

// BackupCheckpointInterval is the interval at which backup progress is saved
// to durable storage.
var BackupCheckpointInterval = time.Minute

// TestingShortBackupCheckpointInterval sets the BackupCheckpointInterval
// to a shorter interval for testing purposes, so we can see multiple
// checkpoints written without having extremely large backups. It returns
// a function which resets the checkpoint interval to the old interval.
func TestingShortBackupCheckpointInterval(oldInterval time.Duration) func() {
BackupCheckpointInterval = time.Millisecond * 10
return func() {
BackupCheckpointInterval = oldInterval
}
}
var BackupCheckpointInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"bulkio.backup.checkpoint_interval",
"the minimum time between writing progress checkpoints during a backup",
time.Minute)

var forceReadBackupManifest = util.ConstantWithMetamorphicTestBool("backup-read-manifest", false)

@@ -240,22 +234,22 @@ func backup(
for i := int32(0); i < progDetails.CompletedSpans; i++ {
requestFinishedCh <- struct{}{}
}
if timeutil.Since(lastCheckpoint) > BackupCheckpointInterval {

interval := BackupCheckpointInterval.Get(&execCtx.ExecCfg().Settings.SV)
if timeutil.Since(lastCheckpoint) > interval {
resumerSpan.RecordStructured(&backuppb.BackupProgressTraceEvent{
TotalNumFiles: numBackedUpFiles,
TotalEntryCounts: backupManifest.EntryCounts,
RevisionStartTime: backupManifest.RevisionStartTime,
})

lastCheckpoint = timeutil.Now()

err := writeBackupManifestCheckpoint(
ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(),
)
if err != nil {
log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err)
}

lastCheckpoint = timeutil.Now()
if execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint != nil {
execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint()
}
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
@@ -799,6 +799,10 @@ func writeBackupManifestCheckpoint(
execCfg *sql.ExecutorConfig,
user username.SQLUsername,
) error {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "write-backup-manifest-checkpoint")
defer span.Finish()

defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, storageURI, user)
if err != nil {
return err
84 changes: 4 additions & 80 deletions pkg/ccl/backupccl/backup_test.go
@@ -1472,62 +1472,6 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) {
checkInProgressBackupRestore(t, checkFraction, checkFraction)
}

func TestBackupRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 33357)

defer func(oldInterval time.Duration) {
BackupCheckpointInterval = oldInterval
}(BackupCheckpointInterval)
BackupCheckpointInterval = 0

var checkpointPath string

checkBackup := func(ctx context.Context, ip inProgressState) error {
checkpointPath = filepath.Join(ip.dir, ip.name, backupProgressDirectory+"/"+backupManifestCheckpointName)
checkpointDescBytes, err := ioutil.ReadFile(checkpointPath)
if err != nil {
return errors.Wrap(err, "error while reading checkpoint")
}
var checkpointDesc backuppb.BackupManifest
if err := protoutil.Unmarshal(checkpointDescBytes, &checkpointDesc); err != nil {
return errors.Wrap(err, "error while unmarshalling checkpoint")
}
if len(checkpointDesc.Files) == 0 {
return errors.Errorf("empty backup checkpoint descriptor")
}
return nil
}

checkRestore := func(ctx context.Context, ip inProgressState) error {
jobID, err := ip.latestJobID()
if err != nil {
return err
}
highWaterMark, err := getHighWaterMark(jobID, ip.DB)
if err != nil {
return err
}
low := keys.SystemSQLCodec.TablePrefix(ip.backupTableID)
high := keys.SystemSQLCodec.TablePrefix(ip.backupTableID + 1)
if bytes.Compare(highWaterMark, low) <= 0 || bytes.Compare(highWaterMark, high) >= 0 {
return errors.Errorf("expected high-water mark %v to be between %v and %v",
highWaterMark, low, high)
}
return nil
}

checkInProgressBackupRestore(t, checkBackup, checkRestore)

if _, err := os.Stat(checkpointPath); err == nil {
t.Fatalf("backup checkpoint descriptor at %s not cleaned up", checkpointPath)
} else if !oserror.IsNotExist(err) {
t.Fatal(err)
}
}

func createAndWaitForJob(
t *testing.T,
db *sqlutils.SQLRunner,
@@ -1702,25 +1646,6 @@ func TestBackupRestoreResume(t *testing.T) {
})
}

func getHighWaterMark(jobID jobspb.JobID, sqlDB *gosql.DB) (roachpb.Key, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
).Scan(&progressBytes); err != nil {
return nil, err
}
var payload jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &payload); err != nil {
return nil, err
}
switch d := payload.Details.(type) {
case *jobspb.Progress_Restore:
return d.Restore.HighWater, nil
default:
return nil, errors.Errorf("unexpected job details type %T", d)
}
}

// TestBackupRestoreControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
// work as intended on backup and restore jobs.
func TestBackupRestoreControlJob(t *testing.T) {
@@ -9815,11 +9740,6 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The regular interval is a minute which would require us to take a
// very large backup in order to get more than one checkpoint. Instead,
// lower the interval and change it back to normal after the test.
resetCheckpointInterval := TestingShortBackupCheckpointInterval(BackupCheckpointInterval)
defer resetCheckpointInterval()
var numCheckpointsWritten int

// Set the testing knob so we count each time we write a checkpoint.
@@ -9840,6 +9760,10 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

// The regular interval is a minute which would require us to take a
// very large backup in order to get more than one checkpoint.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")

query := fmt.Sprintf("BACKUP INTO %s", userfile)
sqlDB.Exec(t, query)

10 changes: 6 additions & 4 deletions pkg/sql/backfill.go
@@ -1017,10 +1017,12 @@ func (sc *SchemaChanger) distIndexBackfill(
if meta.BulkProcessorProgress != nil {
todoSpans = roachpb.SubtractSpans(todoSpans,
meta.BulkProcessorProgress.CompletedSpans)
mu.Lock()
mu.updatedTodoSpans = make([]roachpb.Span, len(todoSpans))
copy(mu.updatedTodoSpans, todoSpans)
mu.Unlock()
func() {
mu.Lock()
defer mu.Unlock()
mu.updatedTodoSpans = make([]roachpb.Span, len(todoSpans))
copy(mu.updatedTodoSpans, todoSpans)
}()

if sc.testingKnobs.AlwaysUpdateIndexBackfillDetails {
if err := updateJobDetails(); err != nil {
42 changes: 23 additions & 19 deletions pkg/sql/backfill/backfill.go
@@ -651,27 +651,29 @@ func (ib *IndexBackfiller) InitForDistributedUse(
// Close releases the resources used by the IndexBackfiller.
func (ib *IndexBackfiller) Close(ctx context.Context) {
if ib.mon != nil {
ib.muBoundAccount.Lock()
ib.muBoundAccount.boundAccount.Close(ctx)
ib.muBoundAccount.Unlock()
func() {
ib.muBoundAccount.Lock()
defer ib.muBoundAccount.Unlock()
ib.muBoundAccount.boundAccount.Close(ctx)
}()
ib.mon.Stop(ctx)
}
}

// GrowBoundAccount grows the mutex protected bound account backing the
// index backfiller.
func (ib *IndexBackfiller) GrowBoundAccount(ctx context.Context, growBy int64) error {
defer ib.muBoundAccount.Unlock()
ib.muBoundAccount.Lock()
defer ib.muBoundAccount.Unlock()
err := ib.muBoundAccount.boundAccount.Grow(ctx, growBy)
return err
}

// ShrinkBoundAccount shrinks the mutex protected bound account backing the
// index backfiller.
func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int64) {
defer ib.muBoundAccount.Unlock()
ib.muBoundAccount.Lock()
defer ib.muBoundAccount.Unlock()
ib.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
}

@@ -913,22 +915,24 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
// We lock the bound account for the duration of this method as it could
// attempt to Grow() it while encoding secondary indexes.
var memUsedDuringEncoding int64
ib.muBoundAccount.Lock()
if buffer, memUsedDuringEncoding, err = rowenc.EncodeSecondaryIndexes(
ctx,
ib.evalCtx.Codec,
tableDesc,
ib.indexesToEncode,
ib.colIdxMap,
ib.rowVals,
buffer,
false, /* includeEmpty */
&ib.muBoundAccount.boundAccount,
); err != nil {
ib.muBoundAccount.Unlock()
buffer, memUsedDuringEncoding, err = func(buffer []rowenc.IndexEntry) ([]rowenc.IndexEntry, int64, error) {
ib.muBoundAccount.Lock()
defer ib.muBoundAccount.Unlock()
return rowenc.EncodeSecondaryIndexes(
ctx,
ib.evalCtx.Codec,
tableDesc,
ib.indexesToEncode,
ib.colIdxMap,
ib.rowVals,
buffer,
false, /* includeEmpty */
&ib.muBoundAccount.boundAccount,
)
}(buffer)
if err != nil {
return nil, nil, 0, err
}
ib.muBoundAccount.Unlock()
memUsedPerChunk += memUsedDuringEncoding

// The memory monitor has already accounted for cap(entries). If the number
24 changes: 14 additions & 10 deletions pkg/sql/backfill/mvcc_index_merger.go
@@ -307,16 +307,20 @@ func (ibm *IndexBackfillMerger) scan(
nextStart = resp.Rows[len(resp.Rows)-1].Key.Next()
chunk.completedSpan = roachpb.Span{Key: startKey, EndKey: nextStart}

ibm.muBoundAccount.Lock()
for i := range resp.Rows {
chunk.keys = append(chunk.keys, resp.Rows[i].Key)
if err := ibm.muBoundAccount.boundAccount.Grow(ctx, int64(len(resp.Rows[i].Key))); err != nil {
ibm.muBoundAccount.Unlock()
return mergeChunk{}, nil, errors.Wrap(err, "failed to allocate space for merge keys")
if err := func() error {
ibm.muBoundAccount.Lock()
defer ibm.muBoundAccount.Unlock()
for i := range resp.Rows {
chunk.keys = append(chunk.keys, resp.Rows[i].Key)
if err := ibm.muBoundAccount.boundAccount.Grow(ctx, int64(len(resp.Rows[i].Key))); err != nil {
return errors.Wrap(err, "failed to allocate space for merge keys")
}
chunkMem += int64(len(resp.Rows[i].Key))
}
chunkMem += int64(len(resp.Rows[i].Key))
return nil
}(); err != nil {
return mergeChunk{}, nil, err
}
ibm.muBoundAccount.Unlock()
}
chunk.memUsed = chunkMem
return chunk, nextStart, nil
@@ -477,14 +481,14 @@ func mergeEntry(sourceKV *kv.KeyValue, destKey roachpb.Key) (*kv.KeyValue, bool,
}

func (ibm *IndexBackfillMerger) growBoundAccount(ctx context.Context, growBy int64) error {
defer ibm.muBoundAccount.Unlock()
ibm.muBoundAccount.Lock()
defer ibm.muBoundAccount.Unlock()
return ibm.muBoundAccount.boundAccount.Grow(ctx, growBy)
}

func (ibm *IndexBackfillMerger) shrinkBoundAccount(ctx context.Context, shrinkBy int64) {
defer ibm.muBoundAccount.Unlock()
ibm.muBoundAccount.Lock()
defer ibm.muBoundAccount.Unlock()
ibm.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
}
