132854: sql/row: fix multi-range reads from PCR standby r=DrewKimball,nvanbenschoten,dt a=michae2

Reads from tables with external row data (i.e. reads from a PCR standby cluster) need to use the fixed timestamp specified by the external row data. This timestamp might be different from the transaction timestamp, so we were explicitly setting BatchRequest.Timestamp in kv_batch_fetcher.

The KV API only allows BatchRequest.Timestamp to be set for non-transactional requests (i.e. requests sent with a NonTransactionalSender, which is a CrossRangeTxnWrapperSender in this case). We were using a NonTransactionalSender, but this had two problems:

1. CrossRangeTxnWrapperSender in turn sends the BatchRequest with a transactional sender, which again does not allow BatchRequest.Timestamp to be set.
2. CrossRangeTxnWrapperSender uses `kv.(*Txn).CommitInBatch`, which does not provide the 1-to-1 request-response guarantee required by txnKVFetcher. It is `kv.(*Txn).Send` which provides this guarantee.

Because of these two problems, whenever the txnKVFetcher sent a multi-range-spanning BatchRequest to CrossRangeTxnWrapperSender, it would either fail with a "transactional request must not set batch timestamp" error or return an unexpected number of responses, violating the txnKVFetcher's assumed one-to-one mapping from request to response.

To fix both these problems, instead of using a NonTransactionalSender, change the txnKVFetcher to open a new root transaction with the correct fixed timestamp, and then use txn.Send.
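
Roughly, the fix has the following shape (a simplified sketch, not the literal txnKVFetcher change: the helper name `sendAtFixedTimestamp` and its placement are made up, txn lifecycle management is elided, and it assumes the kv package's `NewTxn`, `SetFixedTimestamp`, and `Send` APIs):

```go
// Illustrative placement only; the real change lives in the txnKVFetcher.
package row

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// sendAtFixedTimestamp is a hypothetical helper showing the shape of the fix:
// open a fresh root transaction pinned to the external row data's fixed
// timestamp and send the batch through it, instead of setting
// BatchRequest.Timestamp and routing through a NonTransactionalSender.
func sendAtFixedTimestamp(
	ctx context.Context, db *kv.DB, fixedTS hlc.Timestamp, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, error) {
	// A new root txn, separate from the SQL txn, because the external row
	// data's timestamp may differ from the SQL txn's timestamp. (Cleanup of
	// this txn is elided here.)
	txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
	if err := txn.SetFixedTimestamp(ctx, fixedTS); err != nil {
		return nil, err
	}
	// Unlike CrossRangeTxnWrapperSender (which uses CommitInBatch), txn.Send
	// returns exactly one response per request even when the batch spans
	// multiple ranges, preserving the mapping the txnKVFetcher assumes.
	br, pErr := txn.Send(ctx, ba)
	if pErr != nil {
		return nil, pErr.GoError()
	}
	return br, nil
}
```

The real change also has to plumb the external row data's timestamp into the fetcher and manage the new transaction's lifetime, which the sketch leaves out.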

Fixes: #132608

Release note: None

133033: kv: deflake TestMergeQueue, control merge queue directly r=nvanbenschoten a=nvanbenschoten

Fixes #132831.

This commit deflakes TestMergeQueue by changing how it disables the merge queue. Previously, it attempted to disable the merge queue by disabling the replica scanner. This was insufficient to prevent the merge queue from running at unexpected times, because there are other reactive reasons for the merge queue to run (e.g. gossip updates).

This commit switches the test to directly setting the merge queue as active or inactive when needed. This is more reliable because it prevents unexpected background activity from causing the merge queue to run at inopportune times.
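
A minimal sketch of the pattern, assuming kvserver's store-level `SetMergeQueueActive` setter and testcluster's `GetFirstStoreFromServer` accessor (the helper below is illustrative, not the test's literal code):

```go
package kvserver_test // illustrative package placement

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// setMergeQueueActive toggles the merge queue on each store in the test
// cluster, so the test controls exactly when merges may run instead of
// relying on the replica scanner being disabled.
func setMergeQueueActive(t *testing.T, tc *testcluster.TestCluster, active bool) {
	t.Helper()
	for i := 0; i < tc.NumServers(); i++ {
		// While the queue is inactive, background triggers (gossip updates,
		// scanner passes) cannot cause it to process replicas.
		tc.GetFirstStoreFromServer(t, i).SetMergeQueueActive(active)
	}
}
```

With this in place, the test only activates the queue for the windows in which it actually expects merges, so background queue triggers during setup no longer matter.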

Release note: None

133080: backupccl: remove deprecated restore checkpointing method r=dt a=msbutler

The new checkpointing logic has been the default since 23.1, so it is safe to delete the old checkpointing logic; we do not expect 25.1 to resume a restore that began before 23.1.
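
For context, the checkpointing that remains is the span-frontier variant. A minimal sketch of the idea, assuming the util/span Frontier API that restore_progress.go already uses (the spans and timestamps are made up for illustration):

```go
package main // standalone illustration

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
)

func main() {
	// A single required restore span; progress is tracked as a frontier over
	// it rather than as a high-water index into an in-flight span map.
	required := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("h")}
	frontier, err := spanUtils.MakeFrontier(required)
	if err != nil {
		panic(err)
	}
	// Each completed restore span entry forwards its subspan to its
	// completion timestamp; the persisted checkpoint is derived from the
	// frontier's completed spans.
	done := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}
	if _, err := frontier.Forward(done, hlc.Timestamp{WallTime: 1}); err != nil {
		panic(err)
	}
	// The overall frontier timestamp stays at zero until every subspan of
	// the required span has completed.
	fmt.Println(frontier.Frontier())
}
```

Because restore span entries complete out of order, the frontier (rather than a single high-water key) is what gets persisted to job progress as the checkpoint.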

Epic: none

Release note: None

Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
4 people committed Oct 22, 2024
4 parents c800bae + 121da38 + 09a0758 + e3a1093 commit fcf7acd
Showing 14 changed files with 232 additions and 247 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -91,11 +91,10 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
filter, err := makeSpanCoveringFilter(
backups[numBackups-1].Spans,
[]jobspb.RestoreProgress_FrontierEntry{},
nil,
introducedSpanFrontier,
0,
defaultMaxFileCount,
false)
)
require.NoError(b, err)
defer filter.close()

4 changes: 1 addition & 3 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -477,11 +477,9 @@ func runGenerativeSplitAndScatter(
filter, err := makeSpanCoveringFilter(
spec.Spans,
spec.CheckpointedSpans,
spec.HighWater,
introducedSpanFrontier,
spec.TargetSize,
spec.MaxFileCount,
spec.UseFrontierCheckpointing)
spec.MaxFileCount)
if err != nil {
return errors.Wrap(err, "failed to make span covering filter")
}
@@ -267,7 +267,6 @@ func makeTestingGenerativeSplitAndScatterSpec(
EndTime: hlc.Timestamp{},
Spans: requiredSpans,
BackupLocalityInfo: nil,
HighWater: nil,
UserProto: "",
ChunkSize: 1,
TargetSize: 1,
15 changes: 1 addition & 14 deletions pkg/ccl/backupccl/restore_job.go
@@ -314,15 +314,11 @@ func restore(
return emptyRowCount, err
}

ver := job.Payload().CreationClusterVersion
// TODO(radu,msbutler,stevendanna): we might be able to remove this now?
on231 := ver.Major > 23 || (ver.Major == 23 && ver.Minor >= 1)
restoreCheckpoint := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
requiredSpans := dataToRestore.getSpans()
progressTracker, err := makeProgressTracker(
requiredSpans,
restoreCheckpoint,
on231,
restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV),
endTime)
if err != nil {
@@ -353,11 +349,9 @@ func restore(
return makeSpanCoveringFilter(
requiredSpans,
restoreCheckpoint,
job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater,
introducedSpanFrontier,
targetSize,
maxFileCount,
progressTracker.useFrontier)
maxFileCount)
}(); err != nil {
return roachpb.RowCount{}, err
}
@@ -436,13 +430,6 @@ func restore(
}
tasks = append(tasks, jobProgressLoop)
}
if !progressTracker.useFrontier {
// This goroutine feeds the deprecated high water mark variant of the
// generativeCheckpointLoop.
tasks = append(tasks, func(ctx context.Context) error {
return genSpan(ctx, progressTracker.inFlightSpanFeeder)
})
}

progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
if !details.ExperimentalOnline {
6 changes: 1 addition & 5 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -173,21 +173,17 @@ func distRestore(
EndTime: md.restoreTime,
Spans: md.dataToRestore.getSpans(),
BackupLocalityInfo: md.backupLocalityInfo,
HighWater: md.spanFilter.highWaterMark,
UserProto: execCtx.User().EncodeProto(),
TargetSize: md.spanFilter.targetSize,
MaxFileCount: int64(md.spanFilter.maxFileCount),
ChunkSize: int64(chunkSize),
NumEntries: int64(md.numImportSpans),
UseFrontierCheckpointing: md.spanFilter.useFrontierCheckpointing,
NumNodes: int64(numNodes),
JobID: int64(md.jobID),
SQLInstanceIDs: instanceIDs,
ExclusiveFileSpanComparison: md.exclusiveEndKeys,
}
if md.spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0)
}
spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0)

splitAndScatterProc := physicalplan.Processor{
SQLInstanceID: execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID(),
180 changes: 53 additions & 127 deletions pkg/ccl/backupccl/restore_progress.go
@@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
)

@@ -50,26 +49,7 @@ type progressTracker struct {

// res tracks the amount of data that has been ingested.
res roachpb.RowCount

// Note that the fields below are used for the deprecated high watermark progress
// tracker.
// highWaterMark represents the index into the requestsCompleted map.
highWaterMark int64
ceiling int64

// As part of job progress tracking, inFlightImportSpans tracks all the
// spans that have been generated are being processed by the processors in
// distRestore. requestsCompleleted tracks the spans from
// inFlightImportSpans that have completed its processing. Once all spans up
// to index N have been processed (and appear in requestsCompleted), then
// any spans with index < N will be removed from both inFlightImportSpans
// and requestsCompleted maps.
inFlightImportSpans map[int64]roachpb.Span
requestsCompleted map[int64]bool
}
useFrontier bool
inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry

// endTime is the restore as of timestamp. This can be empty, and an empty timestamp
// indicates a restore of the latest revision.
endTime hlc.Timestamp
@@ -78,7 +58,6 @@ type progressTracker struct {
func makeProgressTracker(
requiredSpans roachpb.Spans,
persistedSpans []jobspb.RestoreProgress_FrontierEntry,
useFrontier bool,
maxBytes int64,
endTime hlc.Timestamp,
) (*progressTracker, error) {
@@ -87,32 +66,20 @@ func makeProgressTracker(
checkpointFrontier spanUtils.Frontier
err error
nextRequiredSpanKey map[string]roachpb.Key
inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry
)
if useFrontier {
checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans)
if err != nil {
return nil, err
}
nextRequiredSpanKey = make(map[string]roachpb.Key)
for i := 0; i < len(requiredSpans)-1; i++ {
nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key
}

} else {
inFlightSpanFeeder = make(chan execinfrapb.RestoreSpanEntry, 1000)
checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans)
if err != nil {
return nil, err
}
nextRequiredSpanKey = make(map[string]roachpb.Key)
for i := 0; i < len(requiredSpans)-1; i++ {
nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key
}

pt := &progressTracker{}
pt.mu.checkpointFrontier = checkpointFrontier
pt.mu.highWaterMark = -1
pt.mu.ceiling = 0
pt.mu.inFlightImportSpans = make(map[int64]roachpb.Span)
pt.mu.requestsCompleted = make(map[int64]bool)
pt.nextRequiredSpanKey = nextRequiredSpanKey
pt.maxBytes = maxBytes
pt.useFrontier = useFrontier
pt.inFlightSpanFeeder = inFlightSpanFeeder
pt.endTime = endTime
return pt, nil
}
@@ -182,16 +149,10 @@ func (pt *progressTracker) updateJobCallback(
func() {
pt.mu.Lock()
defer pt.mu.Unlock()
if pt.useFrontier {
// TODO (msbutler): this requires iterating over every span in the frontier,
// and rewriting every completed required span to disk.
// We may want to be more intelligent about this.
d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes)
} else {
if pt.mu.highWaterMark >= 0 {
d.Restore.HighWater = pt.mu.inFlightImportSpans[pt.mu.highWaterMark].Key
}
}
// TODO (msbutler): this requires iterating over every span in the frontier,
// and rewriting every completed required span to disk.
// We may want to be more intelligent about this.
d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes)
}()
default:
log.Errorf(progressedCtx, "job payload had unexpected type %T", d)
@@ -224,83 +185,48 @@ func (pt *progressTracker) ingestUpdate(
}

pt.mu.res.Add(progDetails.Summary)
if pt.useFrontier {
updateSpan := progDetails.DataSpan.Clone()
// If the completedSpan has the same end key as a requiredSpan_i, forward
// the frontier for the span [completedSpan_startKey,
// requiredSpan_i+1_startKey]. This trick ensures the span frontier will
// contain a single entry when the restore completes. Recall that requiredSpans are
// disjoint, and a spanFrontier never merges disjoint spans. So, without
// this trick, the spanFrontier will have O(requiredSpans) entries when the
// restore completes. This trick ensures all spans persisted to the frontier are adjacent,
// and consequently, will eventually merge.
//
// Here's a visual example:
// - this restore has two required spans: [a,d) and [e,h).
// - the restore span entry [c,d) just completed, implying the frontier logically looks like:
//
// tC| x---o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - since [c,d)'s endkey equals the required span (a,d]'s endkey,
// also update the gap between required span 1 and 2 in the frontier:
//
// tC| x-------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - this will ensure that when all subspans in required spans 1 and 2 complete,
// the checkpoint frontier has one span:
//
// tC| x---------------------------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok {
updateSpan.EndKey = newEndKey
}
if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil {
return false, err
}
} else {
idx := progDetails.ProgressIdx

if idx >= pt.mu.ceiling {
for i := pt.mu.ceiling; i <= idx; i++ {
importSpan, ok := <-pt.inFlightSpanFeeder
if !ok {
// The channel has been closed, there is nothing left to do.
log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed")
return true, nil
}
pt.mu.inFlightImportSpans[i] = importSpan.Span
}
pt.mu.ceiling = idx + 1
}

if sp, ok := pt.mu.inFlightImportSpans[idx]; ok {
// Assert that we're actually marking the correct span done. See #23977.
if !sp.Key.Equal(progDetails.DataSpan.Key) {
return false, errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, sp,
)
}
pt.mu.requestsCompleted[idx] = true
prevHighWater := pt.mu.highWaterMark
for j := pt.mu.highWaterMark + 1; j < pt.mu.ceiling && pt.mu.requestsCompleted[j]; j++ {
pt.mu.highWaterMark = j
}
for j := prevHighWater; j < pt.mu.highWaterMark; j++ {
delete(pt.mu.requestsCompleted, j)
delete(pt.mu.inFlightImportSpans, j)
}
}
updateSpan := progDetails.DataSpan.Clone()
// If the completedSpan has the same end key as a requiredSpan_i, forward
// the frontier for the span [completedSpan_startKey,
// requiredSpan_i+1_startKey]. This trick ensures the span frontier will
// contain a single entry when the restore completes. Recall that requiredSpans are
// disjoint, and a spanFrontier never merges disjoint spans. So, without
// this trick, the spanFrontier will have O(requiredSpans) entries when the
// restore completes. This trick ensures all spans persisted to the frontier are adjacent,
// and consequently, will eventually merge.
//
// Here's a visual example:
// - this restore has two required spans: [a,d) and [e,h).
// - the restore span entry [c,d) just completed, implying the frontier logically looks like:
//
// tC| x---o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - since [c,d)'s endkey equals the required span (a,d]'s endkey,
// also update the gap between required span 1 and 2 in the frontier:
//
// tC| x-------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - this will ensure that when all subspans in required spans 1 and 2 complete,
// the checkpoint frontier has one span:
//
// tC| x---------------------------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok {
updateSpan.EndKey = newEndKey
}
if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil {
return false, err
}
return true, nil
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_progress_test.go
@@ -82,7 +82,7 @@ func TestProgressTracker(t *testing.T) {
},
} {
restoreTime := hlc.Timestamp{}
pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0, restoreTime)
pt, err := makeProgressTracker(requiredSpans, persistedSpans, 0, restoreTime)
require.NoError(t, err, "step %d", i)

done, err := pt.ingestUpdate(ctx, mockUpdate(step.update, step.completeUpTo))