sql: add RevertSpans and RevertSpansFanout
This PR adds RevertSpans and RevertSpansFanout.

RevertSpans de-duplicates some code between import rollback and
streaming cutover.

Along the way, it fixes a small bug that existed in RevertTables: only
a single RevertRangeRequest is permitted per batch, since
RevertRangeRequest has the isAlone flag set. As a result, a future
caller of RevertTables would have encountered a fatal error from KV
(see the first sketch after this message).

RevertSpansFanout uses DistSQL's PartitionSpans to manually fan out
multiple RevertRange requests (see the second sketch after this
message). Since all current users of RevertRange set a limit on the
number of keys touched, DistSender doesn't fan such requests out on
its own.

Release note: None
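
For illustration, here is a minimal sketch of the one-request-per-batch revert loop described above. It mirrors the batch/resume-span pattern visible in the removed code in the diff below, but the function name and signature are illustrative, not the actual sql.RevertSpans API.

```go
package revertsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// revertSpansSketch reverts the given spans to targetTime. Because
// RevertRangeRequest sets the isAlone flag, each request must be sent
// in its own kv.Batch; MaxSpanRequestKeys chunks the work, and any
// ResumeSpan in the response is fed back into the work list. This is
// a sketch, not the actual sql.RevertSpans implementation.
func revertSpansSketch(
	ctx context.Context,
	db *kv.DB,
	spans []roachpb.Span,
	targetTime hlc.Timestamp,
	batchSize int64,
) error {
	for len(spans) > 0 {
		var remaining []roachpb.Span
		for _, sp := range spans {
			var b kv.Batch
			// isAlone: exactly one RevertRangeRequest per batch.
			b.AddRawRequest(&kvpb.RevertRangeRequest{
				RequestHeader: kvpb.RequestHeader{Key: sp.Key, EndKey: sp.EndKey},
				TargetTime:    targetTime,
			})
			// Bound the keys touched per request; a ResumeSpan in the
			// response tells us where to pick back up.
			b.Header.MaxSpanRequestKeys = batchSize
			if err := db.Run(ctx, &b); err != nil {
				return err
			}
			if rs := b.RawResponse().Responses[0].GetRevertRange().ResumeSpan; rs != nil {
				remaining = append(remaining, *rs)
			}
		}
		spans = remaining
	}
	return nil
}
```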
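
And a sketch of the fanout shape, where partitionFn stands in for DistSQL's PartitionSpans and revertFn for the per-partition revert loop; both stand-ins are assumptions for illustration, not the real RevertSpansFanout signature.

```go
package revertsketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// revertSpansFanoutSketch partitions the spans (the real helper uses
// the DistSQL planner's PartitionSpans, grouping spans by the node
// expected to serve them) and runs the sequential revert loop on each
// partition concurrently.
func revertSpansFanoutSketch(
	ctx context.Context,
	spans []roachpb.Span,
	targetTime hlc.Timestamp,
	partitionFn func(context.Context, []roachpb.Span) ([][]roachpb.Span, error),
	revertFn func(context.Context, []roachpb.Span, hlc.Timestamp) error,
) error {
	parts, err := partitionFn(ctx, spans)
	if err != nil {
		return err
	}
	// DistSender won't parallelize a key-limited RevertRange across
	// ranges, so we issue one worker per partition instead.
	g := ctxgroup.WithContext(ctx)
	for _, part := range parts {
		part := part // capture loop variable for the goroutine
		g.GoCtx(func(ctx context.Context) error {
			return revertFn(ctx, part, targetTime)
		})
	}
	return g.Wait()
}
```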
stevendanna committed Mar 2, 2023
1 parent 20e2add commit ff6c3a7
Showing 6 changed files with 586 additions and 190 deletions.
284 changes: 179 additions & 105 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -19,8 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
@@ -31,7 +29,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
@@ -192,7 +192,7 @@ func updateRunningStatusInternal(
func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
// Cutover should be the *first* thing checked upon resumption as it is the
// most critical task in disaster recovery.
-	reverted, err := maybeRevertToCutoverTimestamp(ctx, execCtx, ingestionJob.ID())
+	reverted, err := maybeRevertToCutoverTimestamp(ctx, execCtx, ingestionJob)
if err != nil {
return err
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
log.Infof(ctx,
"starting to revert to the specified cutover timestamp for stream ingestion job %d",
ingestionJob.ID())
-		if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJob.ID()); err != nil {
+		if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJob); err != nil {
return err
}

@@ -481,9 +481,9 @@ func (s *streamIngestionResumer) protectDestinationTenant(
// revertToCutoverTimestamp attempts a cutover and errors out if one was not
// executed.
func revertToCutoverTimestamp(
-	ctx context.Context, execCtx interface{}, ingestionJobID jobspb.JobID,
+	ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job,
) error {
-	reverted, err := maybeRevertToCutoverTimestamp(ctx, execCtx, ingestionJobID)
+	reverted, err := maybeRevertToCutoverTimestamp(ctx, execCtx, ingestionJob)
if err != nil {
return err
}
@@ -494,56 +494,70 @@ func revertToCutoverTimestamp(
return nil
}

+func cutoverTimeIsEligibleForCutover(
+	ctx context.Context, cutoverTime hlc.Timestamp, progress *jobspb.Progress,
+) bool {
+	if cutoverTime.IsEmpty() {
+		log.Infof(ctx, "empty cutover time, no revert required")
+		return false
+	}
+	if progress.GetHighWater() == nil || progress.GetHighWater().Less(cutoverTime) {
+		log.Infof(ctx, "job with highwater %s not yet ready to revert to cutover at %s",
+			progress.GetHighWater(), cutoverTime.String())
+		return false
+	}
+	return true
+}
+
// maybeRevertToCutoverTimestamp reads the job progress for the cutover time and
// if the job has progressed past the cutover time issues a RevertRangeRequest
// with the target time set to that cutover time, to bring the ingesting cluster
// to a consistent state.
func maybeRevertToCutoverTimestamp(
-	ctx context.Context, execCtx interface{}, ingestionJobID jobspb.JobID,
+	ctx context.Context, p sql.JobExecContext, ingestionJob *jobs.Job,
) (bool, error) {
ctx, span := tracing.ChildSpan(ctx, "streamingest.revertToCutoverTimestamp")
defer span.Finish()

-	p := execCtx.(sql.JobExecContext)
-	db := p.ExecCfg().DB
-	jobRegistry := p.ExecCfg().JobRegistry
-	ingestionJob, err := jobRegistry.LoadJob(ctx, ingestionJobID)
-	if err != nil {
-		return false, err
-	}
-
-	var shouldRevertToCutover bool
-	if err := p.ExecCfg().InternalDB.Txn(ctx, func(
-		ctx context.Context, txn isql.Txn,
-	) error {
-		return ingestionJob.WithTxn(txn).Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
-			payload := md.Payload.GetStreamIngestion()
-			if payload == nil {
-				return errors.Newf("unknown payload type %T in stream ingestion job %d",
-					md.Payload, ingestionJobID)
-			}
-			streamIngest := md.Progress.GetStreamIngest()
-			if streamIngest == nil {
-				return errors.Newf("unknown progress type %T in stream ingestion job %d",
-					md.Progress, ingestionJobID)
-			}
-			cutoverTime := streamIngest.CutoverTime
-			if cutoverTime.IsEmpty() {
-				log.Infof(ctx, "empty cutover time, no revert required")
-				return nil
+	// The update below sets the ReplicationStatus to
+	// CuttingOver. Once set, the cutoverTimestamp cannot be
+	// changed. We want to be sure to read the timestamp that
+	// existed in the record at the point of the update rather than
+	// the value that may be in the job record before the update.
+	var (
+		shouldRevertToCutover bool
+		cutoverTimestamp      hlc.Timestamp
+		spanToRevert          roachpb.Span
+	)
+	if err := ingestionJob.NoTxn().Update(ctx,
+		func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+			streamIngestionDetails := md.Payload.GetStreamIngestion()
+			if streamIngestionDetails == nil {
+				return errors.AssertionFailedf("unknown payload %v in stream ingestion job %d",
+					md.Payload, ingestionJob.ID())
}
-			if md.Progress.GetHighWater() == nil || md.Progress.GetHighWater().Less(cutoverTime) {
-				log.Infof(ctx, "job with highwater %s not yet ready to revert to cutover at %s",
-					md.Progress.GetHighWater(), cutoverTime.String())
-				return nil
+
+			streamIngestionProgress := md.Progress.GetStreamIngest()
+			if streamIngestionProgress == nil {
+				return errors.AssertionFailedf("unknown progress %v in stream ingestion job %d",
+					md.Progress, ingestionJob.ID())
}

-			shouldRevertToCutover = true
-			updateRunningStatusInternal(md, ju, jobspb.ReplicationCuttingOver,
-				fmt.Sprintf("starting to cut over to the given timestamp %s", cutoverTime))
+			cutoverTimestamp = streamIngestionProgress.CutoverTime
+			spanToRevert = streamIngestionDetails.Span
+			shouldRevertToCutover = cutoverTimeIsEligibleForCutover(ctx, cutoverTimestamp, md.Progress)
+
+			if shouldRevertToCutover {
+				updateRunningStatusInternal(md, ju, jobspb.ReplicationCuttingOver,
+					fmt.Sprintf("starting to cut over to the given timestamp %s", cutoverTimestamp))
+			} else {
+				if streamIngestionProgress.ReplicationStatus == jobspb.ReplicationCuttingOver {
+					return errors.AssertionFailedf("cutover already started but cutover time %s is not eligible for cutover",
+						cutoverTimestamp)
+				}
+			}
return nil
-		})
-	}); err != nil {
+		}); err != nil {
return false, err
}
if !shouldRevertToCutover {
@@ -554,71 +568,29 @@ func maybeRevertToCutoverTimestamp(
p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted()
}

-	origNRanges := -1
-	payload := ingestionJob.Payload()
-	progress := ingestionJob.Progress()
-	spans := []roachpb.Span{payload.GetStreamIngestion().Span}
-	updateJobProgress := func() error {
-		if spans == nil {
-			return nil
-		}
-		nRanges, err := sql.NumRangesInSpans(ctx, p.ExecCfg().DB, p.DistSQLPlanner(), spans)
-		if err != nil {
-			return err
-		}
-		m := jobRegistry.MetricsStruct().StreamIngest.(*Metrics)
-		m.ReplicationCutoverProgress.Update(int64(nRanges))
-		if origNRanges == -1 {
-			origNRanges = nRanges
-		}
-		return p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-			if nRanges < origNRanges {
-				fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
-				if err := ingestionJob.WithTxn(txn).FractionProgressed(
-					ctx, jobs.FractionUpdater(fractionRangesFinished),
-				); err != nil {
-					return jobs.SimplifyInvalidStatusError(err)
-				}
-			}
-			return nil
-		})
+	minProgressUpdateInterval := 15 * time.Second
+	progMetric := p.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplicationCutoverProgress
+	progUpdater, err := newCutoverProgressTracker(ctx, p, spanToRevert, ingestionJob, progMetric, minProgressUpdateInterval)
+	if err != nil {
+		return false, err
}

-	cutoverTime := progress.GetStreamIngest().CutoverTime
-	for len(spans) != 0 {
-		if err := updateJobProgress(); err != nil {
-			log.Warningf(ctx, "failed to update replication job progress: %+v", err)
-		}
-		var b kv.Batch
-		for _, span := range spans {
-			b.AddRawRequest(&kvpb.RevertRangeRequest{
-				RequestHeader: kvpb.RequestHeader{
-					Key:    span.Key,
-					EndKey: span.EndKey,
-				},
-				TargetTime: cutoverTime,
-			})
-		}
-		b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize
-		if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize != 0 {
-			b.Header.MaxSpanRequestKeys = p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize
-		}
-		if err := db.Run(ctx, &b); err != nil {
-			return false, err
-		}
-
-		spans = spans[:0]
-		for _, raw := range b.RawResponse().Responses {
-			r := raw.GetRevertRange()
-			if r.ResumeSpan != nil {
-				if !r.ResumeSpan.Valid() {
-					return false, errors.Errorf("invalid resume span: %s", r.ResumeSpan)
-				}
-				spans = append(spans, *r.ResumeSpan)
-			}
-		}
+	batchSize := int64(sql.RevertTableDefaultBatchSize)
+	if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize != 0 {
+		batchSize = p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize
+	}
+	if err := sql.RevertSpans(ctx,
+		p.ExecCfg().DB,
+		[]roachpb.Span{spanToRevert},
+		cutoverTimestamp,
+		// TODO(ssd): It should be safe for us to ignore the
+		// GC threshold. Why weren't we before?
+		false, /* ignoreGCThreshold */
+		batchSize,
+		progUpdater.onCompletedCallback); err != nil {
+		return false, err
}
-	return true, updateJobProgress()
+	return true, nil
}

func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachpb.TenantID) error {
Expand Down Expand Up @@ -701,6 +673,108 @@ func (s *streamIngestionResumer) OnFailOrCancel(
})
}

+// cutoverProgressTracker updates the job progress and the given
+// metric with the number of ranges still remaining to revert during
+// the cutover process.
+type cutoverProgressTracker struct {
+	minProgressUpdateInterval time.Duration
+	progMetric                *metric.Gauge
+	job                       *jobs.Job
+
+	remainingSpans     roachpb.SpanGroup
+	lastUpdatedAt      time.Time
+	originalRangeCount int
+
+	getRangeCount                   func(context.Context, roachpb.Spans) (int, error)
+	onJobProgressUpdate             func()
+	overrideShouldUpdateJobProgress func() bool
+}
+
+func newCutoverProgressTracker(
+	ctx context.Context,
+	p sql.JobExecContext,
+	spanToRevert roachpb.Span,
+	job *jobs.Job,
+	progMetric *metric.Gauge,
+	minProgressUpdateInterval time.Duration,
+) (*cutoverProgressTracker, error) {
+	var sg roachpb.SpanGroup
+	sg.Add(spanToRevert)
+
+	nRanges, err := sql.NumRangesInSpans(ctx, p.ExecCfg().DB, p.DistSQLPlanner(), sg.Slice())
+	if err != nil {
+		return nil, err
+	}
+	c := &cutoverProgressTracker{
+		job:                       job,
+		progMetric:                progMetric,
+		minProgressUpdateInterval: minProgressUpdateInterval,
+
+		remainingSpans:     sg,
+		originalRangeCount: nRanges,
+
+		getRangeCount: func(ctx context.Context, sps roachpb.Spans) (int, error) {
+			return sql.NumRangesInSpans(ctx, p.ExecCfg().DB, p.DistSQLPlanner(), sps)
+		},
+	}
+	if testingKnobs := p.ExecCfg().StreamingTestingKnobs; testingKnobs != nil {
+		c.overrideShouldUpdateJobProgress = testingKnobs.CutoverProgressShouldUpdate
+		c.onJobProgressUpdate = testingKnobs.OnCutoverProgressUpdate
+	}
+	return c, nil
+
+}
+
+func (c *cutoverProgressTracker) shouldUpdateJobProgress() bool {
+	if c.overrideShouldUpdateJobProgress != nil {
+		return c.overrideShouldUpdateJobProgress()
+	}
+	return timeutil.Since(c.lastUpdatedAt) >= c.minProgressUpdateInterval
+}
+
+func (c *cutoverProgressTracker) updateJobProgress(ctx context.Context, sp []roachpb.Span) error {
+	nRanges, err := c.getRangeCount(ctx, sp)
+	if err != nil {
+		return err
+	}
+
+	c.progMetric.Update(int64(nRanges))
+
+	// We set lastUpdatedAt even though we might not actually
+	// update the job record below. We do this to avoid asking for
+	// the range count too often.
+	c.lastUpdatedAt = timeutil.Now()
+
+	// If the fraction isn't actually going to move, avoid touching
+	// the job record.
+	if nRanges >= c.originalRangeCount {
+		return nil
+	}
+
+	fractionRangesFinished := float32(c.originalRangeCount-nRanges) / float32(c.originalRangeCount)
+	if err := c.job.NoTxn().FractionProgressed(ctx, jobs.FractionUpdater(fractionRangesFinished)); err != nil {
+		return jobs.SimplifyInvalidStatusError(err)
+	}
+	if c.onJobProgressUpdate != nil {
+		c.onJobProgressUpdate()
+	}
+	return nil
+}
+
+func (c *cutoverProgressTracker) onCompletedCallback(
+	ctx context.Context, completed roachpb.Span,
+) error {
+	c.remainingSpans.Sub(completed)
+	if !c.shouldUpdateJobProgress() {
+		return nil
+	}
+
+	if err := c.updateJobProgress(ctx, c.remainingSpans.Slice()); err != nil {
+		log.Warningf(ctx, "failed to update job progress: %v", err)
+	}
+	return nil
+}
+
func (s *streamIngestionResumer) ForceRealSpan() bool { return true }

var _ jobs.Resumer = &streamIngestionResumer{}
