diff --git a/docs/generated/http/BUILD.bazel b/docs/generated/http/BUILD.bazel index c5d811482171..cc9dd4b5dd4c 100644 --- a/docs/generated/http/BUILD.bazel +++ b/docs/generated/http/BUILD.bazel @@ -18,6 +18,7 @@ genrule( "//pkg/multitenant/mtinfopb:mtinfopb_proto", "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb:tenantcapabilitiespb_proto", "//pkg/roachpb:roachpb_proto", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb_proto", "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_proto", "//pkg/server/serverpb:serverpb_proto", "//pkg/server/status/statuspb:statuspb_proto", diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 5f6fd4af88f9..d7b0790c5c4a 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -276,6 +276,7 @@ ALL_TESTS = [ "//pkg/security/username:username_disallowed_imports_test", "//pkg/security/username:username_test", "//pkg/security:security_test", + "//pkg/server/autoconfig:autoconfig_test", "//pkg/server/debug/goroutineui:goroutineui_test", "//pkg/server/debug/pprofui:pprofui_test", "//pkg/server/debug:debug_test", @@ -1431,7 +1432,10 @@ GO_TARGETS = [ "//pkg/security/username:username_test", "//pkg/security:security", "//pkg/security:security_test", + "//pkg/server/autoconfig/acprovider:acprovider", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb", "//pkg/server/autoconfig:autoconfig", + "//pkg/server/autoconfig:autoconfig_test", "//pkg/server/debug/goroutineui:goroutineui", "//pkg/server/debug/goroutineui:goroutineui_test", "//pkg/server/debug/pprofui:pprofui", @@ -2814,6 +2818,8 @@ GET_X_DATA_TARGETS = [ "//pkg/security/username:get_x_data", "//pkg/server:get_x_data", "//pkg/server/autoconfig:get_x_data", + "//pkg/server/autoconfig/acprovider:get_x_data", + "//pkg/server/autoconfig/autoconfigpb:get_x_data", "//pkg/server/debug:get_x_data", "//pkg/server/debug/goroutineui:get_x_data", "//pkg/server/debug/pprofui:get_x_data", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 5183a1f09522..eb6bf757970a 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -56,4 +56,5 @@ type TestingKnobs struct { LOQRecovery ModuleTestingKnobs KeyVisualizer ModuleTestingKnobs TenantCapabilitiesTestingKnobs ModuleTestingKnobs + AutoConfig ModuleTestingKnobs } diff --git a/pkg/ccl/testccl/sqlccl/tenant_gc_test.go b/pkg/ccl/testccl/sqlccl/tenant_gc_test.go index 3d90f250887c..08c0d4586b4e 100644 --- a/pkg/ccl/testccl/sqlccl/tenant_gc_test.go +++ b/pkg/ccl/testccl/sqlccl/tenant_gc_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -470,7 +471,7 @@ func TestGCTableOrIndexWaitsForProtectedTimestamps(t *testing.T) { func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 94808) + defer gcjob.SetSmallMaxGCIntervalForTest()() ctx := context.Background() @@ -614,8 +615,11 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) { tenID := roachpb.MustMakeTenantID(10) sqlDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;") + tenantStopper := stop.NewStopper() + defer tenantStopper.Stop(ctx) // in case the test fails prematurely. 
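+	// Note: the test tenant gets its own stopper (rather than reusing
+	// srv.Stopper()) so that the test can shut the tenant's tasks down
+	// independently of the host server further down in the test.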
+ ten, conn10 := serverutils.StartTenant(t, srv, - base.TestTenantArgs{TenantID: tenID, Stopper: srv.Stopper()}) + base.TestTenantArgs{TenantID: tenID, Stopper: tenantStopper}) defer conn10.Close() // Write a cluster PTS record as the tenant. @@ -630,6 +634,10 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) { return tenPtp.WithTxn(txn).Protect(ctx, rec) })) + // Ensure the secondary tenant is not running any more tasks. + tenantStopper.Stop(ctx) + + // Drop the record. sqlDB.Exec(t, `DROP TENANT [$1]`, tenID.ToUint64()) sqlDB.CheckQueryResultsRetry( diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index e6d6843203b0..dab2ab513c27 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -50,6 +50,7 @@ PROTOBUF_SRCS = [ "//pkg/repstream/streampb:streampb_go_proto", "//pkg/roachpb:roachpb_go_proto", "//pkg/rpc:rpc_go_proto", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb_go_proto", "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_go_proto", "//pkg/server/serverpb:serverpb_go_proto", "//pkg/server/status/statuspb:statuspb_go_proto", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 22e9a0e9a816..41742a66e9e5 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -422,6 +422,10 @@ func (r *Registry) addAdoptedJob( func (r *Registry) runJob( ctx context.Context, resumer Resumer, job *Job, status Status, taskName string, ) error { + if r.IsDraining() { + return errors.Newf("refusing to start %q; job registry is draining", taskName) + } + job.mu.Lock() var finalResumeError error if job.mu.payload.FinalResumeError != nil { diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index bbd4e42e80b9..7e821431f20b 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -32,6 +32,7 @@ proto_library( "//pkg/kv/kvpb:kvpb_proto", "//pkg/multitenant/mtinfopb:mtinfopb_proto", "//pkg/roachpb:roachpb_proto", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb_proto", "//pkg/sql/catalog/descpb:descpb_proto", "//pkg/sql/sessiondatapb:sessiondatapb_proto", "//pkg/util/hlc:hlc_proto", @@ -54,6 +55,7 @@ go_proto_library( "//pkg/multitenant/mtinfopb", "//pkg/roachpb", "//pkg/security/username", # keep + "//pkg/server/autoconfig/autoconfigpb", "//pkg/sql/catalog/catpb", # keep "//pkg/sql/catalog/descpb", "//pkg/sql/sem/tree", # keep diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 4331f9480051..d97ce193bd6d 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -24,6 +24,7 @@ import "sql/sessiondatapb/session_data.proto"; import "util/hlc/timestamp.proto"; import "clusterversion/cluster_version.proto"; import "google/protobuf/timestamp.proto"; +import "server/autoconfig/autoconfigpb/autoconfig.proto"; enum EncryptionMode { Passphrase = 0; @@ -1177,6 +1178,21 @@ message AutoConfigRunnerDetails { message AutoConfigRunnerProgress { } +message AutoConfigEnvRunnerDetails { + string env_id = 1 [(gogoproto.customname) = "EnvID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb.EnvironmentID"]; +} + +message AutoConfigEnvRunnerProgress { +} + +message AutoConfigTaskDetails { + string env_id = 1 [(gogoproto.customname) = "EnvID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb.EnvironmentID"]; + cockroach.server.autoconfig.autoconfigpb.Task task = 2 [(gogoproto.nullable) = false]; +} + +message AutoConfigTaskProgress { +} + message Payload { string description = 1; // If empty, the description is assumed 
to be the statement. @@ -1236,6 +1252,8 @@ message Payload { PollJobsStatsDetails poll_jobs_stats = 39; AutoConfigRunnerDetails auto_config_runner = 41; + AutoConfigEnvRunnerDetails auto_config_env_runner = 42; + AutoConfigTaskDetails auto_config_task = 43; } reserved 26; // PauseReason is used to describe the reason that the job is currently paused @@ -1263,7 +1281,7 @@ message Payload { // specifies how old such record could get before this job is canceled. int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"]; - // NEXT ID: 42 + // NEXT ID: 44 } message Progress { @@ -1306,6 +1324,8 @@ message Progress { KeyVisualizerProgress keyVisualizerProgress = 27; PollJobsStatsProgress pollJobsStats = 28; AutoConfigRunnerProgress auto_config_runner = 29; + AutoConfigEnvRunnerProgress auto_config_env_runner = 30; + AutoConfigTaskProgress auto_config_task = 31; } uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"]; @@ -1338,6 +1358,8 @@ enum Type { KEY_VISUALIZER = 18 [(gogoproto.enumvalue_customname) = "TypeKeyVisualizer"]; POLL_JOBS_STATS = 19 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"]; AUTO_CONFIG_RUNNER = 20 [(gogoproto.enumvalue_customname) = "TypeAutoConfigRunner"]; + AUTO_CONFIG_ENV_RUNNER = 21 [(gogoproto.enumvalue_customname) = "TypeAutoConfigEnvRunner"]; + AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"]; } message Job { diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index a97443c8b43b..7855a6035605 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -49,6 +49,8 @@ var ( _ Details = SchemaTelemetryDetails{} _ Details = KeyVisualizerDetails{} _ Details = AutoConfigRunnerDetails{} + _ Details = AutoConfigEnvRunnerDetails{} + _ Details = AutoConfigTaskDetails{} ) // ProgressDetails is a marker interface for job progress details proto structs. @@ -70,6 +72,8 @@ var ( _ ProgressDetails = SchemaTelemetryProgress{} _ ProgressDetails = KeyVisualizerProgress{} _ ProgressDetails = AutoConfigRunnerProgress{} + _ ProgressDetails = AutoConfigEnvRunnerProgress{} + _ ProgressDetails = AutoConfigTaskProgress{} ) // Type returns the payload's job type and panics if the type is invalid. 
@@ -149,6 +153,8 @@ var AutomaticJobTypes = [...]Type{ TypeAutoSchemaTelemetry, TypePollJobsStats, TypeAutoConfigRunner, + TypeAutoConfigEnvRunner, + TypeAutoConfigTask, TypeKeyVisualizer, } @@ -197,6 +203,10 @@ func DetailsType(d isPayload_Details) (Type, error) { return TypePollJobsStats, nil case *Payload_AutoConfigRunner: return TypeAutoConfigRunner, nil + case *Payload_AutoConfigEnvRunner: + return TypeAutoConfigEnvRunner, nil + case *Payload_AutoConfigTask: + return TypeAutoConfigTask, nil default: return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d) } @@ -238,6 +248,8 @@ var JobDetailsForEveryJobType = map[Type]Details{ TypeKeyVisualizer: KeyVisualizerDetails{}, TypePollJobsStats: PollJobsStatsDetails{}, TypeAutoConfigRunner: AutoConfigRunnerDetails{}, + TypeAutoConfigEnvRunner: AutoConfigEnvRunnerDetails{}, + TypeAutoConfigTask: AutoConfigTaskDetails{}, } // WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper @@ -287,6 +299,10 @@ func WrapProgressDetails(details ProgressDetails) interface { return &Progress_PollJobsStats{PollJobsStats: &d} case AutoConfigRunnerProgress: return &Progress_AutoConfigRunner{AutoConfigRunner: &d} + case AutoConfigEnvRunnerProgress: + return &Progress_AutoConfigEnvRunner{AutoConfigEnvRunner: &d} + case AutoConfigTaskProgress: + return &Progress_AutoConfigTask{AutoConfigTask: &d} default: panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d)) } @@ -334,6 +350,10 @@ func (p *Payload) UnwrapDetails() Details { return *d.PollJobsStats case *Payload_AutoConfigRunner: return *d.AutoConfigRunner + case *Payload_AutoConfigEnvRunner: + return *d.AutoConfigEnvRunner + case *Payload_AutoConfigTask: + return *d.AutoConfigTask default: return nil } @@ -381,6 +401,10 @@ func (p *Progress) UnwrapDetails() ProgressDetails { return *d.PollJobsStats case *Progress_AutoConfigRunner: return *d.AutoConfigRunner + case *Progress_AutoConfigEnvRunner: + return *d.AutoConfigEnvRunner + case *Progress_AutoConfigTask: + return *d.AutoConfigTask default: return nil } @@ -452,6 +476,10 @@ func WrapPayloadDetails(details Details) interface { return &Payload_PollJobsStats{PollJobsStats: &d} case AutoConfigRunnerDetails: return &Payload_AutoConfigRunner{AutoConfigRunner: &d} + case AutoConfigEnvRunnerDetails: + return &Payload_AutoConfigEnvRunner{AutoConfigEnvRunner: &d} + case AutoConfigTaskDetails: + return &Payload_AutoConfigTask{AutoConfigTask: &d} default: panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -487,7 +515,7 @@ const ( func (Type) SafeValue() {} // NumJobTypes is the number of jobs types. -const NumJobTypes = 21 +const NumJobTypes = 23 // ChangefeedDetailsMarshaler allows for dependency injection of // cloud.SanitizeExternalStorageURI to avoid the dependency from this diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 788c0a2ef99f..c67ff539ac6e 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -16,6 +16,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -163,6 +164,9 @@ type Registry struct { draining bool } + drainJobs chan struct{} + startedControllerTasksWG sync.WaitGroup + // withSessionEvery ensures that logging when failing to get a live session // is not too loud. withSessionEvery log.EveryN @@ -236,6 +240,7 @@ func MakeRegistry( // if a notification is already queued. 
 		adoptionCh:       make(chan adoptionNotice, 1),
 		withSessionEvery: log.Every(time.Second),
+		drainJobs:        make(chan struct{}),
 	}
 	if knobs != nil {
 		r.knobs = *knobs
@@ -1065,7 +1070,10 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 		}
 	})
 
+	r.startedControllerTasksWG.Add(1)
 	if err := stopper.RunAsyncTask(ctx, "jobs/cancel", func(ctx context.Context) {
+		defer r.startedControllerTasksWG.Done()
+
 		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
@@ -1080,6 +1088,10 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 				// Note: the jobs are cancelled by virtue of being run with a
 				// WithCancelOnQuesce context. See the resumeJob() function.
 				return
+			case <-r.drainJobs:
+				log.Warningf(ctx, "canceling all adopted jobs due to graceful drain request")
+				r.cancelAllAdoptedJobs()
+				return
 			case <-lc.timer.C:
 				lc.timer.Read = true
 				cancelLoopTask(ctx)
@@ -1087,9 +1099,14 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 			}
 		}
 	}); err != nil {
+		r.startedControllerTasksWG.Done()
 		return err
 	}
+
+	r.startedControllerTasksWG.Add(1)
 	if err := stopper.RunAsyncTask(ctx, "jobs/gc", func(ctx context.Context) {
+		defer r.startedControllerTasksWG.Done()
+
 		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
@@ -1110,6 +1127,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 				lc.onUpdate()
 			case <-stopper.ShouldQuiesce():
 				return
+			case <-r.drainJobs:
+				return
 			case <-lc.timer.C:
 				lc.timer.Read = true
 				old := timeutil.Now().Add(-1 * retentionDuration())
@@ -1120,9 +1139,14 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 			}
 		}
 	}); err != nil {
+		r.startedControllerTasksWG.Done()
 		return err
 	}
-	return stopper.RunAsyncTask(ctx, "jobs/adopt", func(ctx context.Context) {
+
+	r.startedControllerTasksWG.Add(1)
+	if err := stopper.RunAsyncTask(ctx, "jobs/adopt", func(ctx context.Context) {
+		defer r.startedControllerTasksWG.Done()
+
 		ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
 		lc, cleanup := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt)
@@ -1133,6 +1157,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 				lc.onUpdate()
 			case <-stopper.ShouldQuiesce():
 				return
+			case <-r.drainJobs:
+				return
 			case shouldClaim := <-r.adoptionCh:
 				// Try to adopt the most recently created job.
 				if shouldClaim {
@@ -1146,7 +1172,11 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
 				lc.onExecute()
 			}
 		}
-	})
+	}); err != nil {
+		r.startedControllerTasksWG.Done()
+		return err
+	}
+	return nil
 }
 
 func (r *Registry) maybeCancelJobs(ctx context.Context, s sqlliveness.Session) {
@@ -1915,14 +1945,22 @@ func (r *Registry) IsDraining() bool {
 	return r.mu.draining
 }
 
+// WaitForRegistryShutdown waits for all of the registry's background
+// control tasks (the job adoption, cancellation and GC loops started
+// in Start) to shut down.
+func (r *Registry) WaitForRegistryShutdown(ctx context.Context) {
+	log.Infof(ctx, "starting to wait for job registry to shut down")
+	defer log.Infof(ctx, "job registry tasks successfully shut down")
+	r.startedControllerTasksWG.Wait()
+}
+
 // SetDraining informs the job system if the node is draining.
-//
-// NB: Check the implementation of drain before adding code that would
-// make this block.
-func (r *Registry) SetDraining(draining bool) { +func (r *Registry) SetDraining() { r.mu.Lock() defer r.mu.Unlock() - r.mu.draining = draining + alreadyDraining := r.mu.draining + r.mu.draining = true + if !alreadyDraining { + close(r.drainJobs) + } } // TestingIsJobIdle returns true if the job is adopted and currently idle. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 9d02467f2e61..2c61e55fed0d 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -160,6 +160,7 @@ go_library( "//pkg/security/securityassets", "//pkg/security/username", "//pkg/server/autoconfig", + "//pkg/server/autoconfig/acprovider", "//pkg/server/debug", "//pkg/server/diagnostics", "//pkg/server/diagnostics/diagnosticspb", diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel index c0ba3ae9693b..52b16892af01 100644 --- a/pkg/server/autoconfig/BUILD.bazel +++ b/pkg/server/autoconfig/BUILD.bazel @@ -1,16 +1,60 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "autoconfig", - srcs = ["auto_config.go"], + srcs = [ + "auto_config.go", + "auto_config_env_runner.go", + "auto_config_task.go", + "doc.go", + "task_markers.go", + "testing_knobs.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig", visibility = ["//visibility:public"], deps = [ "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/security/username", + "//pkg/server/autoconfig/acprovider", + "//pkg/server/autoconfig/autoconfigpb", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/isql", + "//pkg/sql/sessiondata", + "//pkg/util/encoding", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", + ], +) + +go_test( + name = "autoconfig_test", + srcs = [ + "auto_config_test.go", + "main_test.go", + "task_markers_test.go", + ], + args = ["-test.timeout=295s"], + deps = [ + ":autoconfig", + "//pkg/base", + "//pkg/clusterversion", + "//pkg/jobs", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/security/username", + "//pkg/server", + "//pkg/server/autoconfig/acprovider", + "//pkg/server/autoconfig/autoconfigpb", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/autoconfig/acprovider/BUILD.bazel b/pkg/server/autoconfig/acprovider/BUILD.bazel new file mode 100644 index 000000000000..4b1fbb05848f --- /dev/null +++ b/pkg/server/autoconfig/acprovider/BUILD.bazel @@ -0,0 +1,12 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "acprovider", + srcs = ["provider.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider", + visibility = ["//visibility:public"], + deps = ["//pkg/server/autoconfig/autoconfigpb"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/server/autoconfig/acprovider/provider.go b/pkg/server/autoconfig/acprovider/provider.go new file mode 100644 index 000000000000..bdb6ddd869ae --- /dev/null +++ b/pkg/server/autoconfig/acprovider/provider.go @@ -0,0 +1,78 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package acprovider
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb"
+)
+
+// Provider is an interface through which the auto config runner
+// can receive new tasks to run.
+// For more details, see the package-level documentation for
+// package "autoconfig".
+type Provider interface {
+	// EnvUpdate returns a channel that receives a message any time the
+	// current set of environments changes. The channel receives an
+	// initial event immediately.
+	EnvUpdate() <-chan struct{}
+
+	// ActiveEnvironments returns the IDs of environments that have
+	// tasks available for execution.
+	ActiveEnvironments() []autoconfigpb.EnvironmentID
+
+	// Peek blocks waiting for the first task the provider believes
+	// still needs to be run for the given environment. It returns
+	// an error if the context is canceled while waiting. It returns
+	// ErrNoMoreTasks to indicate that the caller does not need
+	// to process tasks for this environment any more.
+	Peek(ctx context.Context, env autoconfigpb.EnvironmentID) (autoconfigpb.Task, error)
+
+	// Pop reports the completion of all tasks with IDs up to and
+	// including completed, for the given environment.
+	Pop(ctx context.Context, env autoconfigpb.EnvironmentID, completed autoconfigpb.TaskID) error
+}
+
+type errNoMoreTasks struct{}
+
+func (errNoMoreTasks) Error() string { return "no more tasks" }
+
+// ErrNoMoreTasks can be checked with errors.Is on the result of Peek
+// when an environment is not providing tasks for the foreseeable future.
+var ErrNoMoreTasks error = errNoMoreTasks{}
+
+// NoTaskProvider is a stub provider which delivers no tasks.
+type NoTaskProvider struct{}
+
+var _ Provider = NoTaskProvider{}
+
+// EnvUpdate is part of the Provider interface.
+func (NoTaskProvider) EnvUpdate() <-chan struct{} {
+	ch := make(chan struct{}, 1)
+	ch <- struct{}{}
+	return ch
+}
+
+// ActiveEnvironments is part of the Provider interface.
+func (NoTaskProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID { return nil }
+
+// Peek is part of the Provider interface.
+func (NoTaskProvider) Peek(
+	ctx context.Context, _ autoconfigpb.EnvironmentID,
+) (autoconfigpb.Task, error) {
+	return autoconfigpb.Task{}, ErrNoMoreTasks
+}
+
+// Pop is part of the Provider interface.
+func (NoTaskProvider) Pop(context.Context, autoconfigpb.EnvironmentID, autoconfigpb.TaskID) error { + return nil +} diff --git a/pkg/server/autoconfig/auto_config.go b/pkg/server/autoconfig/auto_config.go index f067ebaec6c5..221f201e29b9 100644 --- a/pkg/server/autoconfig/auto_config.go +++ b/pkg/server/autoconfig/auto_config.go @@ -15,10 +15,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) +// autoConfigRunner runs the job that accepts new auto configuration +// payloads and sequences the creation of individual jobs to execute +// them. +// The auto config payloads are run by the taskRunner defined in +// auto_config_task.go. +// +// Refer to the package-level documentation for more details. type autoConfigRunner struct { job *jobs.Job } @@ -32,6 +45,12 @@ func (r *autoConfigRunner) OnFailOrCancel( return nil } +// EnvironmentID aliases autoconfigpb.EnvironmentID +type EnvironmentID = autoconfigpb.EnvironmentID + +// TaskID aliases autoconfigpb.TaskID +type TaskID = autoconfigpb.TaskID + // Resume is part of the Resumer interface. func (r *autoConfigRunner) Resume(ctx context.Context, execCtx interface{}) error { // The auto config runner is a forever running background job. @@ -40,23 +59,79 @@ func (r *autoConfigRunner) Resume(ctx context.Context, execCtx interface{}) erro // status. r.job.MarkIdle(true) - wait := make(chan struct{}) + exec := execCtx.(sql.JobExecContext) + execCfg := exec.ExecCfg() - // Note: even though the job registry cancels all running job upon - // shutdown, we find some tests fail unless this job also does its - // own wait. - shutdownCh := execCtx.(sql.JobExecContext).ExecCfg().RPCContext.Stopper.ShouldQuiesce() + // Provider gives us tasks to run. + provider := getProvider(ctx, execCfg) - select { - case <-ctx.Done(): - return ctx.Err() + // waitForEnvChange is the channel that indicates the set of + // environments is updated. + waitForEnvChange := provider.EnvUpdate() - case <-shutdownCh: - return context.Canceled + for { + // No tasks to create. Just wait until some tasks are delivered. 
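+		// Note: the provider guarantees an initial event on the
+		// EnvUpdate channel (see acprovider.Provider), so this loop
+		// does not stall on startup when environments are already
+		// active.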
+ log.Infof(ctx, "waiting for environment activation...") + select { + case <-ctx.Done(): + return ctx.Err() - case <-wait: + case <-waitForEnvChange: + } + + r.job.MarkIdle(false) + for _, envID := range provider.ActiveEnvironments() { + if err := refreshEnvJob(ctx, execCfg, envID); err != nil { + log.Warningf(ctx, "error refreshing environment %q: %v", envID, err) + } + } + r.job.MarkIdle(true) } +} +func getProvider(ctx context.Context, execCfg *sql.ExecutorConfig) acprovider.Provider { + provider := execCfg.AutoConfigProvider + if provider == nil { + panic(errors.AssertionFailedf("programming error: missing provider")) + } + log.Infof(ctx, "using provider with type %T", provider) + return provider +} + +func refreshEnvJob(ctx context.Context, execCfg *sql.ExecutorConfig, envID EnvironmentID) error { + log.Infof(ctx, "refreshing runner job for environment %q", envID) + jobID := execCfg.JobRegistry.MakeJobID() + var jobCreated bool + if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // Do we already have a job for this environment? + row, err := txn.QueryRowEx(ctx, + "get-env-runner-job", txn.KV(), sessiondata.NodeUserSessionDataOverride, + `SELECT id FROM system.jobs WHERE job_type = $1 AND created_by_type = $2`, + jobspb.TypeAutoConfigEnvRunner.String(), + makeEnvRunnerJobCreatedKey(envID)) + if err != nil { + return err + } + if row != nil { + log.Infof(ctx, "found existing job %v for environment %q", row[0], envID) + return nil + } + + // The job did not exist yet. Create it now. + if err := createEnvRunnerJob(ctx, txn, execCfg.JobRegistry, jobID, envID); err != nil { + return errors.Wrapf(err, "creating job %d for env %q", jobID, envID) + } + jobCreated = true + return nil + }); err != nil { + return err + } + if jobCreated { + log.Infof(ctx, "created job %d for env %q", jobID, envID) + // Start the job immediately. This speeds up the application + // of initial configuration tasks. + execCfg.JobRegistry.NotifyToResume(ctx, jobID) + } return nil } @@ -67,6 +142,5 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &autoConfigRunner{job: job} } - jobs.RegisterConstructor(jobspb.TypeAutoConfigRunner, createResumerFn, - jobs.DisablesTenantCostControl) + jobs.RegisterConstructor(jobspb.TypeAutoConfigRunner, createResumerFn, jobs.DisablesTenantCostControl) } diff --git a/pkg/server/autoconfig/auto_config_env_runner.go b/pkg/server/autoconfig/auto_config_env_runner.go new file mode 100644 index 000000000000..024c54d9c6ac --- /dev/null +++ b/pkg/server/autoconfig/auto_config_env_runner.go @@ -0,0 +1,245 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package autoconfig
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider"
+	"github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/logtags"
+)
+
+// createEnvRunnerJob creates a job to execute tasks for the given
+// environment.
+//
+// Refer to the package-level documentation for more details.
+func createEnvRunnerJob(
+	ctx context.Context,
+	txn isql.Txn,
+	registry *jobs.Registry,
+	jobID jobspb.JobID,
+	envID EnvironmentID,
+) error {
+	jobRecord := jobs.Record{
+		Description: "runs configuration tasks",
+		Username:    username.NodeUserName(),
+		Details:     jobspb.AutoConfigEnvRunnerDetails{EnvID: envID},
+		Progress:    jobspb.AutoConfigEnvRunnerProgress{},
+		CreatedBy: &jobs.CreatedByInfo{
+			Name: makeEnvRunnerJobCreatedKey(envID),
+		},
+		NonCancelable: true,
+	}
+
+	_, err := registry.CreateJobWithTxn(ctx, jobRecord, jobID, txn)
+	return err
+}
+
+func makeEnvRunnerJobCreatedKey(envID EnvironmentID) string {
+	const autoConfigEnvRunnerCreatedName = "auto-config-env-runner"
+	return fmt.Sprintf("%s:%s", autoConfigEnvRunnerCreatedName, envID)
+}
+
+// envRunner is the runner for one task environment.
+type envRunner struct {
+	envID EnvironmentID
+	job   *jobs.Job
+}
+
+var _ jobs.Resumer = (*envRunner)(nil)
+
+// OnFailOrCancel is a part of the Resumer interface.
+func (r *envRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jobErr error) error {
+	return nil
+}
+
+// Resume is part of the Resumer interface.
+func (r *envRunner) Resume(ctx context.Context, execCtx interface{}) error {
+	ctx = logtags.AddTag(ctx, "taskenv", r.envID)
+
+	// The environment runner is a forever-running background job.
+	// It's always safe to wind the SQL pod down whenever it's
+	// running, something we indicate through the job's idle
+	// status.
+	r.job.MarkIdle(true)
+
+	exec := execCtx.(sql.JobExecContext)
+	execCfg := exec.ExecCfg()
+
+	// Provider gives us tasks to run.
+	provider := getProvider(ctx, execCfg)
+
+	for {
+		log.Infof(ctx, "waiting for more tasks...")
+		task, err := provider.Peek(ctx, r.envID)
+		if err != nil {
+			if errors.Is(err, acprovider.ErrNoMoreTasks) {
+				// No more tasks to process. Just stop the runner.
+				return nil
+			}
+			return err
+		}
+
+		r.job.MarkIdle(false)
+		var waitSome bool
+		waitSome, err = r.maybeRunNextTask(ctx, provider, execCfg, task)
+		if err != nil {
+			log.Warningf(ctx, "error processing auto config: %v", err)
+		}
+		r.job.MarkIdle(true)
+		if err != nil || waitSome {
+			// Wait a bit before retrying.
+			select {
+			// TODO(knz): Maybe make this delay configurable.
+			case <-time.After(5 * time.Second):
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	}
+}
+
+func (r *envRunner) maybeRunNextTask(
+	ctx context.Context,
+	provider acprovider.Provider,
+	execCfg *sql.ExecutorConfig,
+	nextTask autoconfigpb.Task,
+) (waitSome bool, err error) {
+	// Wait for the current task's job to complete, if there is one.
+	//
+	// This is an optimization: the logic below would work without this step.
+	// We do this to avoid waiting too much after a task has completed
+	// (the logic below otherwise retries after a delay in case of conflict).
+	if err = r.maybeWaitForCurrentTaskJob(ctx, execCfg); err != nil {
+		return true, err
+	}
+
+	// The job ID we'll use.
+	jobID := execCfg.JobRegistry.MakeJobID()
+	log.Infof(ctx, "allocated job ID %d for next task candidate %d", jobID, nextTask.TaskID)
+	var nextTaskID TaskID
+
+	err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (resErr error) {
+		// Re-check whether another task has already been started.
+		otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID)
+		if err != nil {
+			return err
+		}
+		if otherTaskID != 0 {
+			// What likely happened is that another node caught up with us,
+			// observed the job completion for the previous task and already
+			// created the start marker and job for the next one.
+			log.Infof(ctx, "found start marker for task %d, unable to start another task", otherTaskID)
+			// We will simply retry, with some jittered delay.
+			waitSome = true
+			return nil
+		}
+
+		// Find the latest completed task.
+		lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID)
+		if err != nil {
+			return err
+		}
+
+		// Did another node catch up with us?
+		if lastTaskID >= nextTask.TaskID {
+			// Yes. Just tell the provider what we found and retry
+			// the peek.
+			return provider.Pop(ctx, r.envID, lastTaskID)
+		}
+
+		// Can we even start this task? It may have a min version condition.
+		if !execCfg.Settings.Version.ActiveVersion(ctx).IsActiveVersion(nextTask.MinVersion) {
+			log.Infof(ctx, "next task %d has min version requirement %v while cluster is at version %v; waiting...", nextTask.TaskID, nextTask.MinVersion, execCfg.Settings.Version.ActiveVersion(ctx))
+			waitSome = true
+			return nil
+		}
+
+		// Record the task ID; this also enables the log event at the
+		// end of the surrounding function.
+		nextTaskID = nextTask.TaskID
+
+		// Now we can create the start marker for our next task.
+		//
+		// We report the job ID in the value field to help with
+		// observability and to enable the call to
+		// maybeWaitForCurrentTaskJob(), which is an optimization. Storing
+		// the job ID is not strictly required for sequencing the tasks.
+		if err := writeStartMarker(ctx, txn,
+			InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID); err != nil {
+			return errors.Wrapf(err, "unable to write start marker for task %d", nextTaskID)
+		}
+
+		// Finally, create the job.
+		if err := createTaskJob(ctx, txn, execCfg.JobRegistry, jobID, r.envID, nextTask); err != nil {
+			return errors.Wrapf(err, "unable to create job %d for task %d", jobID, nextTaskID)
+		}
+		return nil
+	})
+
+	if err == nil && nextTaskID != 0 {
+		log.Infof(ctx, "created job %d for task %d", jobID, nextTaskID)
+	}
+
+	return waitSome, err
+}
+
+// maybeWaitForCurrentTaskJob checks whether there is a current task
+// running (there's a start marker for one); if there is, it waits for
+// its job to complete.
+func (r *envRunner) maybeWaitForCurrentTaskJob(
+	ctx context.Context, execCfg *sql.ExecutorConfig,
+) error {
+	var prevJobID jobspb.JobID
+	var prevTaskID TaskID
+
+	if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		var err error
+		prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID)
+		return err
+	}); err != nil {
+		return errors.Wrap(err, "checking latest task job")
+	}
+
+	if prevJobID != 0 {
+		// We have a job already. Just wait for it.
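+		// Note: Registry.Run waits for the job to reach a terminal
+		// state. By the time it returns, the previous task has either
+		// written its completion marker itself or, if it failed,
+		// OnFailOrCancel has written an error marker on its behalf.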
+ log.Infof(ctx, "waiting for task %d, job %d to complete", prevTaskID, prevJobID) + if err := execCfg.JobRegistry.Run(ctx, []jobspb.JobID{prevJobID}); err != nil { + // Job fail errors here are not hard errors; it may be that + // the job was simply cancelled by the user. + log.Infof(ctx, "previous task job error: %v", err) + } + } + return nil +} + +func init() { + // Note: we disable tenant cost control because auto-config is used + // by operators and should thus not incur costs (or performance + // penalties) to tenants. + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + jd := job.Details() + details := jd.(jobspb.AutoConfigEnvRunnerDetails) + return &envRunner{envID: details.EnvID, job: job} + } + jobs.RegisterConstructor(jobspb.TypeAutoConfigEnvRunner, createResumerFn, jobs.DisablesTenantCostControl) +} diff --git a/pkg/server/autoconfig/auto_config_task.go b/pkg/server/autoconfig/auto_config_task.go new file mode 100644 index 000000000000..ee2c6affc09d --- /dev/null +++ b/pkg/server/autoconfig/auto_config_task.go @@ -0,0 +1,187 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" +) + +// createTaskJob creates a job to execute the given task. +// +// Refer to the package-level documentation for more details. +func createTaskJob( + ctx context.Context, + txn isql.Txn, + registry *jobs.Registry, + jobID jobspb.JobID, + envID EnvironmentID, + task autoconfigpb.Task, +) error { + jobRecord := jobs.Record{ + Description: fmt.Sprintf("configuration task: %s", task.Description), + Username: username.NodeUserName(), + Details: jobspb.AutoConfigTaskDetails{EnvID: envID, Task: task}, + Progress: jobspb.AutoConfigTaskProgress{}, + CreatedBy: &jobs.CreatedByInfo{ + Name: fmt.Sprintf("%s:%s", AutoConfigTaskCreatedName, envID), + ID: int64(task.TaskID), + }, + } + + _, err := registry.CreateJobWithTxn(ctx, jobRecord, jobID, txn) + return err +} + +// AutoConfigTaskCreatedName is the value in created_by_name for jobs +// created by the auto config runner. +const AutoConfigTaskCreatedName = "auto-config-task" + +// taskRunner is the runner for one task. +type taskRunner struct { + envID EnvironmentID + task autoconfigpb.Task +} + +var _ jobs.Resumer = (*taskRunner)(nil) + +// OnFailOrCancel is a part of the Resumer interface. 
+func (r *taskRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jobErr error) error {
+	ctx = logtags.AddTag(ctx, "taskenv", r.envID)
+	ctx = logtags.AddTag(ctx, "task", r.task.TaskID)
+	log.Infof(ctx, "task execution failed: %v", jobErr)
+
+	exec := execCtx.(sql.JobExecContext)
+	execCfg := exec.ExecCfg()
+	provider := getProvider(ctx, execCfg)
+	if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		return markTaskComplete(ctx, txn,
+			InfoKeyTaskRef{Environment: r.envID, Task: r.task.TaskID},
+			[]byte("task error"))
+	}); err != nil {
+		return err
+	}
+
+	// Tell the provider we're done so the task is not served again to
+	// the runner.
+	if err := provider.Pop(ctx, r.envID, r.task.TaskID); err != nil {
+		// Failing to Pop is not a hard job error: the runner also knows
+		// how to pop upon finding unexpected completion markers.
+		log.Warningf(ctx, "error popping the task off the queue: %v", err)
+	}
+	return nil
+}
+
+// Resume is part of the Resumer interface.
+func (r *taskRunner) Resume(ctx context.Context, execCtx interface{}) error {
+	ctx = logtags.AddTag(ctx, "taskenv", r.envID)
+	ctx = logtags.AddTag(ctx, "task", r.task.TaskID)
+	log.Infof(ctx, "starting execution")
+
+	exec := execCtx.(sql.JobExecContext)
+	execCfg := exec.ExecCfg()
+	provider := getProvider(ctx, execCfg)
+
+	switch payload := r.task.GetPayload().(type) {
+	case *autoconfigpb.Task_SimpleSQL:
+		if err := execSimpleSQL(ctx, execCfg, r.envID, r.task.TaskID, payload.SimpleSQL); err != nil {
+			return err
+		}
+
+	default:
+		return errors.AssertionFailedf("unknown task payload type: %T", payload)
+	}
+
+	// Tell the provider we're done so the task is not served again to
+	// the runner.
+	if err := provider.Pop(ctx, r.envID, r.task.TaskID); err != nil {
+		// Failing to Pop is not a hard job error: the runner also knows
+		// how to pop upon finding unexpected completion markers.
+		log.Warningf(ctx, "error popping the task off the queue: %v", err)
+	}
+	return nil
+}
+
+// execSimpleSQL executes a SQL payload as a single combined transaction
+// that also includes the removal of the start marker and the creation
+// of the completion marker.
+func execSimpleSQL(
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	envID EnvironmentID,
+	taskID TaskID,
+	sqlPayload *autoconfigpb.SimpleSQL,
+) error {
+	// Which SQL identity to use.
+	sqlUsername := username.RootUserName()
+	if sqlPayload.UsernameProto != "" {
+		sqlUsername = sqlPayload.UsernameProto.Decode()
+	}
+	execOverride := sessiondata.InternalExecutorOverride{
+		User: sqlUsername,
+	}
+	// First execute all the non-transactional, idempotent SQL statements.
+	if len(sqlPayload.NonTransactionalStatements) > 0 {
+		exec := execCfg.InternalDB.Executor()
+		for _, stmt := range sqlPayload.NonTransactionalStatements {
+			log.Infof(ctx, "attempting execution of non-txn task statement:\n%s", stmt)
+			_, err := exec.ExecEx(ctx, "exec-task-statement", nil, /* txn */
+				execOverride, stmt)
+			if err != nil {
+				return err
+			}
+		}
+		log.Infof(ctx, "finished executing non-txn task statements")
+	}
+
+	// Now execute all the transactional, potentially not idempotent
+	// statements.
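+	// Note: markTaskComplete below runs in this same transaction, so
+	// the statements and the completion marker commit (or roll back)
+	// atomically; this is what makes this part of the task execute at
+	// most once.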
+ return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + for _, stmt := range sqlPayload.TransactionalStatements { + log.Infof(ctx, "attempting execution of task statement:\n%s", stmt) + _, err := txn.ExecEx(ctx, "exec-task-statement", + txn.KV(), + execOverride, + stmt) + if err != nil { + return err + } + } + log.Infof(ctx, "finished executing txn statements") + return markTaskComplete(ctx, txn, + InfoKeyTaskRef{Environment: envID, Task: taskID}, + []byte("task success")) + }) +} + +func init() { + // Note: we disable tenant cost control because auto-config is used + // by operators and should thus not incur costs (or performance + // penalties) to tenants. + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + jd := job.Details() + details := jd.(jobspb.AutoConfigTaskDetails) + return &taskRunner{envID: details.EnvID, task: details.Task} + } + jobs.RegisterConstructor(jobspb.TypeAutoConfigTask, createResumerFn, jobs.DisablesTenantCostControl) +} diff --git a/pkg/server/autoconfig/auto_config_test.go b/pkg/server/autoconfig/auto_config_test.go new file mode 100644 index 000000000000..f927584c7387 --- /dev/null +++ b/pkg/server/autoconfig/auto_config_test.go @@ -0,0 +1,201 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +const testEnvID autoconfigpb.EnvironmentID = "my test env" + +type testProvider struct { + t *testing.T + notifyCh chan struct{} + peekWaitCh chan struct{} + tasks []testTask +} + +type testTask struct { + task autoconfigpb.Task + seen bool +} + +var testTasks = []testTask{ + {task: autoconfigpb.Task{ + TaskID: 123, + Description: "test task that creates a system table", + MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + UsernameProto: username.NodeUserName().EncodeProto(), + NonTransactionalStatements: []string{ + "CREATE TABLE IF NOT EXISTS system.foo(x INT)", + // This checks that the non-txn part works properly: SET + // CLUSTER SETTING can only be run outside of explicit txns. 
+ "SET CLUSTER SETTING cluster.organization = 'woo'", + }, + }, + }, + }}, + {task: autoconfigpb.Task{ + TaskID: 345, + Description: "test task that fails with an error", + MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + TransactionalStatements: []string{"SELECT invalid"}, + }, + }, + }}, + {task: autoconfigpb.Task{ + TaskID: 456, + Description: "test task that creates another system table", + MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + UsernameProto: username.NodeUserName().EncodeProto(), + NonTransactionalStatements: []string{"CREATE TABLE IF NOT EXISTS system.bar(y INT)"}, + }, + }, + }}, +} + +func (p *testProvider) EnvUpdate() <-chan struct{} { + p.t.Logf("runner has registered env update channel") + return p.notifyCh +} + +func (p *testProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID { + return []autoconfigpb.EnvironmentID{testEnvID} +} + +func (p *testProvider) Pop( + _ context.Context, envID autoconfigpb.EnvironmentID, taskID autoconfigpb.TaskID, +) error { + p.t.Logf("runner reports completed task %d (env %q)", taskID, envID) + for len(p.tasks) > 0 { + if taskID >= p.tasks[0].task.TaskID { + p.t.Logf("popping task %d from queue", p.tasks[0].task.TaskID) + p.tasks = p.tasks[1:] + continue + } + break + } + return nil +} + +func (p *testProvider) Peek( + ctx context.Context, envID autoconfigpb.EnvironmentID, +) (autoconfigpb.Task, error) { + p.t.Logf("runner peeking (env %q)", envID) + if len(p.tasks) == 0 { + return autoconfigpb.Task{}, acprovider.ErrNoMoreTasks + } + if !p.tasks[0].seen { + // seen ensures that the runner job won't have to wait a second + // time when peeking the task. 
select {
+		case <-ctx.Done():
+			return autoconfigpb.Task{}, ctx.Err()
+		case <-p.peekWaitCh:
+		}
+	}
+	p.tasks[0].seen = true
+	return p.tasks[0].task, nil
+}
+
+func TestAutoConfig(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	provider := &testProvider{
+		t:          t,
+		notifyCh:   make(chan struct{}, 1),
+		peekWaitCh: make(chan struct{}),
+		tasks:      testTasks,
+	}
+	provider.notifyCh <- struct{}{}
+
+	ctx := context.Background()
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			AutoConfig: &autoconfig.TestingKnobs{
+				Provider: provider,
+			},
+		},
+	})
+	defer s.Stopper().Stop(ctx)
+
+	t.Logf("waiting for runner job...")
+	testutils.SucceedsSoon(t, func() error {
+		var jobID int64
+		if err := sqlDB.QueryRowContext(ctx, `SELECT id FROM system.jobs WHERE id = $1`,
+			jobs.AutoConfigRunnerJobID).Scan(&jobID); err != nil {
+			return err
+		}
+		t.Logf("found runner job: %d", jobID)
+		return nil
+	})
+
+	waitForTaskCompleted := func(taskID autoconfigpb.TaskID) (result []byte) {
+		taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnvID, Task: taskID}
+		completionMarker := taskRef.EncodeCompletionMarkerKey()
+		testutils.SucceedsSoon(t, func() error {
+			err := sqlDB.QueryRowContext(ctx, `
+SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`,
+				jobs.AutoConfigRunnerJobID,
+				completionMarker).Scan(&result)
+			if err != nil {
+				return err
+			}
+			t.Logf("found task completion: %q", string(result))
+			return nil
+		})
+		return result
+	}
+
+	provider.peekWaitCh <- struct{}{}
+	t.Logf("waiting for first task completion marker...")
+	result := waitForTaskCompleted(testTasks[0].task.TaskID)
+	require.Equal(t, []byte("task success"), result)
+
+	t.Logf("check that the effects of the first task are visible")
+	var unused int
+	err := sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.foo`).Scan(&unused)
+	require.NoError(t, err)
+
+	provider.peekWaitCh <- struct{}{}
+	t.Logf("waiting for 2nd task completion marker...")
+	result = waitForTaskCompleted(testTasks[1].task.TaskID)
+	require.Equal(t, []byte("task error"), result)
+
+	provider.peekWaitCh <- struct{}{}
+	t.Logf("waiting for 3rd task completion marker...")
+	result = waitForTaskCompleted(testTasks[2].task.TaskID)
+	require.Equal(t, []byte("task success"), result)
+
+	t.Logf("check that the effects of the third task are visible")
+	err = sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.bar`).Scan(&unused)
+	require.NoError(t, err)
+}
diff --git a/pkg/server/autoconfig/autoconfigpb/BUILD.bazel b/pkg/server/autoconfig/autoconfigpb/BUILD.bazel
new file mode 100644
index 000000000000..576051229d26
--- /dev/null
+++ b/pkg/server/autoconfig/autoconfigpb/BUILD.bazel
@@ -0,0 +1,39 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("//build:STRINGER.bzl", "stringer")
+
+proto_library(
+    name = "autoconfigpb_proto",
+    srcs = ["autoconfig.proto"],
+    strip_import_prefix = "/pkg",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/roachpb:roachpb_proto",
+        "@com_github_gogo_protobuf//gogoproto:gogo_proto",
+    ],
+)
+
+go_proto_library(
+    name = "autoconfigpb_go_proto",
+    compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"],
+    importpath =
"github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb", + proto = ":autoconfigpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/security/username", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "autoconfigpb", + srcs = ["autoconfig.go"], + embed = [":autoconfigpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/server/autoconfig/autoconfigpb/autoconfig.go b/pkg/server/autoconfig/autoconfigpb/autoconfig.go new file mode 100644 index 000000000000..347fd8fd3ac1 --- /dev/null +++ b/pkg/server/autoconfig/autoconfigpb/autoconfig.go @@ -0,0 +1,17 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfigpb + +// EnvironmentID is the type of an environment that provides tasks. +type EnvironmentID string + +// TaskID is the type of a task ID. +type TaskID uint64 diff --git a/pkg/server/autoconfig/autoconfigpb/autoconfig.proto b/pkg/server/autoconfig/autoconfigpb/autoconfig.proto new file mode 100644 index 000000000000..2f3ea9a220b8 --- /dev/null +++ b/pkg/server/autoconfig/autoconfigpb/autoconfig.proto @@ -0,0 +1,97 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.server.autoconfig.autoconfigpb; +option go_package = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb"; + +import "gogoproto/gogo.proto"; +import "roachpb/metadata.proto"; + +// TaskBundle contains tasks defined by multiple environments. +// See the package-level documentation for package autoconfig +// for more details. +message TaskBundle { + option (gogoproto.equal) = true; + + // EnvTasks defines the tasks provided by one environment. + // The tasks of one environment execute sequentially. + message EnvTasks { + option (gogoproto.equal) = true; + string env_id = 1 [(gogoproto.customname) = "EnvID", (gogoproto.casttype) = "EnvironmentID"]; + repeated Task tasks = 2 [(gogoproto.nullable) = false]; + // NEXT ID: 3; + } + + // Bundles defines tasks for different environments. There should + // be only one entry in the array per environment ID. + // Tasks from different environments can execute concurrently. + repeated EnvTasks bundles = 1 [(gogoproto.nullable) = false]; + + // NEXT ID: 2; +} + +// Task defines one auto config task to run by a tenant. +message Task { + option (gogoproto.equal) = true; + + // TaskID is the key for this task within one environment. + // Tasks with a given ID are executed at least once, with + // extra efforts to make it exactly-once whenever possible. + // Tasks within the same environment are executed sequentially + // in the order of their task ID. + // See the package-level documentation for package autoconfig + // for more details. 
uint64 task_id = 1 [(gogoproto.customname) = "TaskID", (gogoproto.casttype) = "TaskID"];
+
+  // Description of what the auto configuration task does.
+  // This is reported unredactable in logs and telemetry and thus must be void of PII.
+  string description = 2;
+
+  // MinVersion is the minimum active cluster version that
+  // must be encountered before this task can run.
+  roachpb.Version min_version = 3 [(gogoproto.nullable) = false];
+
+  // Payload describes what the task should run.
+  oneof payload {
+    SimpleSQL simpleSQL = 4;
+  }
+
+  // NEXT ID: 5;
+}
+
+// SimpleSQL describes a combination of some optional
+// non-transactional SQL statements (which should be idempotent), and
+// some optional transactional SQL statements.
+//
+// The SQL is executed using the configured SQL identity ('root' if
+// left unspecified). The task fails if the SQL aborts with an error;
+// it is not auto-retried.
+message SimpleSQL {
+  option (gogoproto.equal) = true;
+
+  // SQL identity to run the statements as.
+  // If left empty, execution will use the 'root' identity.
+  string username_proto = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];
+
+  // SQL statements to execute as standalone transactions. These are
+  // executed before the transactional statements below. If any of
+  // these fails or the transactional statements below need to be
+  // retried, all the non-transactional statements will be executed
+  // again. They should be defined to only contain idempotent SQL,
+  // for example SHOW CLUSTER SETTING or CREATE TABLE IF NOT EXISTS.
+  repeated string non_transactional_statements = 2;
+
+  // SQL statements to execute transactionally with the placement of
+  // the task completion marker. These will be executed at most once.
+  repeated string transactional_statements = 3;
+
+  // NEXT ID: 4;
+}
diff --git a/pkg/server/autoconfig/doc.go b/pkg/server/autoconfig/doc.go
new file mode 100644
index 000000000000..f2eeb94e3f87
--- /dev/null
+++ b/pkg/server/autoconfig/doc.go
@@ -0,0 +1,162 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+// Package autoconfig defines a mechanism through which the SQL service
+// for a tenant (including, possibly, the system tenant) can run
+// SQL tasks defined externally only once, using jobs.
+//
+// The main purposes of this logic are:
+//
+// - To support SQL-based initialization of a cluster's parameters
+//   (including cluster settings, initial user creation, initial
+//   db creation) prior to accepting application clients.
+// - To support logical cluster maintenance by CockroachCloud
+//   SREs for Serverless customers without incurring RU billing.
+// - To support the automation of maintenance operations across
+//   a fleet of secondary tenants (such as the creation of
+//   multi-region system tables when initializing a multi-region
+//   CC serverless cluster).
+//
+// In a nutshell, the mechanism works as follows:
+//
+// - There is a permanent job called the 'auto config runner'.
+// - The runner job accepts task definitions from zero or more
+//   "external environments" (see below for details), and organizes
+//   their execution by creating individual 'auto config task' jobs.
+// - Each auto config task job runs one task; it is cancellable
+//   by the end user (see below for a note about cancellability).
+// - The runner job only creates the job for the next task after the
+//   job for the previous task from a given environment has fully
+//   completed (successfully, with an error, or by being cancelled).
+//
+// # Task provenance - data model
+//
+// We plan to use tasks for various purposes (troubleshooting,
+// migrations, etc.) and so we need sequential semantics: a guarantee
+// that a task doesn't start before the previous one has completed.
+//
+// However, if we did this for all tasks we would have a problem
+// when a task gets stuck - then the entire system wouldn't make
+// progress any more.
+//
+// To reconcile our desire for sequences and preserve our ability
+// to "repair" broken situations, we also introduce a notion of
+// *environment* where tasks are defined:
+//
+// - an environment identifies a potential source of tasks.
+// - tasks are grouped by environment; or rather, each environment
+//   provisions its own tasks (potentially at different rates).
+// - the execution of tasks within one environment is sequential.
+// - the execution of tasks of separate environments is independent,
+//   and so different environments can execute tasks concurrently
+//   with each other.
+// - the system supports the dynamic addition of new environments
+//   (by using a yet-unused environment name for new tasks).
+//
+// This makes it possible to "abandon" an environment whose tasks
+// got stuck and start tasks using a new one.
+//
+// # Task provenance - mechanism
+//
+// The auto config runner job takes its input from a "task provider"
+// (Go interface `Provider`, from the package `acprovider`). The task
+// provider has a notification channel and an accessor that lists
+// environments with pending tasks ("active").
+//
+// For each active environment, the top-level runner creates a runner
+// job specific to that environment. Each environment-specific runner
+// then retrieves tasks from the provider to run them.
+//
+// In practice, there are 2 main task providers envisioned at the time
+// of this writing:
+//
+// - A provider of static configuration tasks for the system tenant,
+//   via hardcoded "configuration profiles" selected when a cluster is
+//   initially created.
+// - A provider of dynamic configuration tasks for secondary tenants,
+//   provisioned through an RPC in the "tenant connector".
+//
+// # Task definition and execution guarantees
+//
+// Under normal conditions, and barring explicit contrarian action by
+// the user, the runner logic provides at-least-once execution
+// semantics, and tries hard to provide exactly-once semantics.
+// (Theory tells us that exactly-once is not possible in general; see
+// below for a discussion of the exceptions.) It also guarantees that
+// auto config tasks from the same environment are not executed
+// concurrently with each other, with at most one node executing a
+// task at a time.
+//
+// In a nutshell, the mechanism works as follows:
+//
+// - Tasks are numbered sequentially (task ID); it is the
+//   responsibility of "the environment" (the task provider) to
+//   guarantee that later tasks have greater IDs than earlier tasks.
+// - The runner uses "started" and "completed" markers in the
+//   system.job_info table to ensure that no more than one job is
+//   created for a given task.
+//
+// More specifically, when the runner is ready to execute a task, it
+// transactionally:
+//
+//
+// # Task definition and execution guarantees
+//
+// Under normal conditions, and barring explicit contrarian action by
+// the user, the runner logic provides at-least-once execution
+// semantics, and tries hard to provide exactly-once semantics.
+// (Theory tells us that exactly-once is not generally achievable;
+// see below for a discussion of the exceptions.) It also guarantees
+// that auto config tasks from the same environment are not executed
+// concurrently with each other, with at most one node executing a
+// task at a time.
+//
+// In a nutshell, the mechanism works as follows:
+//
+// - Tasks are numbered sequentially (task ID); it is the
+//   responsibility of "the environment" (the task provider) to
+//   guarantee that later tasks have greater IDs than earlier tasks.
+// - The runner uses "started" and "completed" markers in the
+//   system.job_info table to ensure that no more than one job is
+//   created for a given task.
+//
+// More specifically, when the runner is ready to execute a task, it
+// transactionally:
+//
+//  1. checks that there is no in-flight task already
+//     (e.g. one started by the runner on another node);
+//  2. retrieves the latest completion marker by task ID from
+//     job_info (this indicates the latest completed task, by ID);
+//  3. ratchets the queue of incoming tasks forward, to skip over all
+//     tasks with an ID lower than or equal to that of the last
+//     completed task;
+//  4. creates an "auto config task" job entry for that task
+//     and places a start marker for it in job_info.
+//
+// Then, the "auto config task" job executes the SQL in the task
+// transactionally with the removal of its start marker and the
+// creation of its completion marker.
+//
+// # Possible exceptions to the "exactly-once" semantics
+//
+// The tasks are executed as jobs:
+//
+// - if a task's job is cancelled by the user before it runs, its
+//   side effects will never be observed.
+// - if a node crashes while a task job is executing, the job
+//   will be restarted later (either after this node restarts,
+//   or by being adopted by another node). So any
+//   side effects not otherwise aborted by the transaction
+//   supporting the task's execution may be observed more than once.
+//   This includes e.g. changes to cluster settings.
+//
+// # Task errors vs progress across tasks
+//
+// Note that if a task fails, its job is marked as permanently failed
+// and it will not be retried. The same goes for tasks that have been
+// canceled by the user.
+//
+// Of note, a task's failure does not prevent the next task from
+// starting.
+//
+// This raises the question: what if the environment wishes to mark
+// a task as "required", such that no progress is allowed in subsequent
+// tasks until the required task completes successfully?
+//
+// While this is not yet supported in this package, a solution
+// would probably contain the following:
+//
+// - the task's job would be marked as non-cancellable by the
+//   end user.
+// - the task's job would not be marked as permanently failed
+//   upon encountering execution failures, so it gets retried.
+// - we would need another mechanism to cancel execution
+//   of that task and its environment in case the operator
+//   wants to abandon the whole sequence.
+//
+// This enhancement is left to a later iteration.
+package autoconfig
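The transactional steps enumerated in the package documentation map onto the marker helpers introduced in task_markers.go further below. The following is a rough sketch only, not the actual runner code (which is not part of this excerpt): it combines the real helpers getCurrentlyStartedTaskID, getLastCompletedTaskID and writeStartMarker with the hypothetical provider shape sketched earlier and a hypothetical createTaskJob helper.

    // pickNextTask sketches, notionally inside package autoconfig, the
    // transactional task pickup described above. Illustrative only.
    func pickNextTask(
        ctx context.Context, txn isql.Txn, env EnvironmentID, provider Provider,
    ) error {
        // Step 1: bail out if a task is already in flight.
        startedTask, _, err := getCurrentlyStartedTaskID(ctx, txn, env)
        if err != nil {
            return err
        }
        if startedTask != 0 {
            // Assumption for this sketch: task ID zero means
            // "no start marker found".
            return nil
        }
        // Step 2: find the completion watermark.
        lastCompleted, err := getLastCompletedTaskID(ctx, txn, env)
        if err != nil {
            return err
        }
        // Step 3: ratchet the incoming queue past completed tasks.
        if err := provider.Pop(ctx, env, lastCompleted); err != nil {
            return err
        }
        // Step 4: create the task job and write its start marker.
        next, err := provider.Peek(ctx, env)
        if err != nil {
            return err
        }
        jobID := createTaskJob(ctx, txn, env, next) // hypothetical helper
        return writeStartMarker(ctx, txn,
            InfoKeyTaskRef{Environment: env, Task: next.ID}, jobID)
    }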
diff --git a/pkg/server/autoconfig/main_test.go b/pkg/server/autoconfig/main_test.go
new file mode 100644
index 000000000000..2e8cfa7a970b
--- /dev/null
+++ b/pkg/server/autoconfig/main_test.go
@@ -0,0 +1,31 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package autoconfig_test
+
+import (
+	"os"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/security/securityassets"
+	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+)
+
+func TestMain(m *testing.M) {
+	securityassets.SetLoader(securitytest.EmbeddedAssets)
+	serverutils.InitTestServerFactory(server.TestServerFactory)
+	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+	os.Exit(m.Run())
+}
+
+//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
diff --git a/pkg/server/autoconfig/task_markers.go b/pkg/server/autoconfig/task_markers.go
new file mode 100644
index 000000000000..93a85b51fab6
--- /dev/null
+++ b/pkg/server/autoconfig/task_markers.go
@@ -0,0 +1,214 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package autoconfig
+
+import (
+	"context"
+	"encoding/hex"
+	"strconv"
+	"strings"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/util/encoding"
+	"github.com/cockroachdb/errors"
+)
+
+// infoKeyCompletionPrefix is the prefix of the key inserted in job_info
+// when a task has completed.
+const infoKeyCompletionPrefix = "completed-"
+
+// infoKeyStartPrefix is the prefix of the key inserted in job_info
+// when a task has started.
+const infoKeyStartPrefix = "started-"
+
+// InfoKeyStartPrefix returns the info_key scan prefix for
+// all task start markers for the given environment.
+func InfoKeyStartPrefix(env EnvironmentID) string {
+	return encodeEnvPrefix(infoKeyStartPrefix, env)
+}
+
+// InfoKeyCompletionPrefix returns the info_key scan prefix for
+// all task completion markers for the given environment.
+func InfoKeyCompletionPrefix(env EnvironmentID) string {
+	return encodeEnvPrefix(infoKeyCompletionPrefix, env)
+}
+
+// encodeEnvPrefix encodes the environment in the same way as
+// encodeInternal below, so that the result is a true prefix of every
+// info key generated for that environment. (A raw concatenation of
+// the environment name would not match the hex encoding used by
+// encodeInternal.)
+func encodeEnvPrefix(prefix string, env EnvironmentID) string {
+	envBytes := encoding.EncodeStringAscending(nil, string(env))
+	var buf strings.Builder
+	buf.Grow(len(prefix) + hex.EncodedLen(len(envBytes)))
+	buf.WriteString(prefix)
+	hexEncode := hex.NewEncoder(&buf)
+	if _, err := hexEncode.Write(envBytes); err != nil {
+		// This can't happen because strings.Builder always auto-grows.
+		panic(errors.HandleAsAssertionFailure(err))
+	}
+	return buf.String()
+}
+
+// InfoKeyTaskRef represents the reference to a task stored in
+// job_info task markers.
+type InfoKeyTaskRef struct {
+	Environment EnvironmentID
+	Task        TaskID
+}
+
+// EncodeStartMarkerKey creates a job_info info key that identifies
+// a start marker for this task.
+func (tr *InfoKeyTaskRef) EncodeStartMarkerKey() string {
+	return tr.encodeInternal(infoKeyStartPrefix)
+}
+
+// DecodeStartMarkerKey decodes a job_info info key that identifies a
+// start marker for this task.
+func (tr *InfoKeyTaskRef) DecodeStartMarkerKey(infoKey string) error {
+	return tr.decodeInternal(infoKeyStartPrefix, infoKey)
+}
+
+// EncodeCompletionMarkerKey creates a job_info info key that identifies
+// a completion marker for this task.
+func (tr *InfoKeyTaskRef) EncodeCompletionMarkerKey() string {
+	return tr.encodeInternal(infoKeyCompletionPrefix)
+}
+
+// DecodeCompletionMarkerKey decodes a job_info info key that identifies
+// a completion marker for this task.
+func (tr *InfoKeyTaskRef) DecodeCompletionMarkerKey(infoKey string) error {
+	return tr.decodeInternal(infoKeyCompletionPrefix, infoKey)
+}
+
+func (tr *InfoKeyTaskRef) encodeInternal(prefix string) string {
+	orderedKeyBytes := make([]byte, 0, len(tr.Environment)+10)
+	orderedKeyBytes = encoding.EncodeStringAscending(orderedKeyBytes, string(tr.Environment))
+	orderedKeyBytes = encoding.EncodeUvarintAscending(orderedKeyBytes, uint64(tr.Task))
+
+	var buf strings.Builder
+	buf.Grow(len(prefix) + hex.EncodedLen(len(orderedKeyBytes)))
+	buf.WriteString(prefix)
+	hexEncode := hex.NewEncoder(&buf)
+	_, err := hexEncode.Write(orderedKeyBytes)
+	if err != nil {
+		// This can't happen because strings.Builder always auto-grows.
+		panic(errors.HandleAsAssertionFailure(err))
+	}
+
+	return buf.String()
+}
+
+func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error {
+	if !strings.HasPrefix(infoKey, prefix) {
+		return errors.AssertionFailedf("programming error: prefix %q missing: %q", prefix, infoKey)
+	}
+	infoKey = infoKey[len(prefix):]
+	bytes, err := hex.DecodeString(infoKey)
+	if err != nil {
+		return errors.Wrapf(err, "decoding hex-encoded info key (%q)", infoKey)
+	}
+	rest, s, err := encoding.DecodeUnsafeStringAscendingDeepCopy(bytes, nil)
+	if err != nil {
+		return errors.Wrap(err, "decoding environment from task info key")
+	}
+	_, v, err := encoding.DecodeUvarintAscending(rest)
+	if err != nil {
+		return errors.Wrap(err, "decoding task ID from task info key")
+	}
+	tr.Environment = EnvironmentID(s)
+	tr.Task = TaskID(v)
+	return nil
+}
+
+// writeStartMarker writes a start marker for the given task ID and
+// also writes its job ID into the value part.
+func writeStartMarker(
+	ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID,
+) error {
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+	return infoStorage.Write(ctx,
+		taskRef.EncodeStartMarkerKey(),
+		[]byte(strconv.FormatUint(uint64(jobID), 10)))
+}
+
+// getCurrentlyStartedTaskID retrieves the ID of the last task which
+// has a start marker in job_info.
+func getCurrentlyStartedTaskID(
+	ctx context.Context, txn isql.Txn, env EnvironmentID,
+) (prevTaskID TaskID, prevJobID jobspb.JobID, err error) {
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+
+	if err := infoStorage.GetLast(ctx,
+		InfoKeyStartPrefix(env),
+		func(infoKey string, value []byte) error {
+			var taskRef InfoKeyTaskRef
+			if err := taskRef.DecodeStartMarkerKey(infoKey); err != nil {
+				return errors.Wrapf(err, "decoding info key (%q)", infoKey)
+			}
+			prevTaskID = taskRef.Task
+
+			// Also retrieve its job ID from the value bytes.
+			jid, err := strconv.ParseInt(string(value), 10, 64)
+			if err != nil {
+				return errors.Wrapf(err,
+					"while decoding value (%q) for start marker for task %d",
+					string(value), prevTaskID)
+			}
+			prevJobID = jobspb.JobID(jid)
+			return nil
+		}); err != nil {
+		return 0, 0, errors.Wrap(err, "finding last task start marker")
+	}
+
+	return prevTaskID, prevJobID, nil
+}
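Because encodeInternal hex-encodes an order-preserving byte encoding (EncodeStringAscending for the environment, EncodeUvarintAscending for the task ID), lexicographic comparison of the resulting info keys matches (environment, task ID) ordering; this is what lets GetLast return the most recent marker. A small standalone illustration using only the exported API (the property is verified more thoroughly by TestMarkerOrdering below):

    package main

    import (
        "fmt"

        "github.com/cockroachdb/cockroach/pkg/server/autoconfig"
    )

    func main() {
        // A later task in the same environment must encode to a
        // lexicographically larger key than an earlier one.
        early := autoconfig.InfoKeyTaskRef{Environment: "prod", Task: 7}
        late := autoconfig.InfoKeyTaskRef{Environment: "prod", Task: 1000}
        fmt.Println(early.EncodeStartMarkerKey() < late.EncodeStartMarkerKey()) // true
    }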
+// getLastCompletedTaskID retrieves the task ID of the last task which
+// has a completion marker in job_info.
+func getLastCompletedTaskID(
+	ctx context.Context, txn isql.Txn, env EnvironmentID,
+) (lastTaskID TaskID, err error) {
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+
+	if err := infoStorage.GetLast(ctx,
+		InfoKeyCompletionPrefix(env),
+		func(infoKey string, value []byte) error {
+			// There's a task.
+			var taskRef InfoKeyTaskRef
+			if err := taskRef.DecodeCompletionMarkerKey(infoKey); err != nil {
+				return errors.Wrapf(err, "decoding info key (%q)", infoKey)
+			}
+			lastTaskID = taskRef.Task
+			return nil
+		}); err != nil {
+		return 0, errors.Wrap(err, "finding last task completion marker")
+	}
+
+	return lastTaskID, nil
+}
+
+// markTaskComplete transactionally removes the task's start marker
+// and creates a completion marker.
+func markTaskComplete(
+	ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte,
+) error {
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+
+	// Remove the start marker.
+	if err := infoStorage.Delete(ctx, taskRef.EncodeStartMarkerKey()); err != nil {
+		return err
+	}
+
+	// Remove any previous completion marker. This avoids the
+	// accumulation of past completion markers over time.
+	completionKeyPrefix := InfoKeyCompletionPrefix(taskRef.Environment)
+	completionInfoKey := taskRef.EncodeCompletionMarkerKey()
+	if err := infoStorage.DeleteRange(ctx, completionKeyPrefix, completionInfoKey); err != nil {
+		return err
+	}
+
+	// Add our completion marker.
+	return infoStorage.Write(ctx, completionInfoKey, completionValue)
+}
diff --git a/pkg/server/autoconfig/task_markers_test.go b/pkg/server/autoconfig/task_markers_test.go
new file mode 100644
index 000000000000..cc82ae90090c
--- /dev/null
+++ b/pkg/server/autoconfig/task_markers_test.go
@@ -0,0 +1,112 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package autoconfig_test
+
+import (
+	"fmt"
+	"math"
+	"sort"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/server/autoconfig"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestEncodeDecodeMarkers tests that the result of the encode
+// functions can be processed by the decode functions.
+func TestEncodeDecodeMarkers(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	for _, testEnv := range []autoconfig.EnvironmentID{"", "foo", "bar"} {
+		for _, testTask := range []autoconfig.TaskID{0, 1, 10, math.MaxUint64} {
+			t.Run(fmt.Sprintf("%s/%d", testEnv, testTask), func(t *testing.T) {
+				taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnv, Task: testTask}
+
+				encodedStart := taskRef.EncodeStartMarkerKey()
+				var tr autoconfig.InfoKeyTaskRef
+				require.NoError(t, tr.DecodeStartMarkerKey(encodedStart))
+				require.Equal(t, taskRef, tr)
+
+				require.Error(t, tr.DecodeCompletionMarkerKey(encodedStart))
+
+				encodedComplete := taskRef.EncodeCompletionMarkerKey()
+				var tr2 autoconfig.InfoKeyTaskRef
+				require.NoError(t, tr2.DecodeCompletionMarkerKey(encodedComplete))
+				require.Equal(t, taskRef, tr2)
+
+				require.Error(t, tr2.DecodeStartMarkerKey(encodedComplete))
+			})
+		}
+	}
+}
+
+// TestMarkerOrdering tests that the ordering of the encoded markers
+// is the same as the ordering of the task IDs.
+func TestMarkerOrdering(t *testing.T) { + defer leaktest.AfterTest(t)() + + var taskRefs []autoconfig.InfoKeyTaskRef + for _, testEnv := range []autoconfig.EnvironmentID{"", "bar", "foo"} { + for i := 0; i < 10; i++ { + taskRefs = append(taskRefs, autoconfig.InfoKeyTaskRef{Environment: testEnv, Task: autoconfig.TaskID(i)}) + } + } + + // Copy taskRefs to taskRefsRandom. + taskRefsRandom := make([]autoconfig.InfoKeyTaskRef, len(taskRefs)) + copy(taskRefsRandom, taskRefs) + + // Randomize the ordering of taskRefsRandom. + r, _ := randutil.NewTestRand() + for i := range taskRefsRandom { + j := i + int(r.Int31n(int32(len(taskRefsRandom)-i))) + taskRefsRandom[i], taskRefsRandom[j] = taskRefsRandom[j], taskRefsRandom[i] + } + + // Get start marker encodings for all taskRefs. + var startMarkers []string + for _, taskRef := range taskRefsRandom { + startMarkers = append(startMarkers, taskRef.EncodeStartMarkerKey()) + } + // Sort the start markers. This should be the same as sorting the taskRefs. + sort.Strings(startMarkers) + + // Decode the start markers. + var decodedStartMarkers []autoconfig.InfoKeyTaskRef + for _, startMarker := range startMarkers { + var taskRef autoconfig.InfoKeyTaskRef + require.NoError(t, taskRef.DecodeStartMarkerKey(startMarker)) + decodedStartMarkers = append(decodedStartMarkers, taskRef) + } + + // Check that the decoded start markers are the same as the original taskRefs. + require.Equal(t, taskRefs, decodedStartMarkers) + + // Get completion marker encodings for all taskRefs. + var completionMarkers []string + for _, taskRef := range taskRefsRandom { + completionMarkers = append(completionMarkers, taskRef.EncodeCompletionMarkerKey()) + } + // Sort the completion markers. This should be the same as sorting the taskRefs. + sort.Strings(completionMarkers) + + // Decode the completion markers. + var decodedCompletionMarkers []autoconfig.InfoKeyTaskRef + for _, completionMarker := range completionMarkers { + var taskRef autoconfig.InfoKeyTaskRef + require.NoError(t, taskRef.DecodeCompletionMarkerKey(completionMarker)) + decodedCompletionMarkers = append(decodedCompletionMarkers, taskRef) + } + // Check that the decoded completion markers are the same as the original taskRefs. + require.Equal(t, taskRefs, decodedCompletionMarkers) +} diff --git a/pkg/server/autoconfig/testing_knobs.go b/pkg/server/autoconfig/testing_knobs.go new file mode 100644 index 000000000000..37710e749072 --- /dev/null +++ b/pkg/server/autoconfig/testing_knobs.go @@ -0,0 +1,19 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig + +import "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + +type TestingKnobs struct { + Provider acprovider.Provider +} + +func (*TestingKnobs) ModuleTestingKnobs() {} diff --git a/pkg/server/drain.go b/pkg/server/drain.go index 4d8ae408d58e..adb351a79f18 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -347,9 +347,6 @@ func (s *drainServer) drainClients( s.drainSleepFn(drainWait.Get(&s.sqlServer.execCfg.Settings.SV)) } - // Inform the job system that the node is draining. - s.sqlServer.jobRegistry.SetDraining(true) - // Wait for users to close the existing SQL connections. 
 	// During this phase, the server is rejecting new SQL connections.
 	// The server exits this phase either once all SQL connections are closed,
@@ -358,6 +355,18 @@ func (s *drainServer) drainClients(
 		return err
 	}
 
+	// Inform the job system that the node is draining.
+	//
+	// We cannot do this before SQL clients disconnect, because
+	// otherwise there is a risk that one of the remaining SQL sessions
+	// issues a BACKUP or some other job-based statement before it
+	// disconnects, and encounters a job error as a result of the
+	// registry having become unavailable due to the drain.
+	s.sqlServer.jobRegistry.SetDraining()
+
+	// Inform the auto-stats tasks that the node is draining.
+	s.sqlServer.statsRefresher.SetDraining()
+
 	// Drain any remaining SQL connections.
 	// The queryWait duration is a timeout for waiting for SQL queries to finish.
 	// If the timeout is reached, any remaining connections
@@ -372,12 +381,32 @@ func (s *drainServer) drainClients(
 	s.sqlServer.distSQLServer.Drain(ctx, queryMaxWait, reporter)
 
 	// Flush in-memory SQL stats into the statement stats system table.
-	s.sqlServer.pgServer.SQLServer.GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+	statsProvider := s.sqlServer.pgServer.SQLServer.GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
+	statsProvider.Flush(ctx)
+	statsProvider.Stop(ctx)
+
+	// Wait for the asynchronous table stats tasks, which were informed
+	// of the drain above, to shut down.
+	s.sqlServer.statsRefresher.WaitForAutoStatsShutdown(ctx)
+
+	// Likewise, wait for the job registry to shut down its tasks.
+	s.sqlServer.jobRegistry.WaitForRegistryShutdown(ctx)
 
 	// Drain all SQL table leases. This must be done after the pgServer has
 	// given sessions a chance to finish ongoing work.
 	s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)
 
+	// Mark this phase in the logs to clarify the context of any
+	// subsequent errors or warnings.
+	log.Infof(ctx, "SQL server drained successfully; SQL queries cannot execute any more")
+
+	// FIXME(Jeff): Add code here to remove the sql_instances row or
+	// something similar.
+
+	// Mark the node as fully drained.
+	s.sqlServer.gracefulDrainComplete.Set(true)
+
 	// Done. This executes the defers set above to drain SQL leases.
 	return nil
 }
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 1097d4c4bb80..16652914a2c1
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -50,9 +50,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
 	"github.com/cockroachdb/cockroach/pkg/security/clientsecopts"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
-	// Ensure the auto config runner job is registered to avoid log spam.
-	// Pending merge of https://github.com/cockroachdb/cockroach/pull/98466.
-	_ "github.com/cockroachdb/cockroach/pkg/server/autoconfig"
+	"github.com/cockroachdb/cockroach/pkg/server/autoconfig"
+	"github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider"
 	"github.com/cockroachdb/cockroach/pkg/server/diagnostics"
 	"github.com/cockroachdb/cockroach/pkg/server/pgurl"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -191,6 +190,11 @@ type SQLServer struct {
 	// This is set to true when the server has started accepting client conns.
 	isReady syncutil.AtomicBool
 
+	// gracefulDrainComplete indicates when a graceful drain has
+	// completed successfully. We use this to document cases where a
+	// graceful drain did _not_ occur.
+	gracefulDrainComplete syncutil.AtomicBool
+
 	// internalDBMemMonitor is the memory monitor corresponding to the
 	// InternalDB singleton. It only gets closed when
 	// Server is closed. Every Executor created via the factory
@@ -1026,6 +1030,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		EventsExporter:           cfg.eventsExporter,
 		NodeDescs:                cfg.nodeDescs,
 		TenantCapabilitiesReader: cfg.tenantCapabilitiesReader,
+		// TODO(knz): We will replace this provider with an actual provider
+		// in a later commit.
+		AutoConfigProvider: acprovider.NoTaskProvider{},
 	}
 
 	if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil {
@@ -1102,6 +1109,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	if externalConnKnobs := cfg.TestingKnobs.ExternalConnection; externalConnKnobs != nil {
 		execCfg.ExternalConnectionTestingKnobs = externalConnKnobs.(*externalconn.TestingKnobs)
 	}
+	if autoConfigKnobs := cfg.TestingKnobs.AutoConfig; autoConfigKnobs != nil {
+		knobs := autoConfigKnobs.(*autoconfig.TestingKnobs)
+		if knobs.Provider != nil {
+			execCfg.AutoConfigProvider = knobs.Provider
+		}
+	}
 
 	statsRefresher := stats.MakeRefresher(
 		cfg.AmbientCtx,
@@ -1649,6 +1662,24 @@ func (s *SQLServer) preStart(
 		s.execCfg.CaptureIndexUsageStatsKnobs,
 	)
 	s.execCfg.SyntheticPrivilegeCache.Start(ctx)
+
+	// Report a warning if the server is being shut down via the stopper
+	// before it was gracefully drained. This warning may be innocuous
+	// in tests where there is no use of the test server/cluster after
+	// shutdown, but it may be a sign of a problem in production or for
+	// tests that need to restart a server.
+	stopper.AddCloser(stop.CloserFn(func() {
+		if !s.gracefulDrainComplete.Get() {
+			warnCtx := s.AnnotateCtx(context.Background())
+
+			if knobs.Server != nil && knobs.Server.(*TestingKnobs).RequireGracefulDrain {
+				log.Fatalf(warnCtx, "drain required but not performed")
+			}
+
+			log.Warningf(warnCtx, "server shutdown without a prior graceful drain")
+		}
+	}))
+
 	return nil
 }
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index a570b674d1c7..24516c0fca2e
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -134,6 +134,10 @@ type TestingKnobs struct {
 	// We use clusterversion.Key rather than a roachpb.Version because it will be used
 	// to get initial values to use during bootstrap.
 	BootstrapVersionKeyOverride clusterversion.Key
+
+	// RequireGracefulDrain, if set, causes a shutdown to fail with a log.Fatal
+	// if the server is not gracefully drained prior to its stopper shutting down.
+	RequireGracefulDrain bool
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
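Since the new AutoConfig knob flows from base.TestingKnobs into the executor config (per the newSQLServer hunk above), a test can inject a custom task provider. The following is a hedged sketch using the standard CRDB test-server helpers; myProvider stands in for any acprovider.Provider implementation:

    // Hypothetical test wiring for the new AutoConfig knob.
    ctx := context.Background()
    s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
        Knobs: base.TestingKnobs{
            AutoConfig: &autoconfig.TestingKnobs{Provider: myProvider},
        },
    })
    defer s.Stopper().Stop(ctx)
    _ = db // exercise SQL here and assert on the tasks' side effects.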
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 22f798fb0835..eeb990b86818 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -339,6 +339,7 @@ go_library( "//pkg/security/password", "//pkg/security/sessionrevival", "//pkg/security/username", + "//pkg/server/autoconfig/acprovider", "//pkg/server/pgurl", "//pkg/server/serverpb", "//pkg/server/status/statuspb", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 7b589fb23738..c52816787804 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -50,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" "github.com/cockroachdb/cockroach/pkg/server/pgurl" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" @@ -1440,6 +1441,10 @@ type ExecutorConfig struct { NodeDescs kvcoord.NodeDescStore TenantCapabilitiesReader SystemTenantOnly[tenantcapabilities.Reader] + + // AutoConfigProvider informs the auto config runner job of new + // tasks to run. + AutoConfigProvider acprovider.Provider } // UpdateVersionSystemSettingHook provides a callback that allows us diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 16be8bbae875..7bfb6dcb9972 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -115,6 +115,7 @@ type SessionEventListener interface { // to replace a session that has expired and deleted from the table. // TODO(rima): Rename Instance to avoid confusion with sqlinstance.SQLInstance. type Instance struct { + log.AmbientContext clock *hlc.Clock settings *cluster.Settings stopper *stop.Stopper @@ -369,6 +370,7 @@ func (l *Instance) heartbeatLoopInner(ctx context.Context) error { // // sessionEvents, if not nil, gets notified of some session state transitions. func NewSQLInstance( + ambientCtx log.AmbientContext, stopper *stop.Stopper, clock *hlc.Clock, storage Writer, @@ -381,11 +383,12 @@ func NewSQLInstance( } l := &Instance{ - clock: clock, - settings: settings, - storage: storage, - stopper: stopper, - sessionEvents: sessionEvents, + AmbientContext: ambientCtx, + clock: clock, + settings: settings, + storage: storage, + stopper: stopper, + sessionEvents: sessionEvents, ttl: func() time.Duration { return DefaultTTL.Get(&settings.SV) }, @@ -406,7 +409,8 @@ func (l *Instance) Start(ctx context.Context, regionPhysicalRep []byte) { log.Infof(ctx, "starting SQL liveness instance") // Detach from ctx's cancelation. 
- taskCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + taskCtx := l.AnnotateCtx(context.Background()) + taskCtx = logtags.WithTags(taskCtx, logtags.FromContext(ctx)) _ = l.stopper.RunAsyncTask(taskCtx, "slinstance", l.heartbeatLoop) } diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 185fd87b4cf6..84997e2743c5 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -36,6 +36,7 @@ func TestSQLInstance(t *testing.T) { ctx, stopper := context.Background(), stop.NewStopper() defer stopper.Stop(ctx) + var ambientCtx log.AmbientContext clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 42))) settings := cluster.MakeTestingClusterSettingsWithVersions( clusterversion.TestingBinaryVersion, @@ -45,11 +46,11 @@ func TestSQLInstance(t *testing.T) { slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() - sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) + sqlInstance := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil) sqlInstance.Start(ctx, nil) // Add one more instance to introduce concurrent access to storage. - dummy := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) + dummy := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil) dummy.Start(ctx, nil) s1, err := sqlInstance.Session(ctx) @@ -114,7 +115,8 @@ func TestSQLInstanceWithRegion(t *testing.T) { slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() - sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil, nil) + var ambientCtx log.AmbientContext + sqlInstance := slinstance.NewSQLInstance(ambientCtx, stopper, clock, fakeStorage, settings, nil, nil) sqlInstance.Start(ctx, []byte{42}) s1, err := sqlInstance.Session(ctx) diff --git a/pkg/sql/sqlliveness/slprovider/slprovider.go b/pkg/sql/sqlliveness/slprovider/slprovider.go index 33c9cb25b3c2..95a891116c44 100644 --- a/pkg/sql/sqlliveness/slprovider/slprovider.go +++ b/pkg/sql/sqlliveness/slprovider/slprovider.go @@ -43,7 +43,7 @@ func New( sessionEvents slinstance.SessionEventListener, ) sqlliveness.Provider { storage := slstorage.NewStorage(ambientCtx, stopper, clock, db, codec, settings, settingsWatcher) - instance := slinstance.NewSQLInstance(stopper, clock, storage, settings, testingKnobs, sessionEvents) + instance := slinstance.NewSQLInstance(ambientCtx, stopper, clock, storage, settings, testingKnobs, sessionEvents) return &provider{ Storage: storage, Instance: instance, diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 3a95761a8499..988a6f7ded5f 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -16,6 +16,7 @@ package persistedsqlstats import ( "context" "math/rand" + "sync" "sync/atomic" "time" @@ -68,10 +69,15 @@ type PersistedSQLStats struct { lastFlushStarted time.Time jobMonitor jobMonitor - - atomic struct { + atomic struct { nextFlushAt atomic.Value } + + // drain is closed when a graceful drain is initiated. + drain chan struct{} + setDraining sync.Once + // tasksDoneWG is used to wait for all background tasks to finish. 
+ tasksDoneWG sync.WaitGroup } var _ sqlstats.Provider = &PersistedSQLStats{} @@ -82,6 +88,7 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats { SQLStats: memSQLStats, cfg: cfg, memoryPressureSignal: make(chan struct{}), + drain: make(chan struct{}), } p.jobMonitor = jobMonitor{ @@ -100,19 +107,32 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats { // Start implements sqlstats.Provider interface. func (s *PersistedSQLStats) Start(ctx context.Context, stopper *stop.Stopper) { s.startSQLStatsFlushLoop(ctx, stopper) - s.jobMonitor.start(ctx, stopper) + s.jobMonitor.start(ctx, stopper, s.drain, &s.tasksDoneWG) stopper.AddCloser(stop.CloserFn(func() { s.cfg.InternalExecutorMonitor.Stop(ctx) })) } +// Stop stops the background tasks. This is used during graceful drain +// to quiesce just the SQL activity. +func (s *PersistedSQLStats) Stop(ctx context.Context) { + log.Infof(ctx, "stopping persisted SQL stats tasks") + defer log.Infof(ctx, "persisted SQL stats tasks successfully shut down") + s.setDraining.Do(func() { + close(s.drain) + }) + s.tasksDoneWG.Wait() +} + // GetController returns the controller of the PersistedSQLStats. func (s *PersistedSQLStats) GetController(server serverpb.SQLStatusServer) *Controller { return NewController(s, server, s.cfg.DB) } func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper *stop.Stopper) { - _ = stopper.RunAsyncTask(ctx, "sql-stats-worker", func(ctx context.Context) { + s.tasksDoneWG.Add(1) + err := stopper.RunAsyncTask(ctx, "sql-stats-worker", func(ctx context.Context) { + defer s.tasksDoneWG.Done() var resetIntervalChanged = make(chan struct{}, 1) SQLStatsFlushInterval.SetOnChange(&s.cfg.Settings.SV, func(ctx context.Context) { @@ -142,6 +162,8 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper // In this case, we would restart the loop without performing any flush // and recalculate the flush interval in the for-loop's post statement. 
 				continue
+			case <-s.drain:
+				return
 			case <-stopper.ShouldQuiesce():
 				return
 			}
 
 			s.Flush(ctx)
 		}
 	})
+	if err != nil {
+		s.tasksDoneWG.Done()
+		log.Warningf(ctx, "failed to start sql-stats-worker: %v", err)
+	}
 }
 
 // GetLocalMemProvider returns a sqlstats.Provider that can only be used to
diff --git a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go
index 4ddda076af72..cf5ba9f8e91e 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go
@@ -12,6 +12,7 @@ package persistedsqlstats
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -67,8 +68,13 @@ type jobMonitor struct {
 	}
 }
 
-func (j *jobMonitor) start(ctx context.Context, stopper *stop.Stopper) {
-	_ = stopper.RunAsyncTask(ctx, "sql-stats-scheduled-compaction-job-monitor", func(ctx context.Context) {
+func (j *jobMonitor) start(
+	ctx context.Context, stopper *stop.Stopper, drain chan struct{}, tasksWG *sync.WaitGroup,
+) {
+	tasksWG.Add(1)
+	err := stopper.RunAsyncTask(ctx, "sql-stats-scheduled-compaction-job-monitor", func(ctx context.Context) {
+		defer tasksWG.Done()
+
 		nextJobScheduleCheck := timeutil.Now()
 		currentRecurrence := SQLStatsCleanupRecurrence.Get(&j.st.SV)
 
@@ -93,7 +99,11 @@
 		select {
 		case <-timer.C:
 			timer.Read = true
+		case <-drain:
+			// Graceful shutdown.
+			return
 		case <-stopCtx.Done():
+			// Expedited shutdown.
 			return
 		}
 
@@ -109,6 +119,10 @@
 			timer.Reset(updateCheckInterval)
 		}
 	})
+	if err != nil {
+		tasksWG.Done()
+		log.Warningf(ctx, "error starting sql stats scheduled compaction job monitor: %v", err)
+	}
 }
 
 func (j *jobMonitor) getSchedule(
diff --git a/pkg/sql/stats/automatic_stats.go b/pkg/sql/stats/automatic_stats.go
index 2de9a1709c1a..883cd8a156fa 100644
--- a/pkg/sql/stats/automatic_stats.go
+++ b/pkg/sql/stats/automatic_stats.go
@@ -15,6 +15,7 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -242,6 +243,15 @@ type Refresher struct {
 
 	// numTablesEnsured is an internal counter for testing ensureAllTables.
 	numTablesEnsured int
+
+	// drainAutoStats is a channel that is closed when the server starts
+	// to shut down gracefully.
+	drainAutoStats chan struct{}
+	setDraining    sync.Once
+
+	// startedTasksWG is a wait group that tracks the auto-stats
+	// background tasks.
+	startedTasksWG sync.WaitGroup
 }
 
 // mutation contains metadata about a SQL mutation and is the message passed to
@@ -284,6 +294,7 @@ func MakeRefresher(
 		extraTime:        time.Duration(rand.Int63n(int64(time.Hour))),
 		mutationCounts:   make(map[descpb.ID]int64, 16),
 		settingOverrides: make(map[descpb.ID]catpb.AutoStatsSettings),
+		drainAutoStats:   make(chan struct{}),
 	}
 }
 
@@ -364,6 +375,23 @@ func (r *Refresher) getTableDescriptor(
 	return desc
 }
 
+// WaitForAutoStatsShutdown waits for all auto-stats tasks to shut down.
+func (r *Refresher) WaitForAutoStatsShutdown(ctx context.Context) {
+	log.Infof(ctx, "starting to wait for auto-stats tasks to shut down")
+	defer log.Infof(ctx, "auto-stats tasks successfully shut down")
+	r.startedTasksWG.Wait()
+}
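Both PersistedSQLStats and the stats Refresher now follow the same graceful-drain pattern: a channel closed exactly once to signal the drain, plus a wait group that the drain path waits on. A distilled, self-contained version of the pattern, with all names invented for illustration:

    package main

    import (
        "fmt"
        "sync"
    )

    // drainable distills the pattern used in the diff above: close-once
    // drain signaling plus a wait group covering every background task.
    type drainable struct {
        drain       chan struct{}
        setDraining sync.Once
        tasksWG     sync.WaitGroup
    }

    func newDrainable() *drainable {
        return &drainable{drain: make(chan struct{})}
    }

    func (d *drainable) startTask(work func(drain <-chan struct{})) {
        d.tasksWG.Add(1)
        go func() {
            defer d.tasksWG.Done()
            work(d.drain)
        }()
    }

    // SetDraining is idempotent: multiple drain requests close the
    // channel only once.
    func (d *drainable) SetDraining() {
        d.setDraining.Do(func() { close(d.drain) })
    }

    // WaitForShutdown blocks until every task started via startTask
    // has observed the drain signal and returned.
    func (d *drainable) WaitForShutdown() {
        d.tasksWG.Wait()
    }

    func main() {
        d := newDrainable()
        d.startTask(func(drain <-chan struct{}) {
            <-drain // a real task would select on this in its loop.
            fmt.Println("task drained")
        })
        d.SetDraining()
        d.WaitForShutdown()
    }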
+// SetDraining informs the stats refresher that the node is draining.
+//
+// NB: check the implementation of the drain logic before adding code
+// that could make this function block.
+func (r *Refresher) SetDraining() {
+	r.setDraining.Do(func() {
+		close(r.drainAutoStats)
+	})
+}
+
 // Start starts the stats refresher thread, which polls for messages about
 // new SQL mutations and refreshes the table statistics with probability
 // proportional to the percentage of rows affected.
@@ -371,7 +399,10 @@ func (r *Refresher) Start(
 	ctx context.Context, stopper *stop.Stopper, refreshInterval time.Duration,
 ) error {
 	bgCtx := r.AnnotateCtx(context.Background())
-	_ = stopper.RunAsyncTask(bgCtx, "refresher", func(ctx context.Context) {
+	r.startedTasksWG.Add(1)
+	if err := stopper.RunAsyncTask(bgCtx, "refresher", func(ctx context.Context) {
+		defer r.startedTasksWG.Done()
+
 		// We always sleep for r.asOfTime at the beginning of each refresh, so
 		// subtract it from the refreshInterval.
 		refreshInterval -= r.asOfTime
@@ -416,8 +447,11 @@ func (r *Refresher) Start(
 				}
 			}
 
+			r.startedTasksWG.Add(1)
 			if err := stopper.RunAsyncTask(
 				ctx, "stats.Refresher: maybeRefreshStats", func(ctx context.Context) {
+					defer r.startedTasksWG.Done()
+
 					// Record the start time of processing this batch of tables.
 					start := timeutil.Now()
 
@@ -428,6 +462,8 @@ func (r *Refresher) Start(
 					select {
 					case <-timerAsOf.C:
 						break
+					case <-r.drainAutoStats:
+						return
 					case <-stopper.ShouldQuiesce():
 						return
 					}
@@ -470,19 +506,23 @@ func (r *Refresher) Start(
 								explicitSettings = &settings
 							}
 						}
-						r.maybeRefreshStats(ctx, tableID, explicitSettings, rowsAffected, r.asOfTime)
+						r.maybeRefreshStats(ctx, stopper, tableID, explicitSettings, rowsAffected, r.asOfTime)
 
 						select {
 						case <-stopper.ShouldQuiesce():
 							// Don't bother trying to refresh the remaining tables if we
 							// are shutting down.
 							return
+						case <-r.drainAutoStats:
+							// Ditto.
+							return
 						default:
 						}
 					}
 					timer.Reset(refreshInterval)
 				}); err != nil {
-				log.Errorf(ctx, "failed to refresh stats: %v", err)
+				r.startedTasksWG.Done()
+				log.Errorf(ctx, "failed to start async stats task: %v", err)
 			}
 
 			// This clears out any tables that may have been added to the
 			// mutationCounts map by ensureAllTables and any mutation counts that
@@ -503,12 +543,18 @@ func (r *Refresher) Start(
 			case clusterSettingOverride := <-r.settings:
 				r.settingOverrides[clusterSettingOverride.tableID] = clusterSettingOverride.settings
 
+			case <-r.drainAutoStats:
+				log.Infof(ctx, "draining auto stats refresher")
+				return
 			case <-stopper.ShouldQuiesce():
 				log.Info(ctx, "quiescing auto stats refresher")
 				return
 			}
 		}
-	})
+	}); err != nil {
+		r.startedTasksWG.Done()
+		log.Warningf(ctx, "refresher task failed to start: %v", err)
+	}
 	return nil
 }
 
@@ -676,6 +722,7 @@ func (r *Refresher) NotifyMutation(table catalog.TableDescriptor, rowsAffected i
 // for this table.
 func (r *Refresher) maybeRefreshStats(
 	ctx context.Context,
+	stopper *stop.Stopper,
 	tableID descpb.ID,
 	explicitSettings *catpb.AutoStatsSettings,
 	rowsAffected int64,
@@ -734,6 +781,7 @@ func (r *Refresher) maybeRefreshStats(
 	if errors.Is(err, ConcurrentCreateStatsError) {
 		// Another stats job was already running. Attempt to reschedule this
 		// refresh.
+		var newEvent mutation
 		if mustRefresh {
 			// For the cases where mustRefresh=true (stats don't yet exist or it
 			// has been 2x the average time since a refresh), we want to make sure
 			// that maybeRefreshStats is called on this table during the next
 			// cycle so that we have another chance to trigger a refresh. We pass
 			// rowsAffected=0 so that we don't force a refresh if another node has
 			// already done it.
-			r.mutations <- mutation{tableID: tableID, rowsAffected: 0}
+			newEvent = mutation{tableID: tableID, rowsAffected: 0}
 		} else {
 			// If this refresh was caused by a "dice roll", we want to make sure
 			// that the refresh is rescheduled so that we adhere to the
 			// AutomaticStatisticsFractionStaleRows statistical ideal. We
 			// ensure that the refresh is triggered during the next cycle by
 			// passing a very large number for rowsAffected.
-			r.mutations <- mutation{tableID: tableID, rowsAffected: math.MaxInt32}
+			newEvent = mutation{tableID: tableID, rowsAffected: math.MaxInt32}
+		}
+		select {
+		case r.mutations <- newEvent:
+			return
+		case <-r.drainAutoStats:
+			// Shutting down due to a graceful drain.
+			// We don't want to force a write to the mutations channel here,
+			// otherwise we could block the graceful shutdown.
+			err = errors.New("server is shutting down")
+		case <-stopper.ShouldQuiesce():
+			// Shutting down due to a direct stopper Stop call.
+			// This is not strictly required for correctness but
+			// helps avoid log spam.
+			err = errors.New("server is shutting down")
 		}
-		return
 	}
 
 	// Log other errors but don't automatically reschedule the refresh, since
diff --git a/pkg/sql/stats/automatic_stats_test.go b/pkg/sql/stats/automatic_stats_test.go
index 46b758595f89..4186913fe505
--- a/pkg/sql/stats/automatic_stats_test.go
+++ b/pkg/sql/stats/automatic_stats_test.go
@@ -84,7 +84,7 @@ func TestMaybeRefreshStats(t *testing.T) {
 	// There are no stats yet, so this must refresh the statistics on table t
 	// even though rowsAffected=0.
 	refresher.maybeRefreshStats(
-		ctx, descA.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */
+		ctx, s.Stopper(), descA.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */
 	)
 	if err := checkStatsCount(ctx, cache, descA, 1 /* expected */); err != nil {
 		t.Fatal(err)
@@ -93,7 +93,7 @@
 	// Try to refresh again. With rowsAffected=0, the probability of a refresh
 	// is 0, so refreshing will not succeed.
 	refresher.maybeRefreshStats(
-		ctx, descA.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */
+		ctx, s.Stopper(), descA.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */
 	)
 	if err := checkStatsCount(ctx, cache, descA, 1 /* expected */); err != nil {
 		t.Fatal(err)
@@ -103,7 +103,7 @@
 	minStaleRows := int64(100000000)
 	explicitSettings := catpb.AutoStatsSettings{MinStaleRows: &minStaleRows}
 	refresher.maybeRefreshStats(
-		ctx, descA.GetID(), &explicitSettings, 10 /* rowsAffected */, time.Microsecond, /* asOf */
+		ctx, s.Stopper(), descA.GetID(), &explicitSettings, 10 /* rowsAffected */, time.Microsecond, /* asOf */
 	)
 	if err := checkStatsCount(ctx, cache, descA, 1 /* expected */); err != nil {
 		t.Fatal(err)
@@ -114,7 +114,7 @@
 	fractionStaleRows := float64(100000000)
 	explicitSettings = catpb.AutoStatsSettings{FractionStaleRows: &fractionStaleRows}
 	refresher.maybeRefreshStats(
-		ctx, descA.GetID(), &explicitSettings, 10 /* rowsAffected */, time.Microsecond, /* asOf */
+		ctx, s.Stopper(), descA.GetID(), &explicitSettings, 10 /* rowsAffected */, time.Microsecond, /* asOf */
 	)
 	if err := checkStatsCount(ctx, cache, descA, 1 /* expected */); err != nil {
 		t.Fatal(err)
@@ -123,7 +123,7 @@
 	// With rowsAffected=10, refreshing should work.
Since there are more rows // updated than exist in the table, the probability of a refresh is 100%. refresher.maybeRefreshStats( - ctx, descA.GetID(), nil /* explicitSettings */, 10 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), descA.GetID(), nil /* explicitSettings */, 10 /* rowsAffected */, time.Microsecond, /* asOf */ ) if err := checkStatsCount(ctx, cache, descA, 2 /* expected */); err != nil { t.Fatal(err) @@ -134,7 +134,7 @@ func TestMaybeRefreshStats(t *testing.T) { descRoleOptions := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "system", "role_options") refresher.maybeRefreshStats( - ctx, descRoleOptions.GetID(), nil /* explicitSettings */, 10000 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), descRoleOptions.GetID(), nil /* explicitSettings */, 10000 /* rowsAffected */, time.Microsecond, /* asOf */ ) if err := checkStatsCount(ctx, cache, descRoleOptions, 5 /* expected */); err != nil { t.Fatal(err) @@ -144,7 +144,7 @@ func TestMaybeRefreshStats(t *testing.T) { descLease := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "system", "lease") refresher.maybeRefreshStats( - ctx, descLease.GetID(), nil /* explicitSettings */, 10000 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), descLease.GetID(), nil /* explicitSettings */, 10000 /* rowsAffected */, time.Microsecond, /* asOf */ ) if err := checkStatsCount(ctx, cache, descLease, 0 /* expected */); err != nil { t.Fatal(err) @@ -154,7 +154,7 @@ func TestMaybeRefreshStats(t *testing.T) { descTableStats := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "system", "table_statistics") refresher.maybeRefreshStats( - ctx, descTableStats.GetID(), nil /* explicitSettings */, 10000 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), descTableStats.GetID(), nil /* explicitSettings */, 10000 /* rowsAffected */, time.Microsecond, /* asOf */ ) if err := checkStatsCount(ctx, cache, descTableStats, 0 /* expected */); err != nil { t.Fatal(err) @@ -165,7 +165,7 @@ func TestMaybeRefreshStats(t *testing.T) { // TODO(rytaft): Should not enqueue views to begin with. descVW := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "vw") refresher.maybeRefreshStats( - ctx, descVW.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), descVW.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) select { case <-refresher.mutations: @@ -461,7 +461,7 @@ func TestAverageRefreshTime(t *testing.T) { // the statistics on table t. With rowsAffected=0, the probability of refresh // is 0. refresher.maybeRefreshStats( - ctx, table.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), table.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) if err := checkStatsCount(ctx, cache, table, 20 /* expected */); err != nil { t.Fatal(err) @@ -511,7 +511,7 @@ func TestAverageRefreshTime(t *testing.T) { // remain (5 from column k and 5 from column v), since the old stats on k // and v were deleted. 
refresher.maybeRefreshStats( - ctx, table.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */ + ctx, s.Stopper(), table.GetID(), nil /* explicitSettings */, 0 /* rowsAffected */, time.Microsecond, /* asOf */ ) if err := checkStatsCount(ctx, cache, table, 10 /* expected */); err != nil { t.Fatal(err) @@ -654,7 +654,7 @@ func TestNoRetryOnFailure(t *testing.T) { // Try to refresh stats on a table that doesn't exist. r.maybeRefreshStats( - ctx, 100 /* tableID */, nil /* explicitSettings */, math.MaxInt32, + ctx, s.Stopper(), 100 /* tableID */, nil /* explicitSettings */, math.MaxInt32, time.Microsecond, /* asOfTime */ ) diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index fead6330d32d..011e66fb93b3 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3711,7 +3711,7 @@ var charts = []sectionDescription{ }, }, { - Title: "Auto Config Runner Job", + Title: "Auto Config Top-level Runner Job", Metrics: []string{ "jobs.auto_config_runner.fail_or_cancel_completed", "jobs.auto_config_runner.fail_or_cancel_failed", @@ -3721,6 +3721,28 @@ var charts = []sectionDescription{ "jobs.auto_config_runner.resume_retry_error", }, }, + { + Title: "Auto Config Per-environment Runner Jobs", + Metrics: []string{ + "jobs.auto_config_env_runner.fail_or_cancel_completed", + "jobs.auto_config_env_runner.fail_or_cancel_failed", + "jobs.auto_config_env_runner.fail_or_cancel_retry_error", + "jobs.auto_config_env_runner.resume_completed", + "jobs.auto_config_env_runner.resume_failed", + "jobs.auto_config_env_runner.resume_retry_error", + }, + }, + { + Title: "Auto Config Tasks", + Metrics: []string{ + "jobs.auto_config_task.fail_or_cancel_completed", + "jobs.auto_config_task.fail_or_cancel_failed", + "jobs.auto_config_task.fail_or_cancel_retry_error", + "jobs.auto_config_task.resume_completed", + "jobs.auto_config_task.resume_failed", + "jobs.auto_config_task.resume_retry_error", + }, + }, }, }, {