Skip to content

Commit

Permalink
jobs,migration: introduce long running migration job
Browse files Browse the repository at this point in the history
This commit introduces a job to run long-running migration. This empowers
long-running migrations with leases, pausability, and cancelability.

Fixes cockroachdb#58183

Release note: None
  • Loading branch information
ajwerner committed Feb 9, 2021
1 parent cde18f5 commit e094a7a
Show file tree
Hide file tree
Showing 22 changed files with 1,308 additions and 476 deletions.
6 changes: 4 additions & 2 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/cockroachdb/errors"
)

const claimableStatusTupleString = `(` +
// NonTerminalStatusTupleString is a sql tuple corresponding to statuses of
// non-terminal jobs.
const NonTerminalStatusTupleString = `(` +
`'` + string(StatusRunning) + `', ` +
`'` + string(StatusPending) + `', ` +
`'` + string(StatusCancelRequested) + `', ` +
Expand All @@ -42,7 +44,7 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE claim_session_id IS NULL
AND status IN `+claimableStatusTupleString+`
AND status IN `+NonTerminalStatusTupleString+`
ORDER BY created DESC
LIMIT $3
RETURNING id;`,
Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ proto_library(
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion:clusterversion_proto",
"//pkg/roachpb:roachpb_proto",
"//pkg/sql/catalog/descpb:descpb_proto",
"//pkg/sql/schemachanger/scpb:scpb_proto",
Expand All @@ -42,6 +43,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl", # keep
"//pkg/clusterversion",
"//pkg/roachpb",
"//pkg/security", # keep
"//pkg/sql/catalog/descpb",
Expand Down
1,337 changes: 900 additions & 437 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import "sql/catalog/descpb/structured.proto";
import "sql/catalog/descpb/tenant.proto";
import "util/hlc/timestamp.proto";
import "sql/schemachanger/scpb/scpb.proto";
import "clusterversion/cluster_version.proto";

message Lease {
option (gogoproto.equal) = true;
Expand Down Expand Up @@ -322,6 +323,7 @@ message NewSchemaChangeProgress {
repeated cockroach.sql.schemachanger.scpb.State states = 1;
}


message ResumeSpanList {
repeated roachpb.Span resume_spans = 1 [(gogoproto.nullable) = false];
}
Expand Down Expand Up @@ -577,6 +579,14 @@ message CreateStatsProgress {

}

message LongRunningMigrationDetails {
clusterversion.ClusterVersion cluster_version = 1;
}

message LongRunningMigrationProgress {

}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -619,6 +629,7 @@ message Payload {
TypeSchemaChangeDetails typeSchemaChange = 22;
StreamIngestionDetails streamIngestion = 23;
NewSchemaChangeDetails newSchemaChange = 24;
LongRunningMigrationDetails longRunningMigration = 25;
}
}

Expand All @@ -641,6 +652,7 @@ message Progress {
TypeSchemaChangeProgress typeSchemaChange = 17;
StreamIngestionProgress streamIngest = 18;
NewSchemaChangeProgress newSchemaChange = 19;
LongRunningMigrationProgress longRunningMigration = 20;
}
}

Expand All @@ -662,6 +674,7 @@ enum Type {
TYPEDESC_SCHEMA_CHANGE = 9 [(gogoproto.enumvalue_customname) = "TypeTypeSchemaChange"];
STREAM_INGESTION = 10 [(gogoproto.enumvalue_customname) = "TypeStreamIngestion"];
NEW_SCHEMA_CHANGE = 11 [(gogoproto.enumvalue_customname) = "TypeNewSchemaChange"];
LONG_RUNNING_MIGRATION = 12 [(gogoproto.enumvalue_customname) = "TypeLongRunningMigration"];
}

message Job {
Expand Down
14 changes: 13 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var _ Details = CreateStatsDetails{}
var _ Details = SchemaChangeGCDetails{}
var _ Details = StreamIngestionDetails{}
var _ Details = NewSchemaChangeDetails{}
var _ Details = LongRunningMigrationDetails{}

// ProgressDetails is a marker interface for job progress details proto structs.
type ProgressDetails interface{}
Expand All @@ -42,6 +43,7 @@ var _ ProgressDetails = CreateStatsProgress{}
var _ ProgressDetails = SchemaChangeGCProgress{}
var _ ProgressDetails = StreamIngestionProgress{}
var _ ProgressDetails = NewSchemaChangeProgress{}
var _ ProgressDetails = LongRunningMigrationProgress{}

// Type returns the payload's job type.
func (p *Payload) Type() Type {
Expand Down Expand Up @@ -75,6 +77,8 @@ func DetailsType(d isPayload_Details) Type {
return TypeStreamIngestion
case *Payload_NewSchemaChange:
return TypeNewSchemaChange
case *Payload_LongRunningMigration:
return TypeLongRunningMigration
default:
panic(errors.AssertionFailedf("Payload.Type called on a payload with an unknown details type: %T", d))
}
Expand Down Expand Up @@ -109,6 +113,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_StreamIngest{StreamIngest: &d}
case NewSchemaChangeProgress:
return &Progress_NewSchemaChange{NewSchemaChange: &d}
case LongRunningMigrationProgress:
return &Progress_LongRunningMigration{LongRunningMigration: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -138,6 +144,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.StreamIngestion
case *Payload_NewSchemaChange:
return *d.NewSchemaChange
case *Payload_LongRunningMigration:
return *d.LongRunningMigration
default:
return nil
}
Expand Down Expand Up @@ -167,6 +175,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.StreamIngest
case *Progress_NewSchemaChange:
return *d.NewSchemaChange
case *Progress_LongRunningMigration:
return *d.LongRunningMigration
default:
return nil
}
Expand Down Expand Up @@ -209,6 +219,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_StreamIngestion{StreamIngestion: &d}
case NewSchemaChangeDetails:
return &Payload_NewSchemaChange{NewSchemaChange: &d}
case LongRunningMigrationDetails:
return &Payload_LongRunningMigration{LongRunningMigration: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -244,7 +256,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of jobs types.
const NumJobTypes = 12
const NumJobTypes = 13

func init() {
if len(Type_name) != NumJobTypes {
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func (r *Registry) Start(
UPDATE system.jobs
SET claim_session_id = NULL
WHERE claim_session_id <> $1
AND status IN `+claimableStatusTupleString+`
AND status IN `+NonTerminalStatusTupleString+`
AND NOT crdb_internal.sql_liveness_is_alive(claim_session_id)`,
s.ID().UnsafeBytes(),
); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/security",
"//pkg/server/serverpb",
"//pkg/util/log",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
3 changes: 2 additions & 1 deletion pkg/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/logtags"
)

// Manager coordinates long-running migrations.
type Manager interface {
Migrate(ctx context.Context, from, to clusterversion.ClusterVersion) error
Migrate(ctx context.Context, user security.SQLUsername, from, to clusterversion.ClusterVersion) error
}

// Cluster abstracts a physical KV cluster and can be utilized by a long-runnng
Expand Down
16 changes: 16 additions & 0 deletions pkg/migration/migrationjob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "migrationjob",
srcs = ["migration_job.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrationjob",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/migration",
"//pkg/migration/migrations",
"//pkg/settings/cluster",
"//pkg/sql",
],
)
53 changes: 53 additions & 0 deletions pkg/migration/migrationjob/migration_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package migrationjob contains the jobs.Resumer implementation
// used for long-running migrations.
package migrationjob

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/migration/migrations"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
)

func init() {
jobs.RegisterConstructor(jobspb.TypeLongRunningMigration, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
return &resumer{j: job}
})
}

type resumer struct {
j *jobs.Job
}

var _ jobs.Resumer = (*resumer)(nil)

func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error {
// TODO(ajwerner): add some check to see if we're done.
execCtx := execCtxI.(sql.JobExecContext)
pl := r.j.Payload()
cv := *pl.GetLongRunningMigration().ClusterVersion
m, ok := migrations.GetMigration(cv)
if !ok {
return nil
}
return m.(*migration.KVMigration).Run(ctx, cv, execCtx.MigrationCluster())
}

// The long-running migration resumer has no reverting logic.
func (r resumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
return nil
}
13 changes: 12 additions & 1 deletion pkg/migration/migrationmanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/migration",
"//pkg/migration/migrationcluster",
"//pkg/migration/migrationjob",
"//pkg/migration/migrations",
"//pkg/security",
"//pkg/server/serverpb",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlutil",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

Expand All @@ -26,6 +34,7 @@ go_test(
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/liveness",
"//pkg/migration",
Expand All @@ -35,11 +44,13 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/sqlutil",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
Loading

0 comments on commit e094a7a

Please sign in to comment.