From 846eba018b875a5a4d03b03e275c0768b36f6e04 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sat, 11 Mar 2023 20:05:14 +0000 Subject: [PATCH] server,autoconfig: automatic configuration via config tasks This change introduces "auto config tasks", a mechanism through which configuration payloads ("tasks") can be injected into a running SQL service. This is driven via the "auto config runner" job that was introduced in the previous commit. The job listens for the arrival of new environment/task definitions via a `Provider` interface. When new environments are known, it spans "env runner" jobs; each waiting for its own tasks. When new tasks are known, and previous tasks have completed, the "env runner" job creates a new separate job for the first next task. Release note: None --- docs/generated/http/BUILD.bazel | 1 + pkg/BUILD.bazel | 6 + pkg/base/testing_knobs.go | 1 + pkg/gen/protobuf.bzl | 1 + pkg/jobs/jobspb/BUILD.bazel | 2 + pkg/jobs/jobspb/jobs.proto | 24 +- pkg/jobs/jobspb/wrap.go | 30 ++- pkg/server/BUILD.bazel | 1 + pkg/server/autoconfig/BUILD.bazel | 48 +++- pkg/server/autoconfig/acprovider/BUILD.bazel | 12 + pkg/server/autoconfig/acprovider/provider.go | 78 ++++++ pkg/server/autoconfig/auto_config.go | 100 ++++++- .../autoconfig/auto_config_env_runner.go | 245 ++++++++++++++++++ pkg/server/autoconfig/auto_config_task.go | 187 +++++++++++++ pkg/server/autoconfig/auto_config_test.go | 201 ++++++++++++++ .../autoconfig/autoconfigpb/BUILD.bazel | 39 +++ .../autoconfig/autoconfigpb/autoconfig.go | 17 ++ .../autoconfig/autoconfigpb/autoconfig.proto | 97 +++++++ pkg/server/autoconfig/doc.go | 162 ++++++++++++ pkg/server/autoconfig/main_test.go | 31 +++ pkg/server/autoconfig/task_markers.go | 214 +++++++++++++++ pkg/server/autoconfig/task_markers_test.go | 112 ++++++++ pkg/server/autoconfig/testing_knobs.go | 19 ++ pkg/server/server_sql.go | 14 +- pkg/sql/BUILD.bazel | 1 + pkg/sql/exec_util.go | 5 + pkg/ts/catalog/chart_catalog.go | 24 +- 27 files changed, 1651 insertions(+), 21 deletions(-) create mode 100644 pkg/server/autoconfig/acprovider/BUILD.bazel create mode 100644 pkg/server/autoconfig/acprovider/provider.go create mode 100644 pkg/server/autoconfig/auto_config_env_runner.go create mode 100644 pkg/server/autoconfig/auto_config_task.go create mode 100644 pkg/server/autoconfig/auto_config_test.go create mode 100644 pkg/server/autoconfig/autoconfigpb/BUILD.bazel create mode 100644 pkg/server/autoconfig/autoconfigpb/autoconfig.go create mode 100644 pkg/server/autoconfig/autoconfigpb/autoconfig.proto create mode 100644 pkg/server/autoconfig/doc.go create mode 100644 pkg/server/autoconfig/main_test.go create mode 100644 pkg/server/autoconfig/task_markers.go create mode 100644 pkg/server/autoconfig/task_markers_test.go create mode 100644 pkg/server/autoconfig/testing_knobs.go diff --git a/docs/generated/http/BUILD.bazel b/docs/generated/http/BUILD.bazel index aaa68e421d2f..ac506ba99659 100644 --- a/docs/generated/http/BUILD.bazel +++ b/docs/generated/http/BUILD.bazel @@ -17,6 +17,7 @@ genrule( "//pkg/multitenant/mtinfopb:mtinfopb_proto", "//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb:tenantcapabilitiespb_proto", "//pkg/roachpb:roachpb_proto", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb_proto", "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_proto", "//pkg/server/serverpb:serverpb_proto", "//pkg/server/status/statuspb:statuspb_proto", diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index eae00a52033a..4dcf66dc70d4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -277,6 +277,7 @@ ALL_TESTS = [ "//pkg/security/username:username_disallowed_imports_test", "//pkg/security/username:username_test", "//pkg/security:security_test", + "//pkg/server/autoconfig:autoconfig_test", "//pkg/server/debug/goroutineui:goroutineui_test", "//pkg/server/debug/pprofui:pprofui_test", "//pkg/server/debug:debug_test", @@ -1432,7 +1433,10 @@ GO_TARGETS = [ "//pkg/security/username:username_test", "//pkg/security:security", "//pkg/security:security_test", + "//pkg/server/autoconfig/acprovider:acprovider", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb", "//pkg/server/autoconfig:autoconfig", + "//pkg/server/autoconfig:autoconfig_test", "//pkg/server/debug/goroutineui:goroutineui", "//pkg/server/debug/goroutineui:goroutineui_test", "//pkg/server/debug/pprofui:pprofui", @@ -2815,6 +2819,8 @@ GET_X_DATA_TARGETS = [ "//pkg/security/username:get_x_data", "//pkg/server:get_x_data", "//pkg/server/autoconfig:get_x_data", + "//pkg/server/autoconfig/acprovider:get_x_data", + "//pkg/server/autoconfig/autoconfigpb:get_x_data", "//pkg/server/debug:get_x_data", "//pkg/server/debug/goroutineui:get_x_data", "//pkg/server/debug/pprofui:get_x_data", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 5183a1f09522..eb6bf757970a 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -56,4 +56,5 @@ type TestingKnobs struct { LOQRecovery ModuleTestingKnobs KeyVisualizer ModuleTestingKnobs TenantCapabilitiesTestingKnobs ModuleTestingKnobs + AutoConfig ModuleTestingKnobs } diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 4aac989b5e90..cccecc398c4b 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -49,6 +49,7 @@ PROTOBUF_SRCS = [ "//pkg/repstream/streampb:streampb_go_proto", "//pkg/roachpb:roachpb_go_proto", "//pkg/rpc:rpc_go_proto", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb_go_proto", "//pkg/server/diagnostics/diagnosticspb:diagnosticspb_go_proto", "//pkg/server/serverpb:serverpb_go_proto", "//pkg/server/status/statuspb:statuspb_go_proto", diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index bbd4e42e80b9..7e821431f20b 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -32,6 +32,7 @@ proto_library( "//pkg/kv/kvpb:kvpb_proto", "//pkg/multitenant/mtinfopb:mtinfopb_proto", "//pkg/roachpb:roachpb_proto", + "//pkg/server/autoconfig/autoconfigpb:autoconfigpb_proto", "//pkg/sql/catalog/descpb:descpb_proto", "//pkg/sql/sessiondatapb:sessiondatapb_proto", "//pkg/util/hlc:hlc_proto", @@ -54,6 +55,7 @@ go_proto_library( "//pkg/multitenant/mtinfopb", "//pkg/roachpb", "//pkg/security/username", # keep + "//pkg/server/autoconfig/autoconfigpb", "//pkg/sql/catalog/catpb", # keep "//pkg/sql/catalog/descpb", "//pkg/sql/sem/tree", # keep diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 4331f9480051..d97ce193bd6d 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -24,6 +24,7 @@ import "sql/sessiondatapb/session_data.proto"; import "util/hlc/timestamp.proto"; import "clusterversion/cluster_version.proto"; import "google/protobuf/timestamp.proto"; +import "server/autoconfig/autoconfigpb/autoconfig.proto"; enum EncryptionMode { Passphrase = 0; @@ -1177,6 +1178,21 @@ message AutoConfigRunnerDetails { message AutoConfigRunnerProgress { } +message AutoConfigEnvRunnerDetails { + string env_id = 1 [(gogoproto.customname) = "EnvID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb.EnvironmentID"]; +} + +message AutoConfigEnvRunnerProgress { +} + +message AutoConfigTaskDetails { + string env_id = 1 [(gogoproto.customname) = "EnvID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb.EnvironmentID"]; + cockroach.server.autoconfig.autoconfigpb.Task task = 2 [(gogoproto.nullable) = false]; +} + +message AutoConfigTaskProgress { +} + message Payload { string description = 1; // If empty, the description is assumed to be the statement. @@ -1236,6 +1252,8 @@ message Payload { PollJobsStatsDetails poll_jobs_stats = 39; AutoConfigRunnerDetails auto_config_runner = 41; + AutoConfigEnvRunnerDetails auto_config_env_runner = 42; + AutoConfigTaskDetails auto_config_task = 43; } reserved 26; // PauseReason is used to describe the reason that the job is currently paused @@ -1263,7 +1281,7 @@ message Payload { // specifies how old such record could get before this job is canceled. int64 maximum_pts_age = 40 [(gogoproto.casttype) = "time.Duration", (gogoproto.customname) = "MaximumPTSAge"]; - // NEXT ID: 42 + // NEXT ID: 44 } message Progress { @@ -1306,6 +1324,8 @@ message Progress { KeyVisualizerProgress keyVisualizerProgress = 27; PollJobsStatsProgress pollJobsStats = 28; AutoConfigRunnerProgress auto_config_runner = 29; + AutoConfigEnvRunnerProgress auto_config_env_runner = 30; + AutoConfigTaskProgress auto_config_task = 31; } uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"]; @@ -1338,6 +1358,8 @@ enum Type { KEY_VISUALIZER = 18 [(gogoproto.enumvalue_customname) = "TypeKeyVisualizer"]; POLL_JOBS_STATS = 19 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"]; AUTO_CONFIG_RUNNER = 20 [(gogoproto.enumvalue_customname) = "TypeAutoConfigRunner"]; + AUTO_CONFIG_ENV_RUNNER = 21 [(gogoproto.enumvalue_customname) = "TypeAutoConfigEnvRunner"]; + AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"]; } message Job { diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index a97443c8b43b..7855a6035605 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -49,6 +49,8 @@ var ( _ Details = SchemaTelemetryDetails{} _ Details = KeyVisualizerDetails{} _ Details = AutoConfigRunnerDetails{} + _ Details = AutoConfigEnvRunnerDetails{} + _ Details = AutoConfigTaskDetails{} ) // ProgressDetails is a marker interface for job progress details proto structs. @@ -70,6 +72,8 @@ var ( _ ProgressDetails = SchemaTelemetryProgress{} _ ProgressDetails = KeyVisualizerProgress{} _ ProgressDetails = AutoConfigRunnerProgress{} + _ ProgressDetails = AutoConfigEnvRunnerProgress{} + _ ProgressDetails = AutoConfigTaskProgress{} ) // Type returns the payload's job type and panics if the type is invalid. @@ -149,6 +153,8 @@ var AutomaticJobTypes = [...]Type{ TypeAutoSchemaTelemetry, TypePollJobsStats, TypeAutoConfigRunner, + TypeAutoConfigEnvRunner, + TypeAutoConfigTask, TypeKeyVisualizer, } @@ -197,6 +203,10 @@ func DetailsType(d isPayload_Details) (Type, error) { return TypePollJobsStats, nil case *Payload_AutoConfigRunner: return TypeAutoConfigRunner, nil + case *Payload_AutoConfigEnvRunner: + return TypeAutoConfigEnvRunner, nil + case *Payload_AutoConfigTask: + return TypeAutoConfigTask, nil default: return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d) } @@ -238,6 +248,8 @@ var JobDetailsForEveryJobType = map[Type]Details{ TypeKeyVisualizer: KeyVisualizerDetails{}, TypePollJobsStats: PollJobsStatsDetails{}, TypeAutoConfigRunner: AutoConfigRunnerDetails{}, + TypeAutoConfigEnvRunner: AutoConfigEnvRunnerDetails{}, + TypeAutoConfigTask: AutoConfigTaskDetails{}, } // WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper @@ -287,6 +299,10 @@ func WrapProgressDetails(details ProgressDetails) interface { return &Progress_PollJobsStats{PollJobsStats: &d} case AutoConfigRunnerProgress: return &Progress_AutoConfigRunner{AutoConfigRunner: &d} + case AutoConfigEnvRunnerProgress: + return &Progress_AutoConfigEnvRunner{AutoConfigEnvRunner: &d} + case AutoConfigTaskProgress: + return &Progress_AutoConfigTask{AutoConfigTask: &d} default: panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d)) } @@ -334,6 +350,10 @@ func (p *Payload) UnwrapDetails() Details { return *d.PollJobsStats case *Payload_AutoConfigRunner: return *d.AutoConfigRunner + case *Payload_AutoConfigEnvRunner: + return *d.AutoConfigEnvRunner + case *Payload_AutoConfigTask: + return *d.AutoConfigTask default: return nil } @@ -381,6 +401,10 @@ func (p *Progress) UnwrapDetails() ProgressDetails { return *d.PollJobsStats case *Progress_AutoConfigRunner: return *d.AutoConfigRunner + case *Progress_AutoConfigEnvRunner: + return *d.AutoConfigEnvRunner + case *Progress_AutoConfigTask: + return *d.AutoConfigTask default: return nil } @@ -452,6 +476,10 @@ func WrapPayloadDetails(details Details) interface { return &Payload_PollJobsStats{PollJobsStats: &d} case AutoConfigRunnerDetails: return &Payload_AutoConfigRunner{AutoConfigRunner: &d} + case AutoConfigEnvRunnerDetails: + return &Payload_AutoConfigEnvRunner{AutoConfigEnvRunner: &d} + case AutoConfigTaskDetails: + return &Payload_AutoConfigTask{AutoConfigTask: &d} default: panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -487,7 +515,7 @@ const ( func (Type) SafeValue() {} // NumJobTypes is the number of jobs types. -const NumJobTypes = 21 +const NumJobTypes = 23 // ChangefeedDetailsMarshaler allows for dependency injection of // cloud.SanitizeExternalStorageURI to avoid the dependency from this diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index fc03ce4c7f81..a1d4de3b17ca 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -160,6 +160,7 @@ go_library( "//pkg/security/securityassets", "//pkg/security/username", "//pkg/server/autoconfig", + "//pkg/server/autoconfig/acprovider", "//pkg/server/debug", "//pkg/server/diagnostics", "//pkg/server/diagnostics/diagnosticspb", diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel index c0ba3ae9693b..52b16892af01 100644 --- a/pkg/server/autoconfig/BUILD.bazel +++ b/pkg/server/autoconfig/BUILD.bazel @@ -1,16 +1,60 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "autoconfig", - srcs = ["auto_config.go"], + srcs = [ + "auto_config.go", + "auto_config_env_runner.go", + "auto_config_task.go", + "doc.go", + "task_markers.go", + "testing_knobs.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig", visibility = ["//visibility:public"], deps = [ "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/security/username", + "//pkg/server/autoconfig/acprovider", + "//pkg/server/autoconfig/autoconfigpb", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/isql", + "//pkg/sql/sessiondata", + "//pkg/util/encoding", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", + ], +) + +go_test( + name = "autoconfig_test", + srcs = [ + "auto_config_test.go", + "main_test.go", + "task_markers_test.go", + ], + args = ["-test.timeout=295s"], + deps = [ + ":autoconfig", + "//pkg/base", + "//pkg/clusterversion", + "//pkg/jobs", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/security/username", + "//pkg/server", + "//pkg/server/autoconfig/acprovider", + "//pkg/server/autoconfig/autoconfigpb", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/autoconfig/acprovider/BUILD.bazel b/pkg/server/autoconfig/acprovider/BUILD.bazel new file mode 100644 index 000000000000..4b1fbb05848f --- /dev/null +++ b/pkg/server/autoconfig/acprovider/BUILD.bazel @@ -0,0 +1,12 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "acprovider", + srcs = ["provider.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider", + visibility = ["//visibility:public"], + deps = ["//pkg/server/autoconfig/autoconfigpb"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/server/autoconfig/acprovider/provider.go b/pkg/server/autoconfig/acprovider/provider.go new file mode 100644 index 000000000000..bdb6ddd869ae --- /dev/null +++ b/pkg/server/autoconfig/acprovider/provider.go @@ -0,0 +1,78 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package acprovider + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" +) + +// Provider is an interface through which the auto config runner +// can receive new tasks to run. +// For more details, see the package-level documentation for +// package "autoconfig". +type Provider interface { + // EnvUpdate returns a channel that receives a message any time the + // current set of environments change. The channel receives an + // initial event immediately. + EnvUpdate() <-chan struct{} + + // ActiveEnvironments returns the IDs of environments that have + // tasks available for execution. + ActiveEnvironments() []autoconfigpb.EnvironmentID + + // Peek will block waiting for the first task the provider believes + // still needs to be run for the given environment. It will return + // an error if the context is canceled while waiting. It will + // return ErrNoMoreTasks to indicate the caller does not need + // to process tasks for this environment any more. + Peek(ctx context.Context, env autoconfigpb.EnvironmentID) (autoconfigpb.Task, error) + + // Pop will report the completion of all tasks with ID up to + // completed for the given environment. + Pop(ctx context.Context, env autoconfigpb.EnvironmentID, completed autoconfigpb.TaskID) error +} + +type errNoMoreTasks struct{} + +func (errNoMoreTasks) Error() string { return "no more tasks" } + +// ErrNoMoreTasks can be checked with errors.Is on the result of Peek +// when an environment is not providing tasks for the foreseeable future. +var ErrNoMoreTasks error = errNoMoreTasks{} + +// NoTaskProvider is a stub provider which delivers no tasks. +type NoTaskProvider struct{} + +var _ Provider = NoTaskProvider{} + +// EnvUpdate is part of the Provider interface. +func (NoTaskProvider) EnvUpdate() <-chan struct{} { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return ch +} + +// ActiveEnvironments is part of the Provider interface. +func (NoTaskProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID { return nil } + +// Peek is part of the Provider interface. +func (NoTaskProvider) Peek( + ctx context.Context, _ autoconfigpb.EnvironmentID, +) (autoconfigpb.Task, error) { + return autoconfigpb.Task{}, ErrNoMoreTasks +} + +// Pop is part of the Provider interface. +func (NoTaskProvider) Pop(context.Context, autoconfigpb.EnvironmentID, autoconfigpb.TaskID) error { + return nil +} diff --git a/pkg/server/autoconfig/auto_config.go b/pkg/server/autoconfig/auto_config.go index f067ebaec6c5..221f201e29b9 100644 --- a/pkg/server/autoconfig/auto_config.go +++ b/pkg/server/autoconfig/auto_config.go @@ -15,10 +15,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) +// autoConfigRunner runs the job that accepts new auto configuration +// payloads and sequences the creation of individual jobs to execute +// them. +// The auto config payloads are run by the taskRunner defined in +// auto_config_task.go. +// +// Refer to the package-level documentation for more details. type autoConfigRunner struct { job *jobs.Job } @@ -32,6 +45,12 @@ func (r *autoConfigRunner) OnFailOrCancel( return nil } +// EnvironmentID aliases autoconfigpb.EnvironmentID +type EnvironmentID = autoconfigpb.EnvironmentID + +// TaskID aliases autoconfigpb.TaskID +type TaskID = autoconfigpb.TaskID + // Resume is part of the Resumer interface. func (r *autoConfigRunner) Resume(ctx context.Context, execCtx interface{}) error { // The auto config runner is a forever running background job. @@ -40,23 +59,79 @@ func (r *autoConfigRunner) Resume(ctx context.Context, execCtx interface{}) erro // status. r.job.MarkIdle(true) - wait := make(chan struct{}) + exec := execCtx.(sql.JobExecContext) + execCfg := exec.ExecCfg() - // Note: even though the job registry cancels all running job upon - // shutdown, we find some tests fail unless this job also does its - // own wait. - shutdownCh := execCtx.(sql.JobExecContext).ExecCfg().RPCContext.Stopper.ShouldQuiesce() + // Provider gives us tasks to run. + provider := getProvider(ctx, execCfg) - select { - case <-ctx.Done(): - return ctx.Err() + // waitForEnvChange is the channel that indicates the set of + // environments is updated. + waitForEnvChange := provider.EnvUpdate() - case <-shutdownCh: - return context.Canceled + for { + // No tasks to create. Just wait until some tasks are delivered. + log.Infof(ctx, "waiting for environment activation...") + select { + case <-ctx.Done(): + return ctx.Err() - case <-wait: + case <-waitForEnvChange: + } + + r.job.MarkIdle(false) + for _, envID := range provider.ActiveEnvironments() { + if err := refreshEnvJob(ctx, execCfg, envID); err != nil { + log.Warningf(ctx, "error refreshing environment %q: %v", envID, err) + } + } + r.job.MarkIdle(true) } +} +func getProvider(ctx context.Context, execCfg *sql.ExecutorConfig) acprovider.Provider { + provider := execCfg.AutoConfigProvider + if provider == nil { + panic(errors.AssertionFailedf("programming error: missing provider")) + } + log.Infof(ctx, "using provider with type %T", provider) + return provider +} + +func refreshEnvJob(ctx context.Context, execCfg *sql.ExecutorConfig, envID EnvironmentID) error { + log.Infof(ctx, "refreshing runner job for environment %q", envID) + jobID := execCfg.JobRegistry.MakeJobID() + var jobCreated bool + if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // Do we already have a job for this environment? + row, err := txn.QueryRowEx(ctx, + "get-env-runner-job", txn.KV(), sessiondata.NodeUserSessionDataOverride, + `SELECT id FROM system.jobs WHERE job_type = $1 AND created_by_type = $2`, + jobspb.TypeAutoConfigEnvRunner.String(), + makeEnvRunnerJobCreatedKey(envID)) + if err != nil { + return err + } + if row != nil { + log.Infof(ctx, "found existing job %v for environment %q", row[0], envID) + return nil + } + + // The job did not exist yet. Create it now. + if err := createEnvRunnerJob(ctx, txn, execCfg.JobRegistry, jobID, envID); err != nil { + return errors.Wrapf(err, "creating job %d for env %q", jobID, envID) + } + jobCreated = true + return nil + }); err != nil { + return err + } + if jobCreated { + log.Infof(ctx, "created job %d for env %q", jobID, envID) + // Start the job immediately. This speeds up the application + // of initial configuration tasks. + execCfg.JobRegistry.NotifyToResume(ctx, jobID) + } return nil } @@ -67,6 +142,5 @@ func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &autoConfigRunner{job: job} } - jobs.RegisterConstructor(jobspb.TypeAutoConfigRunner, createResumerFn, - jobs.DisablesTenantCostControl) + jobs.RegisterConstructor(jobspb.TypeAutoConfigRunner, createResumerFn, jobs.DisablesTenantCostControl) } diff --git a/pkg/server/autoconfig/auto_config_env_runner.go b/pkg/server/autoconfig/auto_config_env_runner.go new file mode 100644 index 000000000000..024c54d9c6ac --- /dev/null +++ b/pkg/server/autoconfig/auto_config_env_runner.go @@ -0,0 +1,245 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" +) + +// createEnvRunnerJob creates a job to execute tasks for the given +// environment. +// +// Refer to the package-level documentation for more details. +func createEnvRunnerJob( + ctx context.Context, + txn isql.Txn, + registry *jobs.Registry, + jobID jobspb.JobID, + envID EnvironmentID, +) error { + jobRecord := jobs.Record{ + Description: "runs configuration tasks", + Username: username.NodeUserName(), + Details: jobspb.AutoConfigEnvRunnerDetails{EnvID: envID}, + Progress: jobspb.AutoConfigEnvRunnerProgress{}, + CreatedBy: &jobs.CreatedByInfo{ + Name: makeEnvRunnerJobCreatedKey(envID), + }, + NonCancelable: true, + } + + _, err := registry.CreateJobWithTxn(ctx, jobRecord, jobID, txn) + return err +} + +func makeEnvRunnerJobCreatedKey(envID EnvironmentID) string { + const autoConfigEnvRunnerCreatedName = "auto-config-env-runner" + return fmt.Sprintf("%s:%s", autoConfigEnvRunnerCreatedName, envID) +} + +// envRunner is the runner for one task environment. +type envRunner struct { + envID EnvironmentID + job *jobs.Job +} + +var _ jobs.Resumer = (*envRunner)(nil) + +// OnFailOrCancel is a part of the Resumer interface. +func (r *envRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jobErr error) error { + return nil +} + +// Resume is part of the Resumer interface. +func (r *envRunner) Resume(ctx context.Context, execCtx interface{}) error { + ctx = logtags.AddTag(ctx, "taskenv", r.envID) + + // The auto config runner is a forever running background job. + // It's always safe to wind the SQL pod down whenever it's + // running, something we indicate through the job's idle + // status. + r.job.MarkIdle(true) + + exec := execCtx.(sql.JobExecContext) + execCfg := exec.ExecCfg() + + // Provider gives us tasks to run. + provider := getProvider(ctx, execCfg) + + for { + log.Infof(ctx, "waiting for more tasks...") + task, err := provider.Peek(ctx, r.envID) + if err != nil { + if errors.Is(err, acprovider.ErrNoMoreTasks) { + // No more tasks to process. Just stop the runner. + return nil + } + return err + } + + r.job.MarkIdle(false) + var waitSome bool + waitSome, err = r.maybeRunNextTask(ctx, provider, execCfg, task) + if err != nil { + log.Warningf(ctx, "error processing auto config: %v", err) + } + r.job.MarkIdle(true) + if err != nil || waitSome { + // Wait a bit before retrying. + select { + // TODO(knz): Maybe make this delay configurable. + case <-time.After(5 * time.Second): + continue + case <-ctx.Done(): + return ctx.Err() + } + } + } +} + +func (r *envRunner) maybeRunNextTask( + ctx context.Context, + provider acprovider.Provider, + execCfg *sql.ExecutorConfig, + nextTask autoconfigpb.Task, +) (waitSome bool, err error) { + // Wait on any task jobs, if any. + // + // This is an optimization: the logic below would work without this step. + // We do this to avoid waiting too much after a task has completed + // (the logic below waits for a random amount of time in case of conflict). + if err = r.maybeWaitForCurrentTaskJob(ctx, execCfg); err != nil { + return true, err + } + + // The job ID we'll use. + jobID := execCfg.JobRegistry.MakeJobID() + log.Infof(ctx, "allocated job ID %d for next task candidate %d", jobID, nextTask.TaskID) + var nextTaskID TaskID + + err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (resErr error) { + // Re-check if there any other started task already. + otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID) + if err != nil { + return err + } + if otherTaskID != 0 { + // What likely happened is that another node caught up with us, + // observed the job completion for the previous task and already + // created the start marker and job for the next one. + log.Infof(ctx, "found start marker for task %d, unable to start another task", otherTaskID) + // We will simply retry, with some jittered delay. + waitSome = true + return nil + } + + // Find the latest completed task. + lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID) + if err != nil { + return err + } + + // Did another node catch up with us? + if lastTaskID >= nextTask.TaskID { + // Yes. Just tell the provider what we found and retry + // the peek. + return provider.Pop(ctx, r.envID, lastTaskID) + } + + // Can we even start this task? It may have a min version condition. + if !execCfg.Settings.Version.ActiveVersion(ctx).IsActiveVersion(nextTask.MinVersion) { + log.Infof(ctx, "next task %d has min version requirement %v while cluster is at version %v; waiting...", nextTask.TaskID, nextTask.MinVersion, execCfg.Settings.Version.ActiveVersion(ctx)) + waitSome = true + return nil + } + + // Enable the log event at the end of the surrounding function. + nextTaskID = nextTask.TaskID + + // Now we can create the start marker for our next task. + // + // We report the job ID in the value field to help with + // observability and to enable the call to + // maybeWaitForCurrentTaskJob(), which is an optimization. Storing + // the job ID is not strictly required for sequencing the tasks. + if err := writeStartMarker(ctx, txn, + InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID); err != nil { + return errors.Wrapf(err, "unable to write start marker for task %d", nextTaskID) + } + + // Finally, create the job. + if err := createTaskJob(ctx, txn, execCfg.JobRegistry, jobID, r.envID, nextTask); err != nil { + return errors.Wrapf(err, "unable to create job %d for task %d", jobID, nextTaskID) + } + return nil + }) + + if err == nil && nextTaskID != 0 { + log.Infof(ctx, "created job %d for task %d", jobID, nextTaskID) + } + + return waitSome, err +} + +// maybeWaitForCurrentTaskJob checks whether there is a current task +// running (there's a start marker for one); if there is, it waits for +// its job to complete. +func (r *envRunner) maybeWaitForCurrentTaskJob( + ctx context.Context, execCfg *sql.ExecutorConfig, +) error { + var prevJobID jobspb.JobID + var prevTaskID TaskID + + if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + var err error + prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID) + return err + }); err != nil { + return errors.Wrap(err, "checking latest task job") + } + + if prevJobID != 0 { + // We have a job already. Just wait for it. + log.Infof(ctx, "waiting for task %d, job %d to complete", prevTaskID, prevJobID) + if err := execCfg.JobRegistry.Run(ctx, []jobspb.JobID{prevJobID}); err != nil { + // Job fail errors here are not hard errors; it may be that + // the job was simply cancelled by the user. + log.Infof(ctx, "previous task job error: %v", err) + } + } + return nil +} + +func init() { + // Note: we disable tenant cost control because auto-config is used + // by operators and should thus not incur costs (or performance + // penalties) to tenants. + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + jd := job.Details() + details := jd.(jobspb.AutoConfigEnvRunnerDetails) + return &envRunner{envID: details.EnvID, job: job} + } + jobs.RegisterConstructor(jobspb.TypeAutoConfigEnvRunner, createResumerFn, jobs.DisablesTenantCostControl) +} diff --git a/pkg/server/autoconfig/auto_config_task.go b/pkg/server/autoconfig/auto_config_task.go new file mode 100644 index 000000000000..ee2c6affc09d --- /dev/null +++ b/pkg/server/autoconfig/auto_config_task.go @@ -0,0 +1,187 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" +) + +// createTaskJob creates a job to execute the given task. +// +// Refer to the package-level documentation for more details. +func createTaskJob( + ctx context.Context, + txn isql.Txn, + registry *jobs.Registry, + jobID jobspb.JobID, + envID EnvironmentID, + task autoconfigpb.Task, +) error { + jobRecord := jobs.Record{ + Description: fmt.Sprintf("configuration task: %s", task.Description), + Username: username.NodeUserName(), + Details: jobspb.AutoConfigTaskDetails{EnvID: envID, Task: task}, + Progress: jobspb.AutoConfigTaskProgress{}, + CreatedBy: &jobs.CreatedByInfo{ + Name: fmt.Sprintf("%s:%s", AutoConfigTaskCreatedName, envID), + ID: int64(task.TaskID), + }, + } + + _, err := registry.CreateJobWithTxn(ctx, jobRecord, jobID, txn) + return err +} + +// AutoConfigTaskCreatedName is the value in created_by_name for jobs +// created by the auto config runner. +const AutoConfigTaskCreatedName = "auto-config-task" + +// taskRunner is the runner for one task. +type taskRunner struct { + envID EnvironmentID + task autoconfigpb.Task +} + +var _ jobs.Resumer = (*taskRunner)(nil) + +// OnFailOrCancel is a part of the Resumer interface. +func (r *taskRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jobErr error) error { + ctx = logtags.AddTag(ctx, "taskenv", r.envID) + ctx = logtags.AddTag(ctx, "task", r.task.TaskID) + log.Infof(ctx, "task execution failed: %v", jobErr) + + exec := execCtx.(sql.JobExecContext) + execCfg := exec.ExecCfg() + provider := getProvider(ctx, execCfg) + if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return markTaskComplete(ctx, txn, + InfoKeyTaskRef{Environment: r.envID, Task: r.task.TaskID}, + []byte("task error")) + }); err != nil { + return err + } + + // Tell the provider we're done so the task is not served again to + // the runner. + if err := provider.Pop(ctx, r.envID, r.task.TaskID); err != nil { + // Failing to Pop is not a hard job error: the runner also knows + // how to pop upon finding unexpected completion markers. + log.Warningf(ctx, "error popping the task off the queue: %v", err) + } + return nil +} + +// Resume is part of the Resumer interface. +func (r *taskRunner) Resume(ctx context.Context, execCtx interface{}) error { + ctx = logtags.AddTag(ctx, "taskenv", r.envID) + ctx = logtags.AddTag(ctx, "task", r.task.TaskID) + log.Infof(ctx, "starting execution") + + exec := execCtx.(sql.JobExecContext) + execCfg := exec.ExecCfg() + provider := getProvider(ctx, execCfg) + + switch payload := r.task.GetPayload().(type) { + case *autoconfigpb.Task_SimpleSQL: + if err := execSimpleSQL(ctx, execCfg, r.envID, r.task.TaskID, payload.SimpleSQL); err != nil { + return err + } + + default: + return errors.AssertionFailedf("unknown task payload type: %T", payload) + } + + // Tell the provider we're done so the task is not served again to + // the runner. + if err := provider.Pop(ctx, r.envID, r.task.TaskID); err != nil { + // Failing to Pop is not a hard job error: the runner also knows + // how to pop upon finding unexpected completion markers. + log.Warningf(ctx, "error popping the task off the queue: %v", err) + } + return nil +} + +// execSimpleSQL executs a SQL payload a a single combined transaction +// that also includes the removal of the start marker and the creation +// of the completion marker. +func execSimpleSQL( + ctx context.Context, + execCfg *sql.ExecutorConfig, + envID EnvironmentID, + taskID TaskID, + sqlPayload *autoconfigpb.SimpleSQL, +) error { + // Which SQL identity to use. + sqlUsername := username.RootUserName() + if sqlPayload.UsernameProto != "" { + sqlUsername = sqlPayload.UsernameProto.Decode() + } + execOverride := sessiondata.InternalExecutorOverride{ + User: sqlUsername, + } + // First execute all the non-transactional, idempotent SQL statements. + if len(sqlPayload.NonTransactionalStatements) > 0 { + exec := execCfg.InternalDB.Executor() + for _, stmt := range sqlPayload.NonTransactionalStatements { + log.Infof(ctx, "attempting execution of non-txn task statement:\n%s", stmt) + _, err := exec.ExecEx(ctx, "exec-task-statement", nil, /* txn */ + execOverride, stmt) + if err != nil { + return err + } + } + log.Infof(ctx, "finished executing non-txn task statements") + } + + // Now execute all the transactional, potentially not idempotent + // statements. + return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + for _, stmt := range sqlPayload.TransactionalStatements { + log.Infof(ctx, "attempting execution of task statement:\n%s", stmt) + _, err := txn.ExecEx(ctx, "exec-task-statement", + txn.KV(), + execOverride, + stmt) + if err != nil { + return err + } + } + log.Infof(ctx, "finished executing txn statements") + return markTaskComplete(ctx, txn, + InfoKeyTaskRef{Environment: envID, Task: taskID}, + []byte("task success")) + }) +} + +func init() { + // Note: we disable tenant cost control because auto-config is used + // by operators and should thus not incur costs (or performance + // penalties) to tenants. + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + jd := job.Details() + details := jd.(jobspb.AutoConfigTaskDetails) + return &taskRunner{envID: details.EnvID, task: details.Task} + } + jobs.RegisterConstructor(jobspb.TypeAutoConfigTask, createResumerFn, jobs.DisablesTenantCostControl) +} diff --git a/pkg/server/autoconfig/auto_config_test.go b/pkg/server/autoconfig/auto_config_test.go new file mode 100644 index 000000000000..f927584c7387 --- /dev/null +++ b/pkg/server/autoconfig/auto_config_test.go @@ -0,0 +1,201 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +const testEnvID autoconfigpb.EnvironmentID = "my test env" + +type testProvider struct { + t *testing.T + notifyCh chan struct{} + peekWaitCh chan struct{} + tasks []testTask +} + +type testTask struct { + task autoconfigpb.Task + seen bool +} + +var testTasks = []testTask{ + {task: autoconfigpb.Task{ + TaskID: 123, + Description: "test task that creates a system table", + MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + UsernameProto: username.NodeUserName().EncodeProto(), + NonTransactionalStatements: []string{ + "CREATE TABLE IF NOT EXISTS system.foo(x INT)", + // This checks that the non-txn part works properly: SET + // CLUSTER SETTING can only be run outside of explicit txns. + "SET CLUSTER SETTING cluster.organization = 'woo'", + }, + }, + }, + }}, + {task: autoconfigpb.Task{ + TaskID: 345, + Description: "test task that fails with an error", + MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + TransactionalStatements: []string{"SELECT invalid"}, + }, + }, + }}, + {task: autoconfigpb.Task{ + TaskID: 456, + Description: "test task that creates another system table", + MinVersion: clusterversion.ByKey(clusterversion.V23_1Start), + Payload: &autoconfigpb.Task_SimpleSQL{ + SimpleSQL: &autoconfigpb.SimpleSQL{ + UsernameProto: username.NodeUserName().EncodeProto(), + NonTransactionalStatements: []string{"CREATE TABLE IF NOT EXISTS system.bar(y INT)"}, + }, + }, + }}, +} + +func (p *testProvider) EnvUpdate() <-chan struct{} { + p.t.Logf("runner has registered env update channel") + return p.notifyCh +} + +func (p *testProvider) ActiveEnvironments() []autoconfigpb.EnvironmentID { + return []autoconfigpb.EnvironmentID{testEnvID} +} + +func (p *testProvider) Pop( + _ context.Context, envID autoconfigpb.EnvironmentID, taskID autoconfigpb.TaskID, +) error { + p.t.Logf("runner reports completed task %d (env %q)", taskID, envID) + for len(p.tasks) > 0 { + if taskID >= p.tasks[0].task.TaskID { + p.t.Logf("popping task %d from queue", p.tasks[0].task.TaskID) + p.tasks = p.tasks[1:] + continue + } + break + } + return nil +} + +func (p *testProvider) Peek( + ctx context.Context, envID autoconfigpb.EnvironmentID, +) (autoconfigpb.Task, error) { + p.t.Logf("runner peeking (env %q)", envID) + if len(p.tasks) == 0 { + return autoconfigpb.Task{}, acprovider.ErrNoMoreTasks + } + if !p.tasks[0].seen { + // seen ensures that the runner job won't have to wait a second + // time when peeking the task. + select { + case <-ctx.Done(): + return autoconfigpb.Task{}, ctx.Err() + case <-p.peekWaitCh: + } + } + p.tasks[0].seen = true + return p.tasks[0].task, nil +} + +func TestAutoConfig(t *testing.T) { + defer leaktest.AfterTest(t)() + + provider := &testProvider{ + t: t, + notifyCh: make(chan struct{}, 1), + peekWaitCh: make(chan struct{}), + tasks: testTasks, + } + provider.notifyCh <- struct{}{} + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + AutoConfig: &autoconfig.TestingKnobs{ + Provider: provider, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + t.Logf("waiting for runner job...") + testutils.SucceedsSoon(t, func() error { + var jobID int64 + if err := sqlDB.QueryRowContext(ctx, `SELECT id FROM system.jobs WHERE id = $1`, + jobs.AutoConfigRunnerJobID).Scan(&jobID); err != nil { + return err + } + t.Logf("found runner job: %d", jobID) + return nil + }) + + waitForTaskCompleted := func(taskID autoconfigpb.TaskID) (result []byte) { + taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnvID, Task: taskID} + completionMarker := taskRef.EncodeCompletionMarkerKey() + testutils.SucceedsSoon(t, func() error { + err := sqlDB.QueryRowContext(ctx, ` +SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2 LIMIT 1`, + jobs.AutoConfigRunnerJobID, + completionMarker).Scan(&result) + if err != nil { + return err + } + t.Logf("found task completion: %q", string(result)) + return nil + }) + return result + } + + provider.peekWaitCh <- struct{}{} + t.Logf("waiting for first task completion marker...") + result := waitForTaskCompleted(testTasks[0].task.TaskID) + require.Equal(t, []byte("task success"), result) + + t.Logf("check that the effects of the first task are visible") + var unused int + err := sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.foo`).Scan(&unused) + require.NoError(t, err) + + provider.peekWaitCh <- struct{}{} + t.Logf("waiting for 2nd task completion marker...") + result = waitForTaskCompleted(testTasks[1].task.TaskID) + require.Equal(t, []byte("task error"), result) + + provider.peekWaitCh <- struct{}{} + t.Logf("waiting for 3rd task completion marker...") + result = waitForTaskCompleted(testTasks[2].task.TaskID) + require.Equal(t, []byte("task success"), result) + + t.Logf("check that the effects of the first tasks are visible") + err = sqlDB.QueryRowContext(ctx, `SELECT count(*) FROM system.bar`).Scan(&unused) + require.NoError(t, err) +} diff --git a/pkg/server/autoconfig/autoconfigpb/BUILD.bazel b/pkg/server/autoconfig/autoconfigpb/BUILD.bazel new file mode 100644 index 000000000000..576051229d26 --- /dev/null +++ b/pkg/server/autoconfig/autoconfigpb/BUILD.bazel @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("//build:STRINGER.bzl", "stringer") + +proto_library( + name = "autoconfigpb_proto", + srcs = ["autoconfig.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb:roachpb_proto", + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + ], +) + +go_proto_library( + name = "autoconfigpb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb", + proto = ":autoconfigpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/security/username", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "autoconfigpb", + srcs = ["autoconfig.go"], + embed = [":autoconfigpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/server/autoconfig/autoconfigpb/autoconfig.go b/pkg/server/autoconfig/autoconfigpb/autoconfig.go new file mode 100644 index 000000000000..347fd8fd3ac1 --- /dev/null +++ b/pkg/server/autoconfig/autoconfigpb/autoconfig.go @@ -0,0 +1,17 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfigpb + +// EnvironmentID is the type of an environment that provides tasks. +type EnvironmentID string + +// TaskID is the type of a task ID. +type TaskID uint64 diff --git a/pkg/server/autoconfig/autoconfigpb/autoconfig.proto b/pkg/server/autoconfig/autoconfigpb/autoconfig.proto new file mode 100644 index 000000000000..2f3ea9a220b8 --- /dev/null +++ b/pkg/server/autoconfig/autoconfigpb/autoconfig.proto @@ -0,0 +1,97 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.server.autoconfig.autoconfigpb; +option go_package = "github.com/cockroachdb/cockroach/pkg/server/autoconfig/autoconfigpb"; + +import "gogoproto/gogo.proto"; +import "roachpb/metadata.proto"; + +// TaskBundle contains tasks defined by multiple environments. +// See the package-level documentation for package autoconfig +// for more details. +message TaskBundle { + option (gogoproto.equal) = true; + + // EnvTasks defines the tasks provided by one environment. + // The tasks of one environment execute sequentially. + message EnvTasks { + option (gogoproto.equal) = true; + string env_id = 1 [(gogoproto.customname) = "EnvID", (gogoproto.casttype) = "EnvironmentID"]; + repeated Task tasks = 2 [(gogoproto.nullable) = false]; + // NEXT ID: 3; + } + + // Bundles defines tasks for different environments. There should + // be only one entry in the array per environment ID. + // Tasks from different environments can execute concurrently. + repeated EnvTasks bundles = 1 [(gogoproto.nullable) = false]; + + // NEXT ID: 2; +} + +// Task defines one auto config task to run by a tenant. +message Task { + option (gogoproto.equal) = true; + + // TaskID is the key for this task within one environment. + // Tasks with a given ID are executed at least once, with + // extra efforts to make it exactly-once whenever possible. + // Tasks within the same environment are executed sequentially + // in the order of their task ID. + // See the package-level documentation for package autoconfig + // for more details. + uint64 task_id = 1 [(gogoproto.customname) = "TaskID", (gogoproto.casttype) = "TaskID"]; + + // Description of what the auto configuration task does. + // This is reported unredactable in logs and telemetry and thus must be void of PII. + string description = 2; + + // MinVersion is the minimum active cluster version that + // must be encountered before this task can run. + roachpb.Version min_version = 3 [(gogoproto.nullable) = false]; + + // Payload describes what the task should run. + oneof payload { + SimpleSQL simpleSQL = 4; + } + + // NEXT ID: 5; +} + +// SimpleSQL describes a combination of some optional +// non-transactional SQL statements (which should be idempotent), and +// some optional transactional SQL statements. +// +// The SQL is executed using the node user identity. The task fails if the +// SQL aborts with an error and is not auto-retried. +message SimpleSQL { + option (gogoproto.equal) = true; + + // SQL identity to run the statements as. + // If left empty, execution will use the 'root' identity. + string username_proto = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; + + + // SQL statements to execute as standalone transactions. These are + // executed before the transactional statements below. If any of + // these fails or the transactional statements below need to be + // retried, all the non-transactional statements will be executed + // again. They should be defined to only contain idempotent SQL, + // for example SHOW CLUSTER SETTING or CREATE TABLE IF NOT EXISTS. + repeated string non_transactional_statements = 2; + + // SQL statements to execute transactionally with the placement task + // completion marker. This will be executed at most once. + repeated string transactional_statements = 3; + + // NEXT ID: 4; +} diff --git a/pkg/server/autoconfig/doc.go b/pkg/server/autoconfig/doc.go new file mode 100644 index 000000000000..f2eeb94e3f87 --- /dev/null +++ b/pkg/server/autoconfig/doc.go @@ -0,0 +1,162 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package autoconfig defines a mechanism through which the SQL service +// for a tenant (including, possibly, the system tenant) can run +// SQL tasks defined externally only once, using jobs. +// +// The main purposes of this logic are: +// +// - To support SQL-based initialization of a cluster's parameters +// (including cluster settings, initial user creation, initial +// db creation) prior to accepting application clients. +// - To support logical cluster maintainance by CockroachCloud +// SREs for Serverless customers without incurring RU billing. +// - To support the automation of maintainance operations across +// a fleet of secondary tenants (such as the creation of +// multi-region system tables when initializing a multi-region +// CC serverless cluster). +// +// In a nutshell, the mechanism works as follows: +// +// - There is a permanent job called the 'auto config runner'. +// - The runner job accepts task definitions from zero or more +// "external environments" (see below for details), and organizes +// their execution by creating individual 'auto config task' jobs. +// - Each auto config task job runs one task; it is cancellable +// by the end user (see below for a note about cancellability). +// - The runner job only creates the job for the next task after the +// job for the previous task by a given environment has fully +// completed (either successfully, with an error, or has been +// cancelled). +// +// # Task provenance - data model +// +// We plan to use tasks for various purposes (troubleshooting, +// migrations, etc.) and so we need sequential semantics: a guarantee +// that a task doesn't start before the previous one has completed. +// +// However, if we did this for all tasks we would have a problem +// when a task gets stuck - then the entire system wouldn't make +// progress any more. +// +// To reconcile our desire for sequences and preserve our ability +// to "repair" broken situations, we also introduce a notion of +// *environment* where tasks are defined: +// +// - an environment identifies a potential source of tasks. +// - tasks are grouped by environment; or rather, each environment +// provisions its own tasks (potentially at different rates). +// - the execution of tasks within one environment is sequential. +// - the execution of tasks of separate environments is independent, +// and so different environments can execute tasks concurrently +// with each other. +// - the system supports the dynamic addition of new environments +// (by using a yet-unused environment name for new tasks). +// +// This makes it possible to "abandon" an environment whose tasks +// got stuck and start tasks using a new one. +// +// # Task provenance - mechanism +// +// The auto config runner job takes its input from a "task provider" +// (Go interface `Provider`, from the package `acprovider`). The task +// provider has a notification channel and an accessor that lists +// environments with pending tasks ("active"). +// +// For each active environment, the top-level runner creates a runner +// job specific to that environment. Each environment-specific runner +// then retrieves tasks from the provider to run them. +// +// In practice, there are 2 main task providers envisioned at the time +// of this writing: +// +// - A provider of static configuration tasks for the system tenant, +// via hardcoded "configuration profiles" selected when a cluster is +// initially created. +// - A provider of dynamic configuration asks for secondary tenants, +// provisioned through a RPC in the "tenant connector". +// +// # Task definition and execution guarantees +// +// Under normal conditions, and barring explicit contrarian action by +// the user, the runner logic provides at-least-once execution +// semantics, and tries hard to provide exactly-once semantics. (As +// the theory tells us that exactly-once is not possible, see below +// for a discussion of exceptions.) It also guarantees that auto +// config tasks from the same environment are not executed +// concurrently with each other, with at most one node executing a +// task at a time. +// +// In a nutshell, the mechanism works as follows: +// +// - Tasks are numbered sequentially (task ID); it is the +// responsibility of "the environment" (the task provider) to +// guarantee that later tasks have greater IDs than earlier tasks. +// - The runner uses "started" and "completed" markers in the +// system.job_info table to ensure that no more than one job is +// created for a given task. +// +// More specifically, when the runner is ready to execute a task, it +// transactionally: +// +// 1. checks that there is no in-flight task already. +// (e.g. one started by the runner on another node) +// 2. retrieves the latest completion marker by task ID from job_info, +// (indicates the latest task completed by ID) +// 3. it ratchets the queue of incoming tasks forward +// to skip over all tasks with an ID lower or equal to the +// last task completed. +// 4. it creates a "auto config task" job entry for that task +// and places a start marker for it in job_info. +// +// Then, the "auto config task" executes the SQL in the task +// transactionally with the removal of its start marker and the +// creation of its completion marker. +// +// # Possible exceptions to the "exactly-once" semantics +// +// The tasks are executed as jobs: +// +// - if a task's job is cancelled by the user before it runs, its +// side-effects will never be observed. +// - if a node crashes while a task job is executing, it +// will be restarted later (either after this node restarts, +// or by being adopted by another node). So any +// side effects not otherwise aborted by the transaction +// supporting the task's execution may be observed more than once. +// This includes e.g. changes to cluster settings. +// +// # Task errors vs progress across tasks +// +// Note that if a task fail, its job is marked as permanently failed +// and it will not be retried. This also goes for tasks that have been +// canceled by the user. +// +// Of note, a task's failure does not prevent the next task from +// starting. +// +// This begs the question: what if the environment wishes to mark +// a task as "required", such that no progress is allowed in subsequent +// tasks until the required task completes successfully? +// +// While this is not yet supported in this package, a solution +// would probably contain the following: +// +// - the task's job would be marked as non-cancellable by the +// end-user. +// - the task's job would not be marked as permanently failed +// upon encountering execution failures, so it gets retried. +// - we would need another mechanism to cancel execution +// of that task and its environment in case the operator +// wants to abandon the whole sequence. +// +// This enhancement is left to a later iteration. +package autoconfig diff --git a/pkg/server/autoconfig/main_test.go b/pkg/server/autoconfig/main_test.go new file mode 100644 index 000000000000..2e8cfa7a970b --- /dev/null +++ b/pkg/server/autoconfig/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/server/autoconfig/task_markers.go b/pkg/server/autoconfig/task_markers.go new file mode 100644 index 000000000000..93a85b51fab6 --- /dev/null +++ b/pkg/server/autoconfig/task_markers.go @@ -0,0 +1,214 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig + +import ( + "context" + "encoding/hex" + "strconv" + "strings" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/errors" +) + +// infoKeyCompletionPrefix is the prefix of the key inserted in job_info +// when a task has completed. +const infoKeyCompletionPrefix = "completed-" + +// infoKeyStartPrefix is the prefix of the key inserted in job_info +// when a task has started. +const infoKeyStartPrefix = "started-" + +// InfoKeyStartPrefix returns the info_key scan start key for +// all task start markers for the given environment. +func InfoKeyStartPrefix(env EnvironmentID) string { + var buf strings.Builder + buf.Grow(len(infoKeyStartPrefix) + len(env)) + buf.WriteString(infoKeyStartPrefix) + buf.WriteString(string(env)) + return buf.String() +} + +// InfoKeyCompletionPrefix returns the info_key scan start key for +// all task completion markers for the given environment. +func InfoKeyCompletionPrefix(env EnvironmentID) string { + var buf strings.Builder + buf.Grow(len(infoKeyCompletionPrefix) + len(env)) + buf.WriteString(infoKeyCompletionPrefix) + buf.WriteString(string(env)) + return buf.String() +} + +// InfoKeyTaskRef represents the reference to a task stored in +// job_info task markers. +type InfoKeyTaskRef struct { + Environment EnvironmentID + Task TaskID +} + +// EncodeStartMarker creates a job_info info key that identifies +// a start marker for this task. +func (tr *InfoKeyTaskRef) EncodeStartMarkerKey() string { + return tr.encodeInternal(infoKeyStartPrefix) +} + +// DecodeStartMarker decodes a job_info info key that identifies a +// start marker for this task. +func (tr *InfoKeyTaskRef) DecodeStartMarkerKey(infoKey string) error { + return tr.decodeInternal(infoKeyStartPrefix, infoKey) +} + +// EncodeCompletionMarker creates a job_info info key that identifies +// a completion marker for this task. +func (tr *InfoKeyTaskRef) EncodeCompletionMarkerKey() string { + return tr.encodeInternal(infoKeyCompletionPrefix) +} + +// DecodeCompletionMarker decodes a job_info info key that identifies +// a completion marker for this task. +func (tr *InfoKeyTaskRef) DecodeCompletionMarkerKey(infoKey string) error { + return tr.decodeInternal(infoKeyCompletionPrefix, infoKey) +} + +func (tr *InfoKeyTaskRef) encodeInternal(prefix string) string { + orderedKeyBytes := make([]byte, 0, len(tr.Environment)+10) + orderedKeyBytes = encoding.EncodeStringAscending(orderedKeyBytes, string(tr.Environment)) + orderedKeyBytes = encoding.EncodeUvarintAscending(orderedKeyBytes, uint64(tr.Task)) + + var buf strings.Builder + buf.Grow(len(prefix) + hex.EncodedLen(len(orderedKeyBytes))) + buf.WriteString(prefix) + hexEncode := hex.NewEncoder(&buf) + _, err := hexEncode.Write(orderedKeyBytes) + if err != nil { + // This can't happen because strings.Builder always auto-grows. + panic(errors.HandleAsAssertionFailure(err)) + } + + return buf.String() +} + +func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error { + if !strings.HasPrefix(infoKey, prefix) { + return errors.AssertionFailedf("programming error: prefix %q missing: %q", prefix, infoKey) + } + infoKey = infoKey[len(prefix):] + bytes, err := hex.DecodeString(infoKey) + if err != nil { + return errors.Wrapf(err, "decoding hex-encoded info key (%q)", infoKey) + } + rest, s, err := encoding.DecodeUnsafeStringAscendingDeepCopy(bytes, nil) + if err != nil { + return errors.Wrap(err, "decoding environment from task info key") + } + _, v, err := encoding.DecodeUvarintAscending(rest) + if err != nil { + return errors.Wrap(err, "decoding task ID from task info key") + } + tr.Environment = EnvironmentID(s) + tr.Task = TaskID(v) + return nil +} + +// writeStartMarker writes a start marker for the given task ID and +// also writes its job ID into the value part. +func writeStartMarker( + ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, +) error { + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + return infoStorage.Write(ctx, + taskRef.EncodeStartMarkerKey(), + []byte(strconv.FormatUint(uint64(jobID), 10))) +} + +// getCurrentlyStartedTaskID retrieves the ID of the last task which +// has a start marker in job_info. +func getCurrentlyStartedTaskID( + ctx context.Context, txn isql.Txn, env EnvironmentID, +) (prevTaskID TaskID, prevJobID jobspb.JobID, err error) { + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + + if err := infoStorage.GetLast(ctx, + InfoKeyStartPrefix(env), + func(infoKey string, value []byte) error { + var taskRef InfoKeyTaskRef + if err := taskRef.DecodeStartMarkerKey(infoKey); err != nil { + return errors.Wrapf(err, "decoding info key (%q)", infoKey) + } + prevTaskID = taskRef.Task + + // Also retrieve is job ID from the value bytes. + jid, err := strconv.ParseInt(string(value), 10, 64) + if err != nil { + return errors.Wrapf(err, + "while decoding value (%q) for start marker for task %d", + string(value), prevTaskID) + } + prevJobID = jobspb.JobID(jid) + return nil + }); err != nil { + return 0, 0, errors.Wrap(err, "finding last task start marker") + } + + return prevTaskID, prevJobID, nil +} + +// getLastCompletedTaskID retrieves the task ID of the last task which +// has a completion marker in job_info. +func getLastCompletedTaskID( + ctx context.Context, txn isql.Txn, env EnvironmentID, +) (lastTaskID TaskID, err error) { + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + + if err := infoStorage.GetLast(ctx, + InfoKeyCompletionPrefix(env), + func(infoKey string, value []byte) error { + // There's a task. + var taskRef InfoKeyTaskRef + if err := taskRef.DecodeCompletionMarkerKey(infoKey); err != nil { + return errors.Wrapf(err, "decoding info key (%q)", infoKey) + } + lastTaskID = taskRef.Task + return nil + }); err != nil { + return 0, errors.Wrap(err, "finding last task completion marker") + } + + return lastTaskID, nil +} + +// markTaskCompletes transactionally removes the task's start marker +// and creates a completion marker. +func markTaskComplete( + ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, +) error { + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + + // Remove the start marker. + if err := infoStorage.Delete(ctx, taskRef.EncodeStartMarkerKey()); err != nil { + return err + } + + // Remove any previous completion marker. This avoids the + // accumulation of past completion markers over time. + completionKeyPrefix := InfoKeyCompletionPrefix(taskRef.Environment) + completionInfoKey := taskRef.EncodeCompletionMarkerKey() + if err := infoStorage.DeleteRange(ctx, completionKeyPrefix, completionInfoKey); err != nil { + return err + } + + // Add our completion marker. + return infoStorage.Write(ctx, completionInfoKey, completionValue) +} diff --git a/pkg/server/autoconfig/task_markers_test.go b/pkg/server/autoconfig/task_markers_test.go new file mode 100644 index 000000000000..cc82ae90090c --- /dev/null +++ b/pkg/server/autoconfig/task_markers_test.go @@ -0,0 +1,112 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig_test + +import ( + "fmt" + "math" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/server/autoconfig" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +// TestEncodeDecodeMarkers tests that the result of the encode +// functions can be processed by the decode functions. +func TestEncodeDecodeMarkers(t *testing.T) { + defer leaktest.AfterTest(t)() + + for _, testEnv := range []autoconfig.EnvironmentID{"", "foo", "bar"} { + for _, testTask := range []autoconfig.TaskID{0, 1, 10, math.MaxUint64} { + t.Run(fmt.Sprintf("%s/%d", testEnv, testTask), func(t *testing.T) { + taskRef := autoconfig.InfoKeyTaskRef{Environment: testEnv, Task: testTask} + + encodedStart := taskRef.EncodeStartMarkerKey() + var tr autoconfig.InfoKeyTaskRef + require.NoError(t, tr.DecodeStartMarkerKey(encodedStart)) + require.Equal(t, taskRef, tr) + + require.Error(t, tr.DecodeCompletionMarkerKey(encodedStart)) + + encodedComplete := taskRef.EncodeCompletionMarkerKey() + var tr2 autoconfig.InfoKeyTaskRef + require.NoError(t, tr2.DecodeCompletionMarkerKey(encodedComplete)) + require.Equal(t, taskRef, tr2) + + require.Error(t, tr2.DecodeStartMarkerKey(encodedComplete)) + }) + } + } +} + +// TestMarkerOrdering tests that the ordering of the encoded markers +// is the same as the ordering of the task IDs. +func TestMarkerOrdering(t *testing.T) { + defer leaktest.AfterTest(t)() + + var taskRefs []autoconfig.InfoKeyTaskRef + for _, testEnv := range []autoconfig.EnvironmentID{"", "bar", "foo"} { + for i := 0; i < 10; i++ { + taskRefs = append(taskRefs, autoconfig.InfoKeyTaskRef{Environment: testEnv, Task: autoconfig.TaskID(i)}) + } + } + + // Copy taskRefs to taskRefsRandom. + taskRefsRandom := make([]autoconfig.InfoKeyTaskRef, len(taskRefs)) + copy(taskRefsRandom, taskRefs) + + // Randomize the ordering of taskRefsRandom. + r, _ := randutil.NewTestRand() + for i := range taskRefsRandom { + j := i + int(r.Int31n(int32(len(taskRefsRandom)-i))) + taskRefsRandom[i], taskRefsRandom[j] = taskRefsRandom[j], taskRefsRandom[i] + } + + // Get start marker encodings for all taskRefs. + var startMarkers []string + for _, taskRef := range taskRefsRandom { + startMarkers = append(startMarkers, taskRef.EncodeStartMarkerKey()) + } + // Sort the start markers. This should be the same as sorting the taskRefs. + sort.Strings(startMarkers) + + // Decode the start markers. + var decodedStartMarkers []autoconfig.InfoKeyTaskRef + for _, startMarker := range startMarkers { + var taskRef autoconfig.InfoKeyTaskRef + require.NoError(t, taskRef.DecodeStartMarkerKey(startMarker)) + decodedStartMarkers = append(decodedStartMarkers, taskRef) + } + + // Check that the decoded start markers are the same as the original taskRefs. + require.Equal(t, taskRefs, decodedStartMarkers) + + // Get completion marker encodings for all taskRefs. + var completionMarkers []string + for _, taskRef := range taskRefsRandom { + completionMarkers = append(completionMarkers, taskRef.EncodeCompletionMarkerKey()) + } + // Sort the completion markers. This should be the same as sorting the taskRefs. + sort.Strings(completionMarkers) + + // Decode the completion markers. + var decodedCompletionMarkers []autoconfig.InfoKeyTaskRef + for _, completionMarker := range completionMarkers { + var taskRef autoconfig.InfoKeyTaskRef + require.NoError(t, taskRef.DecodeCompletionMarkerKey(completionMarker)) + decodedCompletionMarkers = append(decodedCompletionMarkers, taskRef) + } + // Check that the decoded completion markers are the same as the original taskRefs. + require.Equal(t, taskRefs, decodedCompletionMarkers) +} diff --git a/pkg/server/autoconfig/testing_knobs.go b/pkg/server/autoconfig/testing_knobs.go new file mode 100644 index 000000000000..37710e749072 --- /dev/null +++ b/pkg/server/autoconfig/testing_knobs.go @@ -0,0 +1,19 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package autoconfig + +import "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" + +type TestingKnobs struct { + Provider acprovider.Provider +} + +func (*TestingKnobs) ModuleTestingKnobs() {} diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 262e4d953964..92ca93dbd824 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -50,9 +50,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security/clientsecopts" "github.com/cockroachdb/cockroach/pkg/security/username" - // Ensure the auto config runner job is registered to avoid log spam. - // Pending merge of https://github.com/cockroachdb/cockroach/pull/98466. - _ "github.com/cockroachdb/cockroach/pkg/server/autoconfig" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" "github.com/cockroachdb/cockroach/pkg/server/diagnostics" "github.com/cockroachdb/cockroach/pkg/server/pgurl" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -1023,6 +1022,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { EventsExporter: cfg.eventsExporter, NodeDescs: cfg.nodeDescs, TenantCapabilitiesReader: cfg.tenantCapabilitiesReader, + // TODO(knz): We will replace this provider by an actual provider + // in a later commit. + AutoConfigProvider: acprovider.NoTaskProvider{}, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { @@ -1099,6 +1101,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if externalConnKnobs := cfg.TestingKnobs.ExternalConnection; externalConnKnobs != nil { execCfg.ExternalConnectionTestingKnobs = externalConnKnobs.(*externalconn.TestingKnobs) } + if autoConfigKnobs := cfg.TestingKnobs.AutoConfig; autoConfigKnobs != nil { + knobs := autoConfigKnobs.(*autoconfig.TestingKnobs) + if knobs.Provider != nil { + execCfg.AutoConfigProvider = knobs.Provider + } + } statsRefresher := stats.MakeRefresher( cfg.AmbientCtx, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 8903772b2c98..8bffc0b0eff1 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -339,6 +339,7 @@ go_library( "//pkg/security/password", "//pkg/security/sessionrevival", "//pkg/security/username", + "//pkg/server/autoconfig/acprovider", "//pkg/server/pgurl", "//pkg/server/serverpb", "//pkg/server/status/statuspb", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 4aebccb2298e..111f88e158b3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -50,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" "github.com/cockroachdb/cockroach/pkg/server/pgurl" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" @@ -1440,6 +1441,10 @@ type ExecutorConfig struct { NodeDescs kvcoord.NodeDescStore TenantCapabilitiesReader SystemTenantOnly[tenantcapabilities.Reader] + + // AutoConfigProvider informs the auto config runner job of new + // tasks to run. + AutoConfigProvider acprovider.Provider } // UpdateVersionSystemSettingHook provides a callback that allows us diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index e6109af378b1..aa30036f9134 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3701,7 +3701,7 @@ var charts = []sectionDescription{ }, }, { - Title: "Auto Config Runner Job", + Title: "Auto Config Top-level Runner Job", Metrics: []string{ "jobs.auto_config_runner.fail_or_cancel_completed", "jobs.auto_config_runner.fail_or_cancel_failed", @@ -3711,6 +3711,28 @@ var charts = []sectionDescription{ "jobs.auto_config_runner.resume_retry_error", }, }, + { + Title: "Auto Config Per-environment Runner Jobs", + Metrics: []string{ + "jobs.auto_config_env_runner.fail_or_cancel_completed", + "jobs.auto_config_env_runner.fail_or_cancel_failed", + "jobs.auto_config_env_runner.fail_or_cancel_retry_error", + "jobs.auto_config_env_runner.resume_completed", + "jobs.auto_config_env_runner.resume_failed", + "jobs.auto_config_env_runner.resume_retry_error", + }, + }, + { + Title: "Auto Config Tasks", + Metrics: []string{ + "jobs.auto_config_task.fail_or_cancel_completed", + "jobs.auto_config_task.fail_or_cancel_failed", + "jobs.auto_config_task.fail_or_cancel_retry_error", + "jobs.auto_config_task.resume_completed", + "jobs.auto_config_task.resume_failed", + "jobs.auto_config_task.resume_retry_error", + }, + }, }, }, {