diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index 9c5b3fd1bcfd..8dc8f2699262 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -1439,6 +1439,7 @@ APPLICATIONlogical_replication.replicated_time_secondsThe replicated time of the logical replication stream in seconds since the unix epoch.SecondsGAUGESECONDSAVGNONE APPLICATIONlogical_replication.retry_queue_bytesThe replicated time of the logical replication stream in seconds since the unix epoch.BytesGAUGEBYTESAVGNONE APPLICATIONlogical_replication.retry_queue_eventsThe replicated time of the logical replication stream in seconds since the unix epoch.EventsGAUGECOUNTAVGNONE +APPLICATIONobs.tablemetadata.update_job.runsThe total number of runs of the update table metadata job.ExecutionsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONphysical_replication.admit_latencyEvent admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processorNanosecondsHISTOGRAMNANOSECONDSAVGNONE APPLICATIONphysical_replication.commit_latencyEvent commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. 
If we batch events, then the difference between the oldest event in the batch and flush is recordedNanosecondsHISTOGRAMNANOSECONDSAVGNONE APPLICATIONphysical_replication.cutover_progressThe number of ranges left to revert in order to complete an inflight cutoverRangesGAUGECOUNTAVGNONE @@ -1716,7 +1717,6 @@ APPLICATIONsqlliveness.sessions_deletion_runsNumber of calls to delete sessions which have been performedSessionsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONsqlliveness.write_failuresNumber of update or insert calls which have failedWritesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONsqlliveness.write_successesNumber of update or insert calls successfully performedWritesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE -APPLICATIONtablemetadatacache.update_job.runsThe total number of runs of the update table metadata job.ExecutionsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONtenant.cost_client.blocked_requestsNumber of requests currently blocked by the rate limiterRequestsGAUGECOUNTAVGNONE APPLICATIONtenant.sql_usage.cross_region_network_ruTotal number of RUs charged for cross-region network trafficRequest UnitsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONtenant.sql_usage.estimated_cpu_secondsEstimated amount of CPU consumed by a virtual clusterCPU SecondsCOUNTERSECONDSAVGNON_NEGATIVE_DERIVATIVE diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 3881a6947b0a..d71334bc9f28 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -99,6 +99,8 @@ kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, t kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined 
through Raft consensus application kv.transaction.write_pipelining.max_batch_size (alias: kv.transaction.write_pipelining_max_batch_size) integer 128 if non-zero, defines that maximum size batch that will be pipelined through Raft consensus application +obs.tablemetadata.automatic_updates.enabled boolean false enables automatic updates of the table metadata cache system.table_metadata application +obs.tablemetadata.data_valid_duration duration 20m0s the duration for which the data in system.table_metadata is considered valid application schedules.backup.gc_protection.enabled boolean true enable chaining of GC protection across backups run as part of a schedule application security.client_cert.subject_required.enabled boolean false mandates a requirement for subject role to be set for db user system-visible security.ocsp.mode enumeration off use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index cb682c0855dc..720b6fb7d13b 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -128,6 +128,8 @@
kv.transaction.write_pipelining.enabled
(alias: kv.transaction.write_pipelining_enabled)
booleantrueif enabled, transactional writes are pipelined through Raft consensusServerless/Dedicated/Self-Hosted
kv.transaction.write_pipelining.max_batch_size
(alias: kv.transaction.write_pipelining_max_batch_size)
integer128if non-zero, defines that maximum size batch that will be pipelined through Raft consensusServerless/Dedicated/Self-Hosted
kvadmission.store.provisioned_bandwidth
byte size0 Bif set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to trueDedicated/Self-Hosted +
obs.tablemetadata.automatic_updates.enabled
booleanfalseenables automatic updates of the table metadata cache system.table_metadataServerless/Dedicated/Self-Hosted +
obs.tablemetadata.data_valid_duration
duration20m0sthe duration for which the data in system.table_metadata is considered validServerless/Dedicated/Self-Hosted
schedules.backup.gc_protection.enabled
booleantrueenable chaining of GC protection across backups run as part of a scheduleServerless/Dedicated/Self-Hosted
security.client_cert.subject_required.enabled
booleanfalsemandates a requirement for subject role to be set for db userDedicated/Self-hosted (read-write); Serverless (read-only)
security.ocsp.mode
enumerationoffuse OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]Serverless/Dedicated/Self-Hosted diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel index 1674ef458a88..65053b3201d1 100644 --- a/pkg/sql/tablemetadatacache/BUILD.bazel +++ b/pkg/sql/tablemetadatacache/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "tablemetadatacache", srcs = [ + "cluster_settings.go", "table_metadata_batch_iterator.go", "table_metadata_updater.go", "update_table_metadata_cache_job.go", @@ -13,6 +14,7 @@ go_library( "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/roachpb", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/isql", @@ -47,12 +49,14 @@ go_test( "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/syncutil", + "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", - "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/tablemetadatacache/cluster_settings.go b/pkg/sql/tablemetadatacache/cluster_settings.go new file mode 100644 index 000000000000..532b621f0ddc --- /dev/null +++ b/pkg/sql/tablemetadatacache/cluster_settings.go @@ -0,0 +1,41 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tablemetadatacache + +import ( + "errors" + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" +) + +const defaultDataValidDuration = time.Minute * 20 + +var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "obs.tablemetadata.automatic_updates.enabled", + "enables automatic updates of the table metadata cache system.table_metadata", + false, + settings.WithPublic) + +var tableMetadataCacheValidDuration = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "obs.tablemetadata.data_valid_duration", + "the duration for which the data in system.table_metadata is considered valid", + defaultDataValidDuration, + settings.WithValidateDuration(func(t time.Duration) error { + // This prevents the update loop from running too frequently. + if t < time.Minute { + return errors.New("validity period can't be less than 1 minute") + } + return nil + }), + settings.WithPublic) diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go index ed51f2ddbdd3..6e527b57a8bf 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go @@ -26,6 +26,10 @@ import ( io_prometheus_client "github.com/prometheus/client_model/go" ) +// updateJobExecFn specifies the function that is run on each iteration of the +// table metadata update job. It can be overridden in tests. +var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache + type tableMetadataUpdateJobResumer struct { job *jobs.Job } @@ -55,20 +59,46 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int // Channel used to signal the job should run. 
signalCh := execCtx.ExecCfg().SQLStatusServer.GetUpdateTableMetadataCacheSignal() + settings := execCtx.ExecCfg().Settings + // Register callbacks to signal the job to reset the timer when timer related settings change. + scheduleSettingsCh := make(chan struct{}) + tableMetadataCacheAutoUpdatesEnabled.SetOnChange(&settings.SV, func(_ context.Context) { + select { + case scheduleSettingsCh <- struct{}{}: + default: + } + }) + tableMetadataCacheValidDuration.SetOnChange(&settings.SV, func(_ context.Context) { + select { + case scheduleSettingsCh <- struct{}{}: + default: + } + }) + + var timer timeutil.Timer for { + if tableMetadataCacheAutoUpdatesEnabled.Get(&settings.SV) { + timer.Reset(tableMetadataCacheValidDuration.Get(&settings.SV)) + } select { + case <-scheduleSettingsCh: + timer.Stop() + continue + case <-timer.C: + timer.Read = true + log.Info(ctx, "running table metadata update job after data cache expiration") case <-signalCh: - log.Infof(ctx, "running table metadata update job") - metrics.NumRuns.Inc(1) - j.updateLastRunTime(ctx) - - if err := updateTableMetadataCache(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil { - log.Errorf(ctx, "%s", err.Error()) - } - + log.Info(ctx, "running table metadata update job via grpc signal") case <-ctx.Done(): return ctx.Err() } + + // Run table metadata update job. 
+ metrics.NumRuns.Inc(1) + j.updateLastRunTime(ctx) + if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil { + log.Errorf(ctx, "error running table metadata update job: %s", err) + } } } @@ -132,7 +162,7 @@ func (m TableMetadataUpdateJobMetrics) MetricStruct() {} func newTableMetadataUpdateJobMetrics() metric.Struct { return TableMetadataUpdateJobMetrics{ NumRuns: metric.NewCounter(metric.Metadata{ - Name: "tablemetadatacache.update_job.runs", + Name: "obs.tablemetadata.update_job.runs", Help: "The total number of runs of the update table metadata job.", Measurement: "Executions", Unit: metric.Unit_COUNT, diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go index 902f921aab92..f90730c833e6 100644 --- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go +++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go @@ -8,24 +8,29 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package tablemetadatacache_test +package tablemetadatacache import ( "context" + "errors" + "fmt" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -66,7 +71,7 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ }) metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct(). - JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(tablemetadatacache.TableMetadataUpdateJobMetrics) + JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics) testutils.SucceedsSoon(t, func() error { if metrics.NumRuns.Count() != 1 { return errors.New("job hasn't run yet") @@ -85,3 +90,105 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ return nil }) } + +// TestUpdateTableMetadataCacheAutomaticUpdates tests that: +// 1. The update table metadata cache job does not run automatically by default. +// 2. The job runs automatically on the data validity interval when automatic +// updates are enabled. 
+func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderStress(t, "too slow under stress")
+
+	ctx := context.Background()
+
+	// We'll mock the job execution function to track when the job is run and
+	// to avoid running the actual job which could take longer - we don't care
+	// about the actual update logic in this test.
+	var mockCalls []time.Time
+	mockMutex := syncutil.RWMutex{}
+	jobRunCh := make(chan struct{})
+	restoreUpdate := testutils.TestingHook(&updateJobExecFn,
+		func(ctx context.Context, ie isql.Executor) error {
+			mockMutex.Lock()
+			defer mockMutex.Unlock()
+			mockCalls = append(mockCalls, timeutil.Now())
+			// Non-blocking notify on an unbuffered channel: a run that happens
+			// while nobody is receiving is still recorded in mockCalls but does
+			// not leave a signal behind for a later waiter.
+			select {
+			case jobRunCh <- struct{}{}:
+			default:
+			}
+			return nil
+		})
+	defer restoreUpdate()
+
+	// getMockCallCount returns the number of mocked job executions so far.
+	getMockCallCount := func() int {
+		mockMutex.RLock()
+		defer mockMutex.RUnlock()
+		return len(mockCalls)
+	}
+
+	// waitForJobRuns blocks until `count` run notifications have been received
+	// on jobRunCh, or returns an error once `timeout` elapses.
+	waitForJobRuns := func(count int, timeout time.Duration) error {
+		timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
+		for i := 0; i < count; i++ {
+			select {
+			case <-jobRunCh:
+			case <-timeoutCtx.Done():
+				return fmt.Errorf("timed out waiting for job run %d", i+1)
+			}
+		}
+		return nil
+	}
+
+	// Server setup.
+	s := serverutils.StartServerOnly(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(ctx)
+
+	conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
+
+	// Wait for the job to be claimed by a node.
+	testutils.SucceedsSoon(t, func() error {
+		rows := conn.Query(t, `
+				SELECT claim_instance_id, status FROM system.jobs
+				WHERE id = $1 AND claim_instance_id IS NOT NULL
+				AND status = 'running'`,
+			jobs.UpdateTableMetadataCacheJobID)
+		// Release the result set on every attempt; when Next returns true the
+		// rows are not closed implicitly.
+		defer rows.Close()
+		if !rows.Next() {
+			return errors.New("no node has claimed the job")
+		}
+		return nil
+	})
+
+	require.Zero(t, getMockCallCount(), "Job should not run automatically by default")
+
+	t.Run("AutomaticUpdatesEnabled", func(t *testing.T) {
+		conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`)
+		tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
+		err := waitForJobRuns(3, 10*time.Second)
+		require.NoError(t, err, "Job should have run at least 3 times")
+		mockCallsCount := getMockCallCount()
+		require.GreaterOrEqual(t, mockCallsCount, 3, "Job should have run at least 3 times")
+		conn.Exec(t, `RESET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled`)
+		// We'll wait for one more signal in case the job was running when the setting was disabled.
+		// Ignore the error since it could timeout or be successful.
+		_ = waitForJobRuns(1, 200*time.Millisecond)
+
+		// Verify time between calls.
+		mockMutex.RLock()
+		defer mockMutex.RUnlock()
+		for i := 1; i < len(mockCalls); i++ {
+			timeBetweenCalls := mockCalls[i].Sub(mockCalls[i-1])
+			require.GreaterOrEqual(t, timeBetweenCalls, 50*time.Millisecond,
+				"Time between calls %d and %d should be at least 50ms", i-1, i)
+		}
+	})
+
+	t.Run("AutomaticUpdatesDisabled", func(t *testing.T) {
+		conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`)
+		tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
+		initialCount := getMockCallCount()
+		// A timeout here is the expected outcome: no run should be signalled
+		// while automatic updates are off.
+		err := waitForJobRuns(1, 200*time.Millisecond)
+		require.Error(t, err, "Job should not run after being disabled")
+		require.Equal(t, initialCount, getMockCallCount(), "Job count should not increase after being disabled")
+	})
+}