Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61113: ui: show replica type on the range report page r=aayushshah15 a=aayushshah15

Resolves #59677 

Release justification: observability improvement

Release note (ui change): the range report page on the admin ui will now
also show each of the replica's types

61128: jobs: introduce jobspb.JobID r=lucy-zhang a=lucy-zhang

This commit introduces a `jobspb.JobID` int64 type and uses it in most
places where we were previously using an int64.

Closes #61121.

Release justification: Low-risk change to existing functionality.

Release note: None

61129: geo/wkt: update parsing of dimensions for empty geometrycollections r=otan,rafiss a=andyyang890

Previously, the data structure used for storing geometry collections
was unable to store a layout, which made it impossible to distinguish
empty geometry collections of different layouts. That issue has since
been fixed and this patch updates the parser accordingly.

Resolves #61035.

Refs: #53091

Release justification: bug fix for new functionality
Release note: None

61130: kv: disable timestamp cache + current clock assertion r=nvanbenschoten a=nvanbenschoten

Closes #60580.
Closes #60736.
Closes #60779.
Closes #61060.

This was added in 218a5a3. The check was more of a sanity check that we have and
always have had an understand of the timestamps that can enter the timestamp
cache. The fact that it's failing is a clear indication that there were issues
in past releases, because a lease transfer used to only be safe if the outgoing
leaseholder's clock was above the time of any read in its timestamp cache. We
now ship a snapshot of the timestamp cache on lease transfers, so that invariant
is less important.

I'd still like to get to the bottom of this, but I'll do so on my own branch,
off of master where it's causing disruption.

Release justification: avoid assertion failures

61155: jobs: make sure we finish spans if canceled before starting job r=ajwerner a=ajwerner

Was seeing:
```
    testcluster.go:135: condition failed to evaluate within 45s: unexpectedly found active spans:
             0.000ms      0.000ms    === operation:job _unfinished:1 intExec:create-stats
        goroutine 84 [running]:
        runtime/debug.Stack(0xc0086b1890, 0x792e940, 0xc009ac37e0)
        	/usr/local/go/src/runtime/debug/stack.go:24 +0xab
```

In roachprod stressrace with a big cluster. This seemed to fix it.

Release justification: bug fixes and low-risk updates to new functionality.

Release note: None

Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Lucy Zhang <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
6 people committed Feb 26, 2021
6 parents 6e41db6 + 2f4572c + 1e3f1b7 + 482e83b + 54f29fe + 3cef09e commit 8f612b4
Show file tree
Hide file tree
Showing 81 changed files with 1,128 additions and 1,024 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ func backupPlanHook(
// 2. Verifies we can write to destination location.
// This temporary checkpoint file gets renamed to real checkpoint
// file when the backup jobs starts execution.
doWriteBackupManifestCheckpoint := func(ctx context.Context, jobID int64) error {
doWriteBackupManifestCheckpoint := func(ctx context.Context, jobID jobspb.JobID) error {
if err := writeBackupManifest(
ctx, p.ExecCfg().Settings, defaultStore, tempCheckpointFileNameForJob(jobID),
encryptionOptions, &backupManifest,
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func protectTimestampForBackup(
ctx context.Context,
p sql.PlanHookState,
txn *kv.Txn,
jobID int64,
jobID jobspb.JobID,
spans []roachpb.Span,
startTime, endTime hlc.Timestamp,
backupDetails jobspb.BackupDetails,
Expand Down
18 changes: 9 additions & 9 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,8 +1278,8 @@ type inProgressState struct {
dir, name string
}

func (ip inProgressState) latestJobID() (int64, error) {
var id int64
func (ip inProgressState) latestJobID() (jobspb.JobID, error) {
var id jobspb.JobID
if err := ip.QueryRow(
`SELECT job_id FROM crdb_internal.jobs ORDER BY created DESC LIMIT 1`,
).Scan(&id); err != nil {
Expand Down Expand Up @@ -1514,7 +1514,7 @@ func createAndWaitForJob(
t.Fatal(err)
}

var jobID int64
var jobID jobspb.JobID
db.QueryRow(
t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes,
Expand Down Expand Up @@ -1642,7 +1642,7 @@ func TestBackupRestoreResume(t *testing.T) {
})
}

func getHighWaterMark(jobID int64, sqlDB *gosql.DB) (roachpb.Key, error) {
func getHighWaterMark(jobID jobspb.JobID, sqlDB *gosql.DB) (roachpb.Key, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
Expand Down Expand Up @@ -1897,7 +1897,7 @@ func TestRestoreFailCleanup(t *testing.T) {
// stress test, it is here for safety.
blockGC := make(chan struct{})
params.Knobs.GCJob = &sql.GCJobTestingKnobs{
RunBeforeResume: func(_ int64) error {
RunBeforeResume: func(_ jobspb.JobID) error {
<-blockGC
return nil
},
Expand Down Expand Up @@ -5043,7 +5043,7 @@ func TestFileIOLimits(t *testing.T) {
)
}

func waitForSuccessfulJob(t *testing.T, tc *testcluster.TestCluster, id int64) {
func waitForSuccessfulJob(t *testing.T, tc *testcluster.TestCluster, id jobspb.JobID) {
// Force newly created job to be adopted and verify it succeeds.
tc.Server(0).JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
testutils.SucceedsSoon(t, func() error {
Expand All @@ -5065,7 +5065,7 @@ func TestDetachedBackup(t *testing.T) {
db := sqlDB.DB.(*gosql.DB)

// running backup under transaction requires DETACHED.
var jobID int64
var jobID jobspb.JobID
err := crdb.ExecuteTx(ctx, db, nil /* txopts */, func(tx *gosql.Tx) error {
return tx.QueryRow(`BACKUP DATABASE data TO $1`, LocalFoo).Scan(&jobID)
})
Expand Down Expand Up @@ -5118,7 +5118,7 @@ func TestDetachedRestore(t *testing.T) {
sqlDB.Exec(t, `CREATE DATABASE test`)

// Running RESTORE under transaction requires DETACHED.
var jobID int64
var jobID jobspb.JobID
err := crdb.ExecuteTx(ctx, db, nil /* txopts */, func(tx *gosql.Tx) error {
return tx.QueryRow(`RESTORE TABLE t FROM $1 WITH INTO_DB=test`, LocalFoo).Scan(&jobID)
})
Expand Down Expand Up @@ -5703,7 +5703,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
func(t *testing.T, query string, sqlDB *sqlutils.SQLRunner) {
backupWithDetachedOption := query + ` WITH DETACHED`
db := sqlDB.DB.(*gosql.DB)
var jobID int64
var jobID jobspb.JobID
err := crdb.ExecuteTx(ctx, db, nil /* txopts */, func(tx *gosql.Tx) error {
return tx.QueryRow(backupWithDetachedOption).Scan(&jobID)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
blockCh := make(chan struct{})
defer close(blockCh)
params.Knobs.GCJob = &sql.GCJobTestingKnobs{
RunBeforeResume: func(_ int64) error { <-blockCh; return nil },
RunBeforeResume: func(_ jobspb.JobID) error { <-blockCh; return nil },
}

const numAccounts = 1000
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,6 @@ func checkForPreviousBackup(
}

// tempCheckpointFileNameForJob returns temporary filename for backup manifest checkpoint.
func tempCheckpointFileNameForJob(jobID int64) string {
func tempCheckpointFileNameForJob(jobID jobspb.JobID) string {
return fmt.Sprintf("%s-%d", backupManifestCheckpointName, jobID)
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func createSchemaChangeJobsFromMutations(
}
newMutationJob := descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: jobID,
JobID: int64(jobID),
}
mutationJobs = append(mutationJobs, newMutationJob)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/schedule_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func planBackup(
// NotifyJobTermination implements jobs.ScheduledJobExecutor interface.
func (e *scheduledBackupExecutor) NotifyJobTermination(
ctx context.Context,
jobID int64,
jobID jobspb.JobID,
jobStatus jobs.Status,
details jobspb.Details,
env scheduledjobs.JobSchedulerEnv,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ type jobFeed struct {
db *gosql.DB
flushCh chan struct{}

JobID int64
JobID jobspb.JobID
jobErr error
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createProtectedTimestampRecord(
codec keys.SQLCodec,
pts protectedts.Storage,
txn *kv.Txn,
jobID int64,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
func distChangefeedFlow(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
resultsCh chan<- tree.Datums,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2158,7 +2158,7 @@ func TestChangefeedDescription(t *testing.T) {
sink.Scheme = changefeedbase.SinkSchemeExperimentalSQL
sink.Path = `d`

var jobID int64
var jobID jobspb.JobID
sqlDB.QueryRow(t,
`CREATE CHANGEFEED FOR foo INTO $1 WITH updated, envelope = $2`, sink.String(), `wrapped`,
).Scan(&jobID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeeddist/distflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var ChangefeedResultTypes = []*types.T{
func StartDistChangefeed(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
trackedSpans []roachpb.Span,
initialHighWater hlc.Timestamp,
Expand Down
16 changes: 8 additions & 8 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ var _ jobs.Resumer = &cancellableImportResumer{}

type cancellableImportResumer struct {
ctx context.Context
jobIDCh chan int64
jobID int64
jobIDCh chan jobspb.JobID
jobID jobspb.JobID
onSuccessBarrier syncBarrier
wrapped *importResumer
}
Expand Down Expand Up @@ -590,7 +590,7 @@ type jobState struct {
prog jobspb.ImportProgress
}

func queryJob(db sqlutils.DBHandle, jobID int64) (js jobState) {
func queryJob(db sqlutils.DBHandle, jobID jobspb.JobID) (js jobState) {
js = jobState{
err: nil,
status: "",
Expand Down Expand Up @@ -623,7 +623,7 @@ func queryJob(db sqlutils.DBHandle, jobID int64) (js jobState) {

// Repeatedly queries job status/progress until specified function returns true.
func queryJobUntil(
t *testing.T, db sqlutils.DBHandle, jobID int64, isDone func(js jobState) bool,
t *testing.T, db sqlutils.DBHandle, jobID jobspb.JobID, isDone func(js jobState) bool,
) (js jobState) {
t.Helper()
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
Expand Down Expand Up @@ -666,8 +666,8 @@ func TestCSVImportCanBeResumed(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)

jobCtx, cancelImport := context.WithCancel(ctx)
jobIDCh := make(chan int64)
var jobID int64 = -1
jobIDCh := make(chan jobspb.JobID)
var jobID jobspb.JobID = -1
var importSummary backupccl.RowCount

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
Expand Down Expand Up @@ -772,10 +772,10 @@ func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
sqlDB.Exec(t, "CREATE TABLE t (id INT, data STRING)")
defer sqlDB.Exec(t, `DROP TABLE t`)

jobIDCh := make(chan int64)
jobIDCh := make(chan jobspb.JobID)
controllerBarrier, importBarrier := newSyncBarrier()

var jobID int64 = -1
var jobID jobspb.JobID = -1
var importSummary backupccl.RowCount

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ func protectTimestampForImport(
ctx context.Context,
p sql.PlanHookState,
txn *kv.Txn,
jobID int64,
jobID jobspb.JobID,
spansToProtect []roachpb.Span,
walltime int64,
importDetails jobspb.ImportDetails,
Expand Down Expand Up @@ -1270,14 +1270,14 @@ func writeNonDropDatabaseChange(
descsCol *descs.Collection,
p sql.JobExecContext,
jobDesc string,
) ([]int64, error) {
) ([]jobspb.JobID, error) {
var job *jobs.Job
var err error
if job, err = createNonDropDatabaseChangeJob(p.User(), desc.ID, jobDesc, p, txn); err != nil {
return nil, err
}

queuedJob := []int64{job.ID()}
queuedJob := []jobspb.JobID{job.ID()}
b := txn.NewBatch()
dg := catalogkv.NewOneLevelUncachedDescGetter(txn, p.ExecCfg().Codec)
if err := desc.Validate(ctx, dg); err != nil {
Expand Down Expand Up @@ -1801,7 +1801,7 @@ type preparedSchemaMetadata struct {
schemaRewrites backupccl.DescRewriteMap
newSchemaIDToName map[descpb.ID]string
oldSchemaIDToName map[descpb.ID]string
queuedSchemaJobs []int64
queuedSchemaJobs []jobspb.JobID
}

// Resume is part of the jobs.Resumer interface.
Expand Down Expand Up @@ -2143,7 +2143,7 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{})
addToFileFormatTelemetry(details.Format.Format.String(), "failed")
cfg := execCtx.(sql.JobExecContext).ExecCfg()
lm, ie, db := cfg.LeaseManager, cfg.InternalExecutor, cfg.DB
var jobsToRunAfterTxnCommit []int64
var jobsToRunAfterTxnCommit []jobspb.JobID
if err := descs.Txn(ctx, cfg.Settings, lm, ie, db, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
Expand Down Expand Up @@ -2205,7 +2205,7 @@ func (r *importResumer) dropSchemas(
descsCol *descs.Collection,
execCfg *sql.ExecutorConfig,
p sql.JobExecContext,
) ([]int64, error) {
) ([]jobspb.JobID, error) {
details := r.job.Details().(jobspb.ImportDetails)

// If the prepare step of the import job was not completed then the
Expand Down
18 changes: 10 additions & 8 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1698,7 +1698,7 @@ func TestImportCSVStmt(t *testing.T) {
SQLMemoryPoolSize: 256 << 20,
ExternalIODir: baseDir,
Knobs: base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ int64) error { <-blockGC; return nil }},
GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error { <-blockGC; return nil }},
},
}})
defer tc.Stopper().Stop(ctx)
Expand Down Expand Up @@ -4200,8 +4200,8 @@ func TestImportDefaultWithResume(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t (%s)`, test.create))

jobCtx, cancelImport := context.WithCancel(ctx)
jobIDCh := make(chan int64)
var jobID int64 = -1
jobIDCh := make(chan jobspb.JobID)
var jobID jobspb.JobID = -1

registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
// Arrange for our special job resumer to be
Expand Down Expand Up @@ -4835,7 +4835,7 @@ func TestImportWorkerFailure(t *testing.T) {
case err := <-errCh:
t.Fatalf("%s: query returned before expected: %s", err, query)
}
var jobID int64
var jobID jobspb.JobID
sqlDB.QueryRow(t, `SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1`).Scan(&jobID)

// Shut down a node. This should force LoadCSV to fail in its current
Expand Down Expand Up @@ -4934,7 +4934,7 @@ func TestImportLivenessWithRestart(t *testing.T) {
}
}
// Fetch the new job ID and lease since we know it's running now.
var jobID int64
var jobID jobspb.JobID
originalLease := &jobspb.Progress{}
{
var expectedLeaseBytes []byte
Expand Down Expand Up @@ -5066,7 +5066,7 @@ func TestImportLivenessWithLeniency(t *testing.T) {
}
}
// Fetch the new job ID and lease since we know it's running now.
var jobID int64
var jobID jobspb.JobID
originalLease := &jobspb.Payload{}
{
var expectedLeaseBytes []byte
Expand Down Expand Up @@ -6659,7 +6659,9 @@ func putUserfile(
return tx.Commit()
}

func waitForJobResult(t *testing.T, tc *testcluster.TestCluster, id int64, expected jobs.Status) {
func waitForJobResult(
t *testing.T, tc *testcluster.TestCluster, id jobspb.JobID, expected jobs.Status,
) {
// Force newly created job to be adopted and verify its result.
tc.Server(0).JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
testutils.SucceedsSoon(t, func() error {
Expand Down Expand Up @@ -6695,7 +6697,7 @@ func TestDetachedImport(t *testing.T) {
importIntoQueryDetached := importIntoQuery + " WITH DETACHED"

// DETACHED import w/out transaction is okay.
var jobID int64
var jobID jobspb.JobID
sqlDB.QueryRow(t, importQueryDetached, simpleOcf).Scan(&jobID)
waitForJobResult(t, tc, jobID, jobs.StatusSucceeded)

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/partitionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_test(
"//pkg/ccl/utilccl",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/roachpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{
RunBeforeResume: func(_ int64) error {
RunBeforeResume: func(_ jobspb.JobID) error {
<-asyncNotification
return nil
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func ingest(
execCtx sql.JobExecContext,
streamAddress streamingccl.StreamAddress,
progress jobspb.Progress,
jobID int64,
jobID jobspb.JobID,
) error {
// Initialize a stream client and resolve topology.
client, err := streamclient.NewStreamClient(streamAddress)
Expand Down
Loading

0 comments on commit 8f612b4

Please sign in to comment.