Skip to content

Commit

Permalink
schematelemetry: emit metrics and logs about invalid objects
Browse files Browse the repository at this point in the history
Short of continuously polling `crdb_internal.invalid_objects`, there was
not a convenient way to monitor a cluster for descriptor corruption.

Having such an indicator would allow customers to perform preflight
checks ahead of upgrades to avoid being stuck in a mixed version state.
It would also allow CRL to more easily monitor cloud clusters for
corruptions in the wild.

This commit updates the schematelemetry job to additionally update the
`sql.schema.invalid_objects` gauge and emit logs for any encountered
corruptions.

Informs: #104266
Epic: CRDB-28665
Release note: None
  • Loading branch information
chrisseto committed Aug 22, 2023
1 parent f6c4d9d commit f6af97a
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 6 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/catalog/schematelemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/builtins/builtinconstants",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltelemetry",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -46,8 +47,10 @@ go_test(
],
args = ["-test.timeout=295s"],
deps = [
":schematelemetry",
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/scheduledjobs",
"//pkg/security/securityassets",
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/catalog/schematelemetry/scheduled_job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import (
)

type schemaTelemetryExecutor struct {
metrics schemaTelemetryMetrics
metrics schemaTelemetryJobMetrics
}

var _ jobs.ScheduledJobController = (*schemaTelemetryExecutor)(nil)
var _ jobs.ScheduledJobExecutor = (*schemaTelemetryExecutor)(nil)

type schemaTelemetryMetrics struct {
type schemaTelemetryJobMetrics struct {
*jobs.ExecutorMetrics
}

var _ metric.Struct = &schemaTelemetryMetrics{}
var _ metric.Struct = &schemaTelemetryJobMetrics{}

// MetricStruct is part of the metric.Struct interface.
func (m *schemaTelemetryMetrics) MetricStruct() {}
func (m *schemaTelemetryJobMetrics) MetricStruct() {}

// OnDrop is part of the jobs.ScheduledJobController interface.
func (s schemaTelemetryExecutor) OnDrop(
Expand Down Expand Up @@ -120,7 +120,7 @@ func init() {
func() (jobs.ScheduledJobExecutor, error) {
m := jobs.MakeExecutorMetrics(tree.ScheduledSchemaTelemetryExecutor.InternalName())
return &schemaTelemetryExecutor{
metrics: schemaTelemetryMetrics{
metrics: schemaTelemetryJobMetrics{
ExecutorMetrics: &m,
},
}, nil
Expand Down
91 changes: 90 additions & 1 deletion pkg/sql/catalog/schematelemetry/schema_telemetry_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,38 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

type Metrics struct {
InvalidObjects *metric.Gauge
}

func newMetrics() Metrics {
return Metrics{
InvalidObjects: metric.NewGauge(metric.Metadata{
Name: "sql.schema.invalid_objects",
Help: "Gauge of detected invalid objects within the system.descriptor table (measured by querying crdb_internal.invalid_objects)",
Measurement: "Objects",
Unit: metric.Unit_COUNT,
}),
}
}

// MetricStruct implements the metric.Struct interface.
func (Metrics) MetricStruct() {}

type schemaTelemetryResumer struct {
job *jobs.Job
st *cluster.Settings
Expand Down Expand Up @@ -53,8 +79,15 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{})
aostDuration = d
}
}
asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0)

const maxRecords = 10000
asOf := p.ExecCfg().Clock.Now().Add(aostDuration.Nanoseconds(), 0)
metrics := p.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(Metrics)

if err := processInvalidObjects(ctx, p.ExecCfg(), asOf, &metrics, maxRecords); err != nil {
return err
}

events, err := CollectClusterSchemaForTelemetry(ctx, p.ExecCfg(), asOf, uuid.FastMakeV4(), maxRecords)
if err != nil || len(events) == 0 {
return err
Expand All @@ -66,9 +99,64 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{})
sql.LogExternally,
events...,
)

return nil
}

func processInvalidObjects(
ctx context.Context,
cfg *sql.ExecutorConfig,
asOf hlc.Timestamp,
metrics *Metrics,
maxRecords int,
) error {
return sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) (retErr error) {
err := txn.KV().SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}

rows, err := txn.QueryIteratorEx(ctx, "sql-telemetry-invalid-objects", txn.KV(), sessiondata.NodeUserSessionDataOverride, `SELECT id, error FROM "".crdb_internal.invalid_objects LIMIT $1`, maxRecords)
if err != nil {
return err
}

defer func(it isql.Rows) {
retErr = errors.CombineErrors(retErr, it.Close())
}(rows)

count := int64(0)
for {
ok, err := rows.Next(ctx)
if err != nil {
return err
}
if !ok {
break
}

count++
row := rows.Cur()

descID, ok := row[0].(*tree.DInt)
if !ok {
return errors.AssertionFailedf("expected id to be int (was %T)", row[0])
}

validationErr, ok := row[1].(*tree.DString)
if !ok {
return errors.AssertionFailedf("expected err to be string (was %T)", row[1])
}

log.Warningf(ctx, "found invalid object with ID %d: %q", descID, validationErr)
}

metrics.InvalidObjects.Update(count)

return nil
})
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (t schemaTelemetryResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, _ error,
Expand All @@ -86,5 +174,6 @@ func init() {
}
},
jobs.DisablesTenantCostControl,
jobs.WithJobMetrics(newMetrics()),
)
}
77 changes: 77 additions & 0 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ package schematelemetry_test
import (
"context"
"fmt"
"math"
"regexp"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -106,7 +110,80 @@ func TestSchemaTelemetryJob(t *testing.T) {
id := res[0][0]
tdb.ExecSucceedsSoon(t, fmt.Sprintf("PAUSE SCHEDULE %s", id))
tdb.CheckQueryResults(t, qHasJob, [][]string{{"0"}})

// NB: The following block is copied directly from
// pkg/sql/crdb_internal_test.go. It may be worthwhile to add utilities for
// generating descriptor corruption in the future rather than copying the same codeblock around.

// Create some tables that we can corrupt the descriptors of.
tdb.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test (k INT8);
CREATE TABLE fktbl (id INT8 PRIMARY KEY);
CREATE TABLE tbl (
customer INT8 NOT NULL REFERENCES fktbl (id)
);
CREATE TABLE nojob (k INT8);
`)

// Retrieve their IDs.
databaseID := int(sqlutils.QueryDatabaseID(t, db, "t"))
tableTID := int(sqlutils.QueryTableID(t, db, "t", "public", "test"))
tableFkTblID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "fktbl"))
tableNoJobID := int(sqlutils.QueryTableID(t, db, "defaultdb", "public", "nojob"))
const fakeID = 12345

// Now introduce some inconsistencies.
tdb.Exec(t, fmt.Sprintf(`
INSERT INTO system.users VALUES ('node', NULL, true, 3);
GRANT node TO root;
DELETE FROM system.descriptor WHERE id = %d;
DELETE FROM system.descriptor WHERE id = %d;
SELECT
crdb_internal.unsafe_upsert_descriptor(
id,
crdb_internal.json_to_pb(
'cockroach.sql.sqlbase.Descriptor',
json_set(
json_set(
crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor, false),
ARRAY['table', 'mutationJobs'],
jsonb_build_array(jsonb_build_object('job_id', 123456, 'mutation_id', 1))
),
ARRAY['table', 'mutations'],
jsonb_build_array(jsonb_build_object('mutation_id', 1))
)
),
true
)
FROM
system.descriptor
WHERE
id = %d;
UPDATE system.namespace SET id = %d WHERE id = %d;
`, databaseID, tableFkTblID, tableNoJobID, fakeID, tableTID))

// Grab a handle to the job's metrics struct.
metrics := s.JobRegistry().(*jobs.Registry).MetricsStruct().JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(schematelemetry.Metrics)

// Run a schema telemetry job and wait for it to succeed.
tdb.Exec(t, qJob)
tdb.CheckQueryResultsRetry(t, qHasJob, [][]string{{"1"}})

// Assert that the InvalidObjects gauge is set to the number of expected
// invalid object. Our above query should have generated 9 corruptions. See
// the pkg/sql/crdb_internal_test.go:TestInvalidObjects for the breakdown of
// what exactly was done.
require.Equal(t, int64(9), metrics.InvalidObjects.Value())

// Ensure that our logs are flushed to disk before asserting about log
// entries.
log.Flush()

// Ensure that a log line is emitted for each invalid object, with a loose
// enforcement of the log structure.
errorRE := regexp.MustCompile(`found invalid object with ID \d+: ".+"`)
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1000, errorRE, log.SelectEditMode(false, false))
require.NoError(t, err)
require.Len(t, entries, 9)
}

0 comments on commit f6af97a

Please sign in to comment.