streamingccl: display fraction progressed when cutting over
This change teaches the logic that issues RevertRange requests on
replication cutover to update the replication job's progress fraction
as revisions in each range are reverted to the cutover time. This
gives the user a visual cue on the Jobs page in the console of how
much work remains before the destination tenant is marked active.

Fixes: #93448

Release note: None
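
For illustration, the fraction is the share of the original ranges that no
longer need a RevertRange: the cutover loop re-counts the ranges covering the
spans that still have work left and compares that to the count taken at the
start of cutover. A minimal, standalone Go sketch of the arithmetic follows;
cutoverFraction is a hypothetical helper for illustration, not a function in
this patch.

package main

import "fmt"

// cutoverFraction mirrors the computation added in this change: the fraction
// of the original ranges that have already been fully reverted to the cutover
// timestamp. origNRanges is the range count captured when cutover starts;
// nRanges is the count covering the spans that still need a RevertRange.
func cutoverFraction(origNRanges, nRanges int) float32 {
	if origNRanges <= 0 || nRanges >= origNRanges {
		return 0
	}
	return float32(origNRanges-nRanges) / float32(origNRanges)
}

func main() {
	// With 6 ranges at the start of cutover and 2 still awaiting a revert,
	// the job reports 0.67, one of the fractions the new test expects.
	fmt.Printf("%.2f\n", cutoverFraction(6, 2))
}
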
adityamaru committed Jan 16, 2023
1 parent 8643954 commit 706bf0e
Showing 6 changed files with 186 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -131,6 +131,7 @@ go_test(
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/upgrade/upgradebase",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
33 changes: 32 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -466,8 +466,36 @@ func maybeRevertToCutoverTimestamp(
}

updateRunningStatus(ctx, j, fmt.Sprintf("starting to cut over to the given timestamp %s", cutoverTime))

origNRanges := -1
spans := []roachpb.Span{sd.Span}
updateJobProgress := func() error {
if spans == nil {
return nil
}
nRanges, err := sql.NumRangesInSpans(ctx, p.ExecCfg().DB, p.DistSQLPlanner(), spans)
if err != nil {
return err
}
if origNRanges == -1 {
origNRanges = nRanges
}
return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if nRanges < origNRanges {
fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
if err := j.FractionProgressed(ctx, txn,
jobs.FractionUpdater(fractionRangesFinished)); err != nil {
return jobs.SimplifyInvalidStatusError(err)
}
}
return nil
})
}

for len(spans) != 0 {
if err := updateJobProgress(); err != nil {
log.Warningf(ctx, "failed to update replication job progress: %+v", err)
}
var b kv.Batch
for _, span := range spans {
b.AddRawRequest(&roachpb.RevertRangeRequest{
@@ -479,6 +507,9 @@
})
}
b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize
if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize != 0 {
b.Header.MaxSpanRequestKeys = p.ExecCfg().StreamingTestingKnobs.OverrideRevertRangeBatchSize
}
if err := db.Run(ctx, &b); err != nil {
return false, err
}
@@ -494,7 +525,7 @@
}
}
}
return true, j.SetProgress(ctx, nil /* txn */, *sp.StreamIngest)
return true, updateJobProgress()
}

func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachpb.TenantID) error {
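
The for len(spans) != 0 loop above keeps issuing RevertRange batches until
nothing remains. Because MaxSpanRequestKeys caps each batch, a partially
reverted span is expected to come back with a resume span that becomes the
remaining work for the next iteration (and the input to the next progress
update); the elided part of the hunk handles that. A standalone sketch of that
resume-span pattern, using simplified stand-in types rather than the real
roachpb ones:

package main

import "fmt"

// span and response are simplified stand-ins for roachpb.Span and a batch
// response header whose resume span is set when MaxSpanRequestKeys truncates
// the request before the whole span is processed.
type span struct{ key, endKey string }
type response struct{ resumeSpan *span }

// remainingSpans collects the spans that still need a RevertRange after one
// batch: a nil resume span means that request finished; otherwise the resume
// span is carried into the next iteration of the loop.
func remainingSpans(resps []response) []span {
	var remaining []span
	for _, r := range resps {
		if r.resumeSpan != nil {
			remaining = append(remaining, *r.resumeSpan)
		}
	}
	return remaining
}

func main() {
	resps := []response{{resumeSpan: &span{key: "c", endKey: "f"}}, {resumeSpan: nil}}
	// One span is left over, so the next progress update will see fewer ranges.
	fmt.Println(len(remainingSpans(resps)))
}
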
138 changes: 138 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
@@ -10,6 +10,7 @@ package streamingest

import (
"context"
"fmt"
"net/url"
"strings"
"testing"
@@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -35,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -382,3 +385,138 @@ func TestReplicationJobResumptionStartTime(t *testing.T) {
c.Cutover(producerJobID, replicationJobID, srcTime.GoTime())
jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(replicationJobID))
}

func makeTableSpan(codec keys.SQLCodec, tableID uint32) roachpb.Span {
k := codec.TablePrefix(tableID)
return roachpb.Span{Key: k, EndKey: k.PrefixEnd()}
}

func TestCutoverFractionProgressed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

respRecvd := make(chan struct{})
continueRevert := make(chan struct{})
defer close(continueRevert)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.RevertRangeResponse:
respRecvd <- struct{}{}
<-continueRevert
}
}
return nil
},
},
Streaming: &sql.StreamingTestingKnobs{
OverrideRevertRangeBatchSize: 1,
},
},
DisableDefaultTestTenant: true,
})
defer s.Stopper().Stop(ctx)

_, err := sqlDB.Exec(`CREATE TABLE foo(id) AS SELECT generate_series(1, 10)`)
require.NoError(t, err)

cutover := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

// Insert some revisions which we can revert to a timestamp before the update.
_, err = sqlDB.Exec(`UPDATE foo SET id = id + 1`)
require.NoError(t, err)

// Split every other row into its own range. Progress updates are on a
// per-range basis so we need >1 range to see the fraction progress.
_, err = sqlDB.Exec(`ALTER TABLE foo SPLIT AT (SELECT rowid FROM foo WHERE rowid % 2 = 0)`)
require.NoError(t, err)

var nRanges int
require.NoError(t, sqlDB.QueryRow(
`SELECT count(*) FROM [SHOW RANGES FROM TABLE foo]`).Scan(&nRanges))

require.Equal(t, nRanges, 6)
var id int
err = sqlDB.QueryRow(`SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&id)
require.NoError(t, err)

// Create a mock replication job with the `foo` table span so that on cut over
// we can revert the table's ranges.
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
jobExecCtx := &sql.FakeJobExecContext{ExecutorConfig: &execCfg}
mockReplicationJobDetails := jobspb.StreamIngestionDetails{
Span: makeTableSpan(execCfg.Codec, uint32(id)),
}
mockReplicationJobRecord := jobs.Record{
Details: mockReplicationJobDetails,
Progress: jobspb.StreamIngestionProgress{
CutoverTime: cutover,
},
Username: username.TestUserName(),
}
registry := execCfg.JobRegistry
jobID := registry.MakeJobID()
replicationJob, err := registry.CreateJobWithTxn(ctx, mockReplicationJobRecord, jobID, nil)
require.NoError(t, err)
require.NoError(t, replicationJob.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
return jobs.UpdateHighwaterProgressed(cutover, md, ju)
}))

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
defer close(respRecvd)
revert, err := maybeRevertToCutoverTimestamp(ctx, jobExecCtx, jobID)
require.NoError(t, err)
require.True(t, revert)
return nil
})

loadProgress := func() jobspb.Progress {
j, err := execCfg.JobRegistry.LoadJob(ctx, jobID)
require.NoError(t, err)
return j.Progress()
}
progressMap := map[string]bool{
"0.00": false,
"0.17": false,
"0.33": false,
"0.50": false,
"0.67": false,
"0.83": false,
}
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case _, ok := <-respRecvd:
if !ok {
return nil
}
sip := loadProgress()
curProgress := sip.GetFractionCompleted()
s := fmt.Sprintf("%.2f", curProgress)
if _, ok := progressMap[s]; !ok {
t.Fatalf("unexpected progress fraction %s", s)
}
progressMap[s] = true
continueRevert <- struct{}{}
}
}
})
require.NoError(t, g.Wait())
sip := loadProgress()
require.Equal(t, sip.GetFractionCompleted(), float32(1))

// Ensure we have hit all our expected progress fractions.
for k, v := range progressMap {
if !v {
t.Fatalf("failed to see progress fraction %s", k)
}
}
}
14 changes: 7 additions & 7 deletions pkg/sql/backfill.go
@@ -843,11 +843,11 @@ func getJobIDForMutationWithDescriptor(
"job not found for table id %d, mutation %d", tableDesc.GetID(), mutationID)
}

// numRangesInSpans returns the number of ranges that cover a set of spans.
// NumRangesInSpans returns the number of ranges that cover a set of spans.
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func numRangesInSpans(
// It operates entirely on the current goroutine and is thus able to reuse an
// existing kv.Txn safely.
func NumRangesInSpans(
ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span,
) (int, error) {
txn := db.NewTxn(ctx, "num-ranges-in-spans")
@@ -1099,7 +1099,7 @@ func (sc *SchemaChanger) distIndexBackfill(
if updatedTodoSpans == nil {
return nil
}
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, updatedTodoSpans)
nRanges, err := NumRangesInSpans(ctx, sc.db, sc.distSQLPlanner, updatedTodoSpans)
if err != nil {
return err
}
@@ -1252,7 +1252,7 @@ func (sc *SchemaChanger) distColumnBackfill(
// schema change state machine or from a previous backfill attempt,
// we scale that fraction of ranges completed by the remaining fraction
// of the job's progress bar.
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
nRanges, err := NumRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
if err != nil {
return err
}
@@ -2889,7 +2889,7 @@ func (sc *SchemaChanger) distIndexMerge(
// TODO(rui): these can be initialized along with other new schema changer dependencies.
planner := NewIndexBackfillerMergePlanner(sc.execCfg)
rc := func(ctx context.Context, spans []roachpb.Span) (int, error) {
return numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, spans)
return NumRangesInSpans(ctx, sc.db, sc.distSQLPlanner, spans)
}
tracker := NewIndexMergeTracker(progress, sc.job, rc, fractionScaler)
periodicFlusher := newPeriodicProgressFlusher(sc.settings)
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -1662,6 +1662,10 @@ type StreamingTestingKnobs struct {
// frontier specs generated for the replication job.
AfterReplicationFlowPlan func([]*execinfrapb.StreamIngestionDataSpec,
*execinfrapb.StreamIngestionFrontierSpec)

// OverrideRevertRangeBatchSize allows overriding the `MaxSpanRequestKeys`
// used when sending a RevertRange request.
OverrideRevertRangeBatchSize int64
}

var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}
5 changes: 4 additions & 1 deletion pkg/sql/job_exec_context_test_util.go
@@ -52,7 +52,10 @@ func (p *FakeJobExecContext) SessionDataMutatorIterator() *sessionDataMutatorIte

// DistSQLPlanner implements the JobExecContext interface.
func (p *FakeJobExecContext) DistSQLPlanner() *DistSQLPlanner {
panic("unimplemented")
if p.ExecutorConfig == nil {
panic("unimplemented")
}
return p.ExecutorConfig.DistSQLPlanner
}

// LeaseMgr implements the JobExecContext interface.
