diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 3881a6947b0a..8e4ccb58f278 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -389,6 +389,8 @@ storage.max_sync_duration.fatal.enabled boolean true if true, fatal the process
 storage.sstable.compression_algorithm enumeration snappy determines the compression algorithm to use when compressing sstable data blocks for use in a Pebble store; [snappy = 1, zstd = 2, none = 3] system-visible
 storage.sstable.compression_algorithm_backup_storage enumeration snappy determines the compression algorithm to use when compressing sstable data blocks for backup row data storage; [snappy = 1, zstd = 2, none = 3] system-visible
 storage.sstable.compression_algorithm_backup_transport enumeration snappy determines the compression algorithm to use when compressing sstable data blocks for backup transport; [snappy = 1, zstd = 2, none = 3] system-visible
+obs.tablemetadata.automatic_updates.enabled boolean false enables automatic updates of the table metadata cache system.table_metadata application
+obs.tablemetadata.data_valid_duration duration 20m0s the duration for which the data in system.table_metadata is considered valid application
 timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. system-visible
 timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. system-visible
 trace.debug_http_endpoint.enabled (alias: trace.debug.enable) boolean false if set, traces for recent requests can be seen at https:///debug/requests application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index cb682c0855dc..cf1da1bba681 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -345,6 +345,8 @@
 <tr><td><div id="setting-storage-sstable-compression-algorithm-backup-storage" class="anchored"><code>storage.sstable.compression_algorithm_backup_storage</code></div></td><td>enumeration</td><td><code>snappy</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for backup row data storage; [snappy = 1, zstd = 2, none = 3]</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
 <tr><td><div id="setting-storage-sstable-compression-algorithm-backup-transport" class="anchored"><code>storage.sstable.compression_algorithm_backup_transport</code></div></td><td>enumeration</td><td><code>snappy</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for backup transport; [snappy = 1, zstd = 2, none = 3]</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
 <tr><td><div id="setting-storage-wal-failover-unhealthy-op-threshold" class="anchored"><code>storage.wal_failover.unhealthy_op_threshold</code></div></td><td>duration</td><td><code>100ms</code></td><td>the latency of a WAL write considered unhealthy and triggers a failover to a secondary WAL location</td><td>Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-obs-tablemetadata-automatic-updates-enabled" class="anchored"><code>obs.tablemetadata.automatic_updates.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables automatic updates of the table metadata cache system.table_metadata</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-obs-tablemetadata-data-valid-duration" class="anchored"><code>obs.tablemetadata.data_valid_duration</code></div></td><td>duration</td><td><code>20m0s</code></td><td>the duration for which the data in system.table_metadata is considered valid</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-timeseries-storage-enabled" class="anchored"><code>timeseries.storage.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td><td>Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-timeseries-storage-resolution-10s-ttl" class="anchored"><code>timeseries.storage.resolution_10s.ttl</code></div></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
 <tr><td><div id="setting-timeseries-storage-resolution-30m-ttl" class="anchored"><code>timeseries.storage.resolution_30m.ttl</code></div></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
diff --git a/pkg/sql/tablemetadatacache/BUILD.bazel b/pkg/sql/tablemetadatacache/BUILD.bazel
index 279b2de02b24..0e25de750ef5 100644
--- a/pkg/sql/tablemetadatacache/BUILD.bazel
+++ b/pkg/sql/tablemetadatacache/BUILD.bazel
@@ -2,12 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "tablemetadatacache",
-    srcs = ["update_table_metadata_cache_job.go"],
+    srcs = [
+        "cluster_settings.go",
+        "update_table_metadata_cache_job.go",
+    ],
     importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache",
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
+        "//pkg/settings",
         "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/isql",
@@ -34,13 +38,15 @@ go_test(
         "//pkg/security/securitytest",
         "//pkg/server",
         "//pkg/server/serverpb",
+        "//pkg/sql/isql",
         "//pkg/testutils",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
         "//pkg/util/log",
-        "@com_github_cockroachdb_errors//:errors",
+        "//pkg/util/syncutil",
+        "//pkg/util/timeutil",
         "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/sql/tablemetadatacache/cluster_settings.go b/pkg/sql/tablemetadatacache/cluster_settings.go
new file mode 100644
index 000000000000..a9c1202fafc4
--- /dev/null
+++ b/pkg/sql/tablemetadatacache/cluster_settings.go
@@ -0,0 +1,41 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package tablemetadatacache
+
+import (
+	"errors"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/settings"
+)
+
+const defaultDataValidDuration = time.Minute * 20
+
+var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"obs.tablemetadata.automatic_updates.enabled",
+	"enables automatic updates of the table metadata cache system.table_metadata",
+	false,
+	settings.WithPublic)
+
+var TableMetadataCacheValidDuration = settings.RegisterDurationSetting(
+	settings.ApplicationLevel,
+	"obs.tablemetadata.data_valid_duration",
+	"the duration for which the data in system.table_metadata is considered valid",
+	defaultDataValidDuration,
+	settings.WithValidateDuration(func(t time.Duration) error {
+		// This prevents the update loop from running too frequently.
+		if t < time.Minute {
+			return errors.New("validity period can't be less than 1 minute")
+		}
+		return nil
+	}),
+	settings.WithPublic)
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
index 6fc9cd22cb22..6d7646c0eab9 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -26,6 +26,20 @@ import (
 	io_prometheus_client "github.com/prometheus/client_model/go"
 )
 
+// updateJobExecFn specifies the function that is run on each iteration of the
+// table metadata update job. It can be overridden in tests.
+var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache
+
+// MockJobExecFn sets the function that is run on each iteration of the
+// table metadata update job. It is not thread-safe and should only be used in
+// tests prior to starting the cluster.
+func MockJobExecFn(fn func(context.Context, isql.Executor) error) {
+	if fn == nil {
+		fn = updateTableMetadataCache
+	}
+	updateJobExecFn = fn
+}
+
 type tableMetadataUpdateJobResumer struct {
 	job *jobs.Job
 }
@@ -55,18 +69,46 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
 	// Channel used to signal the job should run.
 	signalCh := execCtx.ExecCfg().SQLStatusServer.GetUpdateTableMetadataCacheSignal()
 
+	// Register callbacks to signal the job to reset the timer when timer-related settings change.
+	scheduleSettingsCh := make(chan struct{})
+	tableMetadataCacheAutoUpdatesEnabled.SetOnChange(&execCtx.ExecCfg().Settings.SV, func(_ context.Context) {
+		select {
+		case scheduleSettingsCh <- struct{}{}:
+		default:
+		}
+	})
+	TableMetadataCacheValidDuration.SetOnChange(&execCtx.ExecCfg().Settings.SV, func(_ context.Context) {
+		select {
+		case scheduleSettingsCh <- struct{}{}:
+		default:
+		}
+	})
+
+	var timer timeutil.Timer
 	for {
+		if tableMetadataCacheAutoUpdatesEnabled.Get(&execCtx.ExecCfg().Settings.SV) {
+			timer.Reset(TableMetadataCacheValidDuration.Get(&execCtx.ExecCfg().Settings.SV))
+		}
 		select {
+		case <-scheduleSettingsCh:
+			// Restart the loop to recompute the timer.
+			timer.Stop()
+			continue
+		case <-timer.C:
+			timer.Read = true
+			log.Info(ctx, "running table metadata update job after data cache expiration")
 		case <-signalCh:
-			log.Infof(ctx, "running table metadata update job")
-			metrics.NumRuns.Inc(1)
-			j.updateLastRunTime(ctx)
-
-			// TODO(xinhaoz): implement the actual table metadata update logic.
-
+			log.Info(ctx, "running table metadata update job via grpc signal")
 		case <-ctx.Done():
 			return ctx.Err()
 		}
+
+		// Run table metadata update job.
+		metrics.NumRuns.Inc(1)
+		j.updateLastRunTime(ctx)
+		if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
+			log.Errorf(ctx, "error running table metadata update job: %s", err)
+		}
 	}
 }
@@ -87,6 +129,10 @@ func (j *tableMetadataUpdateJobResumer) updateLastRunTime(ctx context.Context) {
 	}
 }
 
+func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error {
+	return nil
+}
+
 // OnFailOrCancel implements jobs.Resumer.
 func (j *tableMetadataUpdateJobResumer) OnFailOrCancel(
 	ctx context.Context, execCtx interface{}, jobErr error,
@@ -116,7 +162,7 @@ func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
 func newTableMetadataUpdateJobMetrics() metric.Struct {
 	return TableMetadataUpdateJobMetrics{
 		NumRuns: metric.NewCounter(metric.Metadata{
-			Name:        "tablemetadatacache.update_job.runs",
+			Name:        "obs.tablemetadata.update_job.runs",
 			Help:        "The total number of runs of the update table metadata job.",
 			Measurement: "Executions",
 			Unit:        metric.Unit_COUNT,
diff --git a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
index 902f921aab92..e3d2639b85d9 100644
--- a/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
+++ b/pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -12,20 +12,25 @@ package tablemetadatacache_test
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/stretchr/testify/require"
 )
@@ -85,3 +90,105 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
 		return nil
 	})
 }
+
+// TestUpdateTableMetadataCacheAutomaticUpdates tests that:
+// 1. The update table metadata cache job does not run automatically by default.
+// 2. The job runs automatically on the data validity interval when automatic
+// updates are enabled.
+func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// We'll mock the job execution function to track when the job is run and
+	// to avoid running the actual job, which could take longer; we don't care
+	// about the actual update logic in this test.
+	var mockCalls []time.Time
+	mockMutex := syncutil.RWMutex{}
+	jobRunCh := make(chan struct{})
+	tablemetadatacache.MockJobExecFn(func(ctx context.Context, ie isql.Executor) error {
+		mockMutex.Lock()
+		defer mockMutex.Unlock()
+		mockCalls = append(mockCalls, timeutil.Now())
+		select {
+		case jobRunCh <- struct{}{}:
+		default:
+		}
+		return nil
+	})
+
+	// Server setup.
+	s := serverutils.StartServerOnly(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(ctx)
+
+	conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
+	metrics := s.JobRegistry().(*jobs.Registry).MetricsStruct().
+		JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(tablemetadatacache.TableMetadataUpdateJobMetrics)
+
+	getMockCallCount := func() int {
+		mockMutex.RLock()
+		defer mockMutex.RUnlock()
+		return len(mockCalls)
+	}
+
+	waitForJobRuns := func(count int, timeout time.Duration) error {
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		for i := 0; i < count; i++ {
+			select {
+			case <-jobRunCh:
+			case <-ctx.Done():
+				return fmt.Errorf("timed out waiting for job run %d", i+1)
+			}
+		}
+		return nil
+	}
+
+	testutils.SucceedsSoon(t, func() error {
+		row := conn.Query(t, `
+		SELECT claim_instance_id FROM system.jobs
+		WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID)
+		if !row.Next() {
+			return errors.New("no node has claimed the job")
+		}
+		return nil
+	})
+
+	t.Run("JobNotRunningByDefault", func(t *testing.T) {
+		require.Zero(t, metrics.NumRuns.Count(), "Job should not run automatically by default")
+	})
+
+	t.Run("AutomaticUpdatesEnabled", func(t *testing.T) {
+		conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`)
+		tablemetadatacache.TableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
+		err := waitForJobRuns(3, 5*time.Second)
+		require.NoError(t, err, "Job should have run at least 3 times")
+		mockCallsCount := getMockCallCount()
+		require.GreaterOrEqual(t, mockCallsCount, 3, "Job should have run at least 3 times")
+		require.Equal(t, int64(mockCallsCount), metrics.NumRuns.Count())
+		conn.Exec(t, `RESET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled`)
+		// We'll wait for one more signal in case the job was running when the setting was disabled.
+		// Ignore the error since it could time out or be successful.
+		_ = waitForJobRuns(1, 200*time.Millisecond)
+	})
+
+	t.Run("AutomaticUpdatesDisabled", func(t *testing.T) {
+		conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`)
+		initialCount := getMockCallCount()
+		err := waitForJobRuns(1, 200*time.Millisecond)
+		require.Error(t, err, "Job should not run after being disabled")
+		require.Equal(t, initialCount, getMockCallCount(), "Job count should not increase after being disabled")
+	})
+
+	t.Run("VerifyTimeBetweenCalls", func(t *testing.T) {
+		mockMutex.Lock()
+		defer mockMutex.Unlock()
+		for i := 1; i < len(mockCalls); i++ {
+			timeBetweenCalls := mockCalls[i].Sub(mockCalls[i-1])
+			require.GreaterOrEqual(t, timeBetweenCalls, 50*time.Millisecond,
+				"Time between calls %d and %d should be at least 50ms", i-1, i)
+		}
+	})
+}
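
A note on the scheduling pattern introduced in Resume above: a single reusable timer is recomputed at the top of every loop iteration, and settings changes arrive over a channel whose send is non-blocking, so redundant notifications are dropped rather than queued. The sketch below distills that pattern using only the Go standard library. It is illustrative, not code from this change: the names (runLoop, settingsCh, runJob, enabled, validFor) are invented, and stdlib time.Timer drain handling stands in for timeutil.Timer and its Read field.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// enabled and validFor stand in for the two cluster settings registered in
// cluster_settings.go (automatic_updates.enabled and data_valid_duration).
var (
	enabled  atomic.Bool
	validFor = 50 * time.Millisecond
)

// runLoop mirrors the shape of the job's Resume loop: recompute the timer on
// every iteration, and treat a settings change as "stop the pending timer and
// start over".
func runLoop(ctx context.Context, settingsCh <-chan struct{}, runJob func()) error {
	timer := time.NewTimer(time.Hour)
	timer.Stop() // start with no pending expiration
	defer timer.Stop()
	for {
		if enabled.Load() {
			timer.Reset(validFor)
		}
		select {
		case <-settingsCh:
			// Settings changed: drop any pending expiration and recompute.
			if !timer.Stop() {
				select { // drain a timer that fired concurrently
				case <-timer.C:
				default:
				}
			}
			continue
		case <-timer.C:
			// Validity window elapsed; fall through and run the update.
		case <-ctx.Done():
			return ctx.Err()
		}
		runJob()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	// A 1-buffered channel coalesces bursts of notifications. The job itself
	// uses an unbuffered channel, so a notification is dropped unless the
	// loop is already parked in its select.
	settingsCh := make(chan struct{}, 1)
	enabled.Store(true)
	_ = runLoop(ctx, settingsCh, func() { fmt.Println("job ran") })
}

The continue after a settings notification is what makes a shortened validity duration take effect immediately: the loop recomputes the deadline instead of waiting out the old one, which is exactly what the test exercises when it overrides TableMetadataCacheValidDuration down to 50ms.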