tablemetadatacache: add cluster settings to automate job execution
This commit adds the ability to automate running the `UPDATE TABLE
METADATA` job. Two new cluster settings are introduced to control
the interval at which the job runs:
- `obs.tablemetadata.data_valid_duration`: a duration setting (minimum
1 minute), which specifies the duration for which the data in
system.table_metadata is considered valid. This duration is used as
the threshold for deciding when the system should run the job to
refresh the data. Default: 20m.
- `obs.tablemetadata.automatic_updates.enabled`: a boolean setting,
which defaults to false. If true, the system executes the update job
according to the interval setting above.

Fixes: #130098
Epic: CRDB-37558

Release note (ops change): New cluster settings have been added
that control the refresh behavior for the cached data shown
on the database pages in the DB Console.
- `obs.tablemetadata.data_valid_duration`: a duration setting (minimum
1 minute), which specifies the duration for which the data in
system.table_metadata is considered valid. If the time since the
cache's last update exceeds this threshold, users visiting the
database pages will trigger a cache refresh. Default: 20m.
- `obs.tablemetadata.automatic_updates.enabled`: enables the cache to
be automatically updated according to the validity interval. Default:
false.
xinhaoz committed Sep 13, 2024
1 parent a463cb3 commit 254e150
Showing 7 changed files with 201 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/generated/metrics/metrics.html
@@ -1439,6 +1439,7 @@
<tr><td>APPLICATION</td><td>logical_replication.replicated_time_seconds</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.retry_queue_bytes</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.retry_queue_events</td><td>The replicated time of the logical replication stream in seconds since the unix epoch.</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>obs.tablemetadata.update_job.runs</td><td>The total number of runs of the update table metadata job.</td><td>Executions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.admit_latency</td><td>Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>physical_replication.cutover_progress</td><td>The number of ranges left to revert in order to complete an inflight cutover</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
@@ -1716,7 +1717,6 @@
<tr><td>APPLICATION</td><td>sqlliveness.sessions_deletion_runs</td><td>Number of calls to delete sessions which have been performed</td><td>Sessions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sqlliveness.write_failures</td><td>Number of update or insert calls which have failed</td><td>Writes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sqlliveness.write_successes</td><td>Number of update or insert calls successfully performed</td><td>Writes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>tablemetadatacache.update_job.runs</td><td>The total number of runs of the update table metadata job.</td><td>Executions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>tenant.cost_client.blocked_requests</td><td>Number of requests currently blocked by the rate limiter</td><td>Requests</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>tenant.sql_usage.cross_region_network_ru</td><td>Total number of RUs charged for cross-region network traffic</td><td>Request Units</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>tenant.sql_usage.estimated_cpu_seconds</td><td>Estimated amount of CPU consumed by a virtual cluster</td><td>CPU Seconds</td><td>COUNTER</td><td>SECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
2 changes: 2 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -99,6 +99,8 @@ kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, t
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined through Raft consensus application
kv.transaction.write_pipelining.max_batch_size (alias: kv.transaction.write_pipelining_max_batch_size) integer 128 if non-zero, defines that maximum size batch that will be pipelined through Raft consensus application
obs.tablemetadata.automatic_updates.enabled boolean false enables automatic updates of the table metadata cache system.table_metadata application
obs.tablemetadata.data_valid_duration duration 20m0s the duration for which the data in system.table_metadata is considered valid application
schedules.backup.gc_protection.enabled boolean true enable chaining of GC protection across backups run as part of a schedule application
security.client_cert.subject_required.enabled boolean false mandates a requirement for subject role to be set for db user system-visible
security.ocsp.mode enumeration off use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] application
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -128,6 +128,8 @@
<tr><td><div id="setting-kv-transaction-write-pipelining-enabled" class="anchored"><code>kv.transaction.write_pipelining.enabled<br />(alias: kv.transaction.write_pipelining_enabled)</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-max-batch-size" class="anchored"><code>kv.transaction.write_pipelining.max_batch_size<br />(alias: kv.transaction.write_pipelining_max_batch_size)</code></div></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kvadmission-store-provisioned-bandwidth" class="anchored"><code>kvadmission.store.provisioned_bandwidth</code></div></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to true</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-obs-tablemetadata-automatic-updates-enabled" class="anchored"><code>obs.tablemetadata.automatic_updates.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables automatic updates of the table metadata cache system.table_metadata</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-obs-tablemetadata-data-valid-duration" class="anchored"><code>obs.tablemetadata.data_valid_duration</code></div></td><td>duration</td><td><code>20m0s</code></td><td>the duration for which the data in system.table_metadata is considered valid</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-schedules-backup-gc-protection-enabled" class="anchored"><code>schedules.backup.gc_protection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-security-client-cert-subject-required-enabled" class="anchored"><code>security.client_cert.subject_required.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>mandates a requirement for subject role to be set for db user</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-security-ocsp-mode" class="anchored"><code>security.ocsp.mode</code></div></td><td>enumeration</td><td><code>off</code></td><td>use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
6 changes: 5 additions & 1 deletion pkg/sql/tablemetadatacache/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "tablemetadatacache",
srcs = [
"cluster_settings.go",
"table_metadata_batch_iterator.go",
"table_metadata_updater.go",
"update_table_metadata_cache_job.go",
@@ -13,6 +14,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/isql",
@@ -47,12 +49,14 @@
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
41 changes: 41 additions & 0 deletions pkg/sql/tablemetadatacache/cluster_settings.go
@@ -0,0 +1,41 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tablemetadatacache

import (
"errors"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)

const defaultDataValidDuration = time.Minute * 20

var tableMetadataCacheAutoUpdatesEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"obs.tablemetadata.automatic_updates.enabled",
"enables automatic updates of the table metadata cache system.table_metadata",
false,
settings.WithPublic)

var tableMetadataCacheValidDuration = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"obs.tablemetadata.data_valid_duration",
"the duration for which the data in system.table_metadata is considered valid",
defaultDataValidDuration,
settings.WithValidateDuration(func(t time.Duration) error {
// This prevents the update loop from running too frequently.
if t < time.Minute {
return errors.New("validity period can't be less than 1 minute")
}
return nil
}),
settings.WithPublic)
48 changes: 39 additions & 9 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job.go
@@ -26,6 +26,10 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go"
)

// updateJobExecFn specifies the function that is run on each iteration of the
// table metadata update job. It can be overridden in tests.
var updateJobExecFn func(context.Context, isql.Executor) error = updateTableMetadataCache

type tableMetadataUpdateJobResumer struct {
job *jobs.Job
}
@@ -55,20 +59,46 @@ func (j *tableMetadataUpdateJobResumer) Resume(ctx context.Context, execCtxI int
// Channel used to signal the job should run.
signalCh := execCtx.ExecCfg().SQLStatusServer.GetUpdateTableMetadataCacheSignal()

settings := execCtx.ExecCfg().Settings
// Register callbacks to signal the job to reset the timer when timer-related settings change.
scheduleSettingsCh := make(chan struct{})
tableMetadataCacheAutoUpdatesEnabled.SetOnChange(&settings.SV, func(_ context.Context) {
select {
case scheduleSettingsCh <- struct{}{}:
default:
}
})
tableMetadataCacheValidDuration.SetOnChange(&settings.SV, func(_ context.Context) {
select {
case scheduleSettingsCh <- struct{}{}:
default:
}
})

var timer timeutil.Timer
for {
if tableMetadataCacheAutoUpdatesEnabled.Get(&settings.SV) {
timer.Reset(tableMetadataCacheValidDuration.Get(&settings.SV))
}
select {
case <-scheduleSettingsCh:
timer.Stop()
continue
case <-timer.C:
timer.Read = true
log.Info(ctx, "running table metadata update job after data cache expiration")
case <-signalCh:
log.Infof(ctx, "running table metadata update job")
metrics.NumRuns.Inc(1)
j.updateLastRunTime(ctx)

if err := updateTableMetadataCache(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
log.Errorf(ctx, "%s", err.Error())
}

log.Info(ctx, "running table metadata update job via grpc signal")
case <-ctx.Done():
return ctx.Err()
}

// Run table metadata update job.
metrics.NumRuns.Inc(1)
j.updateLastRunTime(ctx)
if err := updateJobExecFn(ctx, execCtx.ExecCfg().InternalDB.Executor()); err != nil {
log.Errorf(ctx, "error running table metadata update job: %s", err)
}
}
}

@@ -132,7 +162,7 @@ func (m TableMetadataUpdateJobMetrics) MetricStruct() {}
func newTableMetadataUpdateJobMetrics() metric.Struct {
return TableMetadataUpdateJobMetrics{
NumRuns: metric.NewCounter(metric.Metadata{
Name: "tablemetadatacache.update_job.runs",
Name: "obs.tablemetadata.update_job.runs",
Help: "The total number of runs of the update table metadata job.",
Measurement: "Executions",
Unit: metric.Unit_COUNT,
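
For readers skimming the diff, the Resume loop above reduces to the
following minimal, stdlib-only Go sketch. This is an illustration, not
the job's actual code: a plain time.Timer and hardcoded closures stand
in for timeutil.Timer, the cluster settings, and the gRPC signal, and
all names are made up for the example.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	settingsCh := make(chan struct{}, 1) // settings-change notifications (SetOnChange in the real job)
	signalCh := make(chan struct{})      // explicit "run now" requests (the gRPC signal in the real job)
	enabled := func() bool { return true }
	validFor := func() time.Duration { return 200 * time.Millisecond }

	timer := time.NewTimer(0)
	<-timer.C // start with a fired-and-drained timer so Reset below is safe

	for {
		// Re-arm the timer from the current settings on every iteration.
		if enabled() {
			if !timer.Stop() {
				select { // drain the channel if the timer already fired
				case <-timer.C:
				default:
				}
			}
			timer.Reset(validFor())
		}
		select {
		case <-settingsCh:
			timer.Stop()
			continue // settings changed: recompute the interval before waiting again
		case <-timer.C:
			fmt.Println("run: validity window expired")
		case <-signalCh:
			fmt.Println("run: explicit signal")
		case <-ctx.Done():
			return
		}
		// The real loop increments metrics, records the last run time, and
		// calls updateJobExecFn here, after either wakeup.
	}
}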
115 changes: 111 additions & 4 deletions pkg/sql/tablemetadatacache/update_table_metadata_cache_job_test.go
@@ -8,24 +8,29 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tablemetadatacache_test
package tablemetadatacache

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/tablemetadatacache"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

@@ -66,7 +71,7 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
})

metrics := tc.Server(0).JobRegistry().(*jobs.Registry).MetricsStruct().
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(tablemetadatacache.TableMetadataUpdateJobMetrics)
JobSpecificMetrics[jobspb.TypeUpdateTableMetadataCache].(TableMetadataUpdateJobMetrics)
testutils.SucceedsSoon(t, func() error {
if metrics.NumRuns.Count() != 1 {
return errors.New("job hasn't run yet")
@@ -85,3 +90,105 @@ WHERE id = $1 AND claim_instance_id IS NOT NULL`, jobs.UpdateTableMetadataCacheJ
return nil
})
}

// TestUpdateTableMetadataCacheAutomaticUpdates tests that:
// 1. The update table metadata cache job does not run automatically by default.
// 2. The job runs automatically on the data validity interval when automatic
// updates are enabled.
func TestUpdateTableMetadataCacheAutomaticUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t, "too slow under stress")

ctx := context.Background()

// We'll mock the job execution function to track when the job is run and
// to avoid running the actual job, which could take longer; we don't care
// about the actual update logic in this test.
var mockCalls []time.Time
mockMutex := syncutil.RWMutex{}
jobRunCh := make(chan struct{})
restoreUpdate := testutils.TestingHook(&updateJobExecFn,
func(ctx context.Context, ie isql.Executor) error {
mockMutex.Lock()
defer mockMutex.Unlock()
mockCalls = append(mockCalls, timeutil.Now())
select {
case jobRunCh <- struct{}{}:
default:
}
return nil
})
defer restoreUpdate()

getMockCallCount := func() int {
mockMutex.RLock()
defer mockMutex.RUnlock()
return len(mockCalls)
}

waitForJobRuns := func(count int, timeout time.Duration) error {
ctxWithCancel, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for i := 0; i < count; i++ {
select {
case <-jobRunCh:
case <-ctxWithCancel.Done():
return fmt.Errorf("timed out waiting for job run %d", i+1)
}
}
return nil
}

// Server setup.
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))

// Wait for the job to be claimed by a node.
testutils.SucceedsSoon(t, func() error {
row := conn.Query(t, `
SELECT claim_instance_id, status FROM system.jobs
WHERE id = $1 AND claim_instance_id IS NOT NULL
AND status = 'running'`,
jobs.UpdateTableMetadataCacheJobID)
if !row.Next() {
return errors.New("no node has claimed the job")
}
return nil
})

require.Zero(t, getMockCallCount(), "Job should not run automatically by default")

t.Run("AutomaticUpdatesEnabled", func(t *testing.T) {
conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = true`)
tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
err := waitForJobRuns(3, 10*time.Second)
require.NoError(t, err, "Job should have run at least 3 times")
mockCallsCount := getMockCallCount()
require.GreaterOrEqual(t, mockCallsCount, 3, "Job should have run at least 3 times")
conn.Exec(t, `RESET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled`)
// We'll wait for one more signal in case the job was running when the setting was disabled.
// Ignore the error since the wait could time out or succeed.
_ = waitForJobRuns(1, 200*time.Millisecond)

// Verify time between calls.
mockMutex.RLock()
defer mockMutex.RUnlock()
for i := 1; i < len(mockCalls); i++ {
timeBetweenCalls := mockCalls[i].Sub(mockCalls[i-1])
require.GreaterOrEqual(t, timeBetweenCalls, 50*time.Millisecond,
"Time between calls %d and %d should be at least 50ms", i-1, i)
}
})

t.Run("AutomaticUpdatesDisabled", func(t *testing.T) {
conn.Exec(t, `SET CLUSTER SETTING obs.tablemetadata.automatic_updates.enabled = f`)
tableMetadataCacheValidDuration.Override(ctx, &s.ClusterSettings().SV, 50*time.Millisecond)
initialCount := getMockCallCount()
err := waitForJobRuns(1, 200*time.Millisecond)
require.Error(t, err, "Job should not run after being disabled")
require.Equal(t, initialCount, getMockCallCount(), "Job count should not increase after being disabled")
})
}
