tablemetadatacache: add cluster settings to automate job execution
This commit adds the ability to automate running the `UPDATE TABLE
METADATA` job. Two new cluster settings are introduced to control
the interval at which the job runs:
- `obs.tablemetadata.data_valid_duration`: a duration setting (minimum
1 minute), which specifies the duration for which the data in
system.table_metadata is considered valid. This duration is used as
a threshold to decide when the job should run to refresh the data.
Default: 20m.
- `obs.tablemetadata.automatic_updates.enabled`: a boolean setting,
which defaults to false. If true, the system will execute the update job
according to the interval setting above.

Fixes: #130098
Epic: CRDB-37558

Release note (ops change): New cluster settings have been added
which control the refresh behavior for the cached data shown
in the db pages in db console.
- `obs.tablemetadata.data_valid_duration`: a duration setting (minimum
1 minute), which specifies the duration for which the data in
system.table_metadata is considered valid. If the time since the cache's
last update exceeds this threshold, users visiting the db pages will
trigger a cache refresh. Default: 20m.
- `obs.tablemetadata.automatic_updates.enabled`: enables the cache to
be automatically updated according to the validity interval. Default:
false.
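
For example, an operator can enable the new behavior from any SQL client.
The following minimal Go sketch assumes an insecure local cluster on the
default port and the third-party lib/pq driver; the 30m value is purely
illustrative:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; any compatible driver works
)

func main() {
	// Assumed connection string for a local insecure cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Enable automatic refreshes and widen the validity window to 30 minutes.
	for _, stmt := range []string{
		"SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true",
		"SET CLUSTER SETTING obs.tablemetadata.data_valid_duration = '30m'",
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}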
xinhaoz committed Sep 11, 2024
1 parent 3c5e4ad commit 56b580b
Showing 6 changed files with 214 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -389,6 +389,8 @@ storage.max_sync_duration.fatal.enabled boolean true if true, fatal the process
storage.sstable.compression_algorithm enumeration snappy determines the compression algorithm to use when compressing sstable data blocks for use in a Pebble store; [snappy = 1, zstd = 2, none = 3] system-visible
storage.sstable.compression_algorithm_backup_storage enumeration snappy determines the compression algorithm to use when compressing sstable data blocks for backup row data storage; [snappy = 1, zstd = 2, none = 3] system-visible
storage.sstable.compression_algorithm_backup_transport enumeration snappy determines the compression algorithm to use when compressing sstable data blocks for backup transport; [snappy = 1, zstd = 2, none = 3] system-visible
tablemetadata.automatic_updates.enabled boolean false enables automatic updates of the table metadata cache system.table_metadata application
tablemetadata.data_valid_duration duration 20m0s the duration for which the data in system.table_metadata is considered valid application
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. system-visible
timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. system-visible
trace.debug_http_endpoint.enabled (alias: trace.debug.enable) boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests application
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -345,6 +345,8 @@
<tr><td><div id="setting-storage-sstable-compression-algorithm-backup-storage" class="anchored"><code>storage.sstable.compression_algorithm_backup_storage</code></div></td><td>enumeration</td><td><code>snappy</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for backup row data storage; [snappy = 1, zstd = 2, none = 3]</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-storage-sstable-compression-algorithm-backup-transport" class="anchored"><code>storage.sstable.compression_algorithm_backup_transport</code></div></td><td>enumeration</td><td><code>snappy</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for backup transport; [snappy = 1, zstd = 2, none = 3]</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-storage-wal-failover-unhealthy-op-threshold" class="anchored"><code>storage.wal_failover.unhealthy_op_threshold</code></div></td><td>duration</td><td><code>100ms</code></td><td>the latency of a WAL write considered unhealthy and triggers a failover to a secondary WAL location</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-tablemetadata-automatic-updates-enabled" class="anchored"><code>tablemetadata.automatic_updates.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables automatic updates of the table metadata cache system.table_metadata</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-tablemetadata-data-valid-duration" class="anchored"><code>tablemetadata.data_valid_duration</code></div></td><td>duration</td><td><code>20m0s</code></td><td>the duration for which the data in system.table_metadata is considered valid</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-timeseries-storage-enabled" class="anchored"><code>timeseries.storage.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-timeseries-storage-resolution-10s-ttl" class="anchored"><code>timeseries.storage.resolution_10s.ttl</code></div></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-timeseries-storage-resolution-30m-ttl" class="anchored"><code>timeseries.storage.resolution_30m.ttl</code></div></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
10 changes: 8 additions & 2 deletions pkg/sql/tablemetadatacache/BUILD.bazel
@@ -2,12 +2,16 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "tablemetadatacache",
srcs = ["update_table_metadata_cache_job.go"],
srcs = [
"cluster_settings.go",
"update_table_metadata_cache_job.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/isql",
@@ -34,13 +38,15 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/isql",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
41 changes: 41 additions & 0 deletions pkg/sql/tablemetadatacache/cluster_settings.go
@@ -0,0 +1,41 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tablemetadatacache

import (
"errors"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)

const defaultDataValidDuration = time.Minute * 20

var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"obs.tablemetadata.automatic_updates.enabled",
"enables automatic updates of the table metadata cache system.table_metadata",
false,
settings.WithPublic)

var TableMetadataCacheValidDuration = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"obs.tablemetadata.data_valid_duration",
"the duration for which the data in system.table_metadata is considered valid",
defaultDataValidDuration,
settings.WithValidateDuration(func(t time.Duration) error {
// This prevents the update loop from running too frequently.
if t < time.Minute {
return errors.New("validity period can't be less than 1 minute")
}
return nil
}),
settings.WithPublic)
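
To make the validation rule concrete, here is a small, hypothetical,
stdlib-only sketch; validateValidDuration mirrors the closure passed to
settings.WithValidateDuration above:

package main

import (
	"errors"
	"fmt"
	"time"
)

// validateValidDuration mirrors the validator registered on
// obs.tablemetadata.data_valid_duration: anything under one minute is
// rejected so the update loop cannot be scheduled too frequently.
func validateValidDuration(t time.Duration) error {
	if t < time.Minute {
		return errors.New("validity period can't be less than 1 minute")
	}
	return nil
}

func main() {
	for _, d := range []time.Duration{30 * time.Second, time.Minute, 20 * time.Minute} {
		fmt.Printf("%v -> %v\n", d, validateValidDuration(d))
	}
}

Note that the test file below overrides the setting to 50ms, well under
this floor, which suggests Override bypasses the registered validation.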
60 changes: 53 additions & 7 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -26,6 +26,20 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go"
)

// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overridden in tests.
var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache

// MockJobExecFn sets the function that is run on each iteration of the
// table metadata update job. It is not thread-safe and should only be used in
// tests prior to starting the cluster.
func MockJobExecFn(fn func(context.Context, isql.Executor) error) {
if fn == nil {
fn = updateTableMetadataCache
}
updateJobExecFn = fn
}

type tableMetadataUpdateJobResumer struct {
job *jobs.Job
}
@@ -55,18 +69,46 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
// Channel used to signal the job should run.
signalCh := execCtx.ExecCfg().SQLStatusServer.GetUpdateTableMetadataCacheSignal()

// Register callbacks to signal the job to reset the timer when timer-related settings change.
scheduleSettingsCh := make(chan struct{})
tableMetadataCacheAutoUpdatesEnabled.SetOnChange(&execCtx.ExecCfg().Settings.SV, func(_ context.Context) {
select {
case scheduleSettingsCh <- struct{}{}:
default:
}
})
TableMetadataCacheValidDuration.SetOnChange(&execCtx.ExecCfg().Settings.SV, func(_ context.Context) {
select {
case scheduleSettingsCh <- struct{}{}:
default:
}
})

var timer timeutil.Timer
for {
if tableMetadataCacheAutoUpdatesEnabled.Get(&execCtx.ExecCfg().Settings.SV) {
timer.Reset(TableMetadataCacheValidDuration.Get(&execCtx.ExecCfg().Settings.SV))
}
select {
case <-scheduleSettingsCh:
// Restart the loop to recompute the timer.
timer.Stop()
continue
case <-timer.C:
timer.Read = true
log.Info(ctx, "running table metadata update job after data cache expiration")
case <-signalCh:
log.Infof(ctx, "running table metadata update job")
metrics.NumRuns.Inc(1)
j.updateLastRunTime(ctx)

// TODO(xinhaoz): implement the actual table metadata update logic.

log.Info(ctx, "running table metadata update job via grpc signal")
case <-ctx.Done():
return ctx.Err()
}

// Run table metadata update job.
metrics.NumRuns.Inc(1)
j.updateLastRunTime(ctx)
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
}
}
}

@@ -87,6 +129,10 @@ func (j *tableMetadataUpdateJobResumer) updateLastRunTime(ctx context.Context) {
}
}

// updateTableMetadataCache refreshes the system.table_metadata cache. It
// is currently a no-op stub run by the update job; the actual update
// logic is not yet implemented.
func updateTableMetadataCache(ctx context.Context, ie isql.Executor) error {
return nil
}

// OnFailOrCancel implements jobs.Resumer.
func (j *tableMetadataUpdateJobResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, jobErr error,
@@ -116,7 +162,7 @@ func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
func newTableMetadataUpdateJobMetrics() metric.Struct {
return TableMetadataUpdateJobMetrics{
NumRuns: metric.NewCounter(metric.Metadata{
Name: "tablemetadatacache.update_job.runs",
Name: "obs.tablemetadata.update_job.runs",
Help: "The total number of runs of the update table metadata job.",
Measurement: "Executions",
Unit: metric.Unit_COUNT,
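
The scheduling logic added to Resume follows a common Go pattern: one timer
armed from the validity setting, a channel that restarts the loop when
settings change, and a channel that forces an immediate run. Below is a
minimal, hypothetical, stdlib-only sketch of that pattern; it elides the
timeutil.Timer reuse details (the Read flag), job infrastructure, and
metrics.

package main

import (
	"context"
	"fmt"
	"time"
)

// stopAndDrain halts t and clears any pending fire so a later Reset starts
// from a clean state. Safe here because this loop is the only receiver on t.C.
func stopAndDrain(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}

// schedulerLoop sketches the Resume loop: the timer fires when the cached
// data would be considered stale, settingsCh restarts the loop so changed
// settings take effect, and signalCh forces an immediate run.
func schedulerLoop(
	ctx context.Context,
	interval func() (time.Duration, bool), // validity window, auto-updates enabled
	settingsCh, signalCh <-chan struct{},
	run func(context.Context) error,
) error {
	timer := time.NewTimer(time.Hour)
	stopAndDrain(timer)
	for {
		if d, enabled := interval(); enabled {
			stopAndDrain(timer)
			timer.Reset(d)
		}
		select {
		case <-settingsCh:
			// Restart the loop to recompute the timer.
			stopAndDrain(timer)
			continue
		case <-timer.C:
			fmt.Println("run: data validity window expired")
		case <-signalCh:
			fmt.Println("run: explicit signal")
		case <-ctx.Done():
			return ctx.Err()
		}
		if err := run(ctx); err != nil {
			fmt.Printf("update failed: %v\n", err)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	settingsCh := make(chan struct{}, 1)
	signalCh := make(chan struct{}, 1)
	signalCh <- struct{}{} // request one immediate run, as the gRPC signal would

	_ = schedulerLoop(ctx,
		func() (time.Duration, bool) { return 100 * time.Millisecond, true },
		settingsCh, signalCh,
		func(context.Context) error { return nil },
	)
}

The non-blocking sends used by the SetOnChange callbacks above pair with
the buffered channels here: a burst of setting changes is coalesced into a
single notification rather than queued.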
109 changes: 108 additions & 1 deletion pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -12,20 +12,25 @@ package tablemetadatacache_test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

@@ -85,3 +90,105 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID)
return nil
})
}

// TestUpdateTableMetadataCacheAutomaticUpdates tests that:
// 1. The update table metadata cache job does not run automatically by default.
// 2. The job runs automatically on the data validity interval when automatic
// updates are enabled.
func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// We'll mock the job execution function to track when the job is run and
// to avoid running the actual job, which could take longer; we don't care
// about the actual update logic in this test.
var mockCalls []time.Time
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
tablemetadatacache.MockJobExecFn(func(ctx context.Context, ie isql.Executor) error {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
select {
case jobRunCh <- struct{}{}:
default:
}
return nil
})

// Server setup.
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
metrics := s.JobRegistry().(*jobs.Registry).MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(tablemetadatacache.TableMetadataUpdateJobMetrics)

getMockCallCount := func() int {
mockMutex.RLock()
defer mockMutex.RUnlock()
return len(mockCalls)
}

waitForJobRuns := func(count int, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for i := 0; i < count; i++ {
select {
case <-jobRunCh:
case <-ctx.Done():
return fmt.Errorf("timed out waiting for job run %d", i+1)
}
}
return nil
}

testutils.SucceedsSoon(t, func() error {
row := conn.Query(t, `
SELECT claim_instance_id FROM system.jobs
WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("no node has claimed the job")
}
return nil
})

t.Run("JobNotRunningByDefault", func(t *testing.T) {
require.Zero(t, metrics.NumRuns.Count(), "Job should not run automatically by default")
})

t.Run("AutomaticUpdatesEnabled", func(t *testing.T) {
conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`)
tablemetadatacache.TableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
err := waitForJobRuns(3, 5*time.Second)
require.NoError(t, err, "Job should have run at least 3 times")
mockCallsCount := getMockCallCount()
require.GreaterOrEqual(t, mockCallsCount, 3, "Job should have run at least 3 times")
require.Equal(t, int64(mockCallsCount), metrics.NumRuns.Count())
conn.Exec(t, `RESET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled`)
// We'll wait for one more signal in case the job was running when the setting was disabled.
// Ignore the error since the wait could time out or succeed.
_ = waitForJobRuns(1, 200*time.Millisecond)
})

t.Run("AutomaticUpdatesDisabled", func(t *testing.T) {
conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = false`)
initialCount := getMockCallCount()
err := waitForJobRuns(1, 200*time.Millisecond)
require.Error(t, err, "Job should not run after being disabled")
require.Equal(t, initialCount, getMockCallCount(), "Job count should not increase after being disabled")
})

t.Run("VerifyTimeBetweenCalls", func(t *testing.T) {
mockMutex.Lock()
defer mockMutex.Unlock()
for i := 1; i < len(mockCalls); i++ {
timeBetweenCalls := mockCalls[i].Sub(mockCalls[i-1])
require.GreaterOrEqual(t, timeBetweenCalls, 50*time.Millisecond,
"Time between calls %d and %d should be at least 50ms", i-1, i)
}
})
}
