-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathschema_telemetry_test.go
182 lines (159 loc) · 6.08 KB
/
schema_telemetry_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package schematelemetry_test
import (
"context"
"fmt"
"math"
"regexp"
"testing"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)
// makeTestServerArgs returns the test server arguments shared by the tests in
// this file. It installs two sets of testing knobs:
//
//   - a jobs knob whose scheduler environment is built by
//     jobstest.NewJobSchedulerTestEnv from jobstest.UseSystemTables, the
//     current time, and the scheduled schema telemetry executor;
//   - a schema telemetry knob whose AOSTDuration points at one nanosecond
//     (presumably the AS OF SYSTEM TIME offset used by the telemetry job —
//     confirm against sql.SchemaTelemetryTestingKnobs).
func makeTestServerArgs() (args base.TestServerArgs) {
	schedulerEnv := jobstest.NewJobSchedulerTestEnv(
		jobstest.UseSystemTables,
		timeutil.Now(),
		tree.ScheduledSchemaTelemetryExecutor,
	)
	// The knob stores a pointer, so the duration needs an addressable local.
	aost := time.Nanosecond

	args.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{JobSchedulerEnv: schedulerEnv}
	args.Knobs.SchemaTelemetry = &sql.SchemaTelemetryTestingKnobs{AOSTDuration: &aost}
	return args
}
// SQL statements shared by the tests in this file. Each one is built once at
// package initialization by substituting a name exported by the schema
// telemetry packages into a fixed template.
var (
// qExists returns, for every recurrence expression, the number of ACTIVE
// schedules whose label is the schema telemetry schedule name.
qExists = fmt.Sprintf(`
SELECT recurrence, count(*)
FROM [SHOW SCHEDULES]
WHERE label = '%s'
AND schedule_status = 'ACTIVE'
GROUP BY recurrence`,
schematelemetrycontroller.SchemaTelemetryScheduleName)
// qID returns the id of every ACTIVE schedule whose label is the schema
// telemetry schedule name.
qID = fmt.Sprintf(`
SELECT id
FROM [SHOW SCHEDULES]
WHERE label = '%s'
AND schedule_status = 'ACTIVE'`,
schematelemetrycontroller.SchemaTelemetryScheduleName)
// qSet sets the schema telemetry recurrence cluster setting to the cron
// expression '* * * * *'.
qSet = fmt.Sprintf(`SET CLUSTER SETTING %s = '* * * * *'`,
schematelemetrycontroller.SchemaTelemetryRecurrence.Key())
// qJob invokes the builtin named by
// builtinconstants.CreateSchemaTelemetryJobBuiltinName; the tests use it to
// start a schema telemetry job on demand.
qJob = fmt.Sprintf(`SELECT %s()`,
builtinconstants.CreateSchemaTelemetryJobBuiltinName)
)
// qHasJob counts the rows in crdb_internal.jobs whose job_type is
// 'AUTO SCHEMA TELEMETRY' and whose status is 'succeeded'.
const qHasJob = `SELECT count(*) FROM crdb_internal.jobs WHERE job_type = 'AUTO SCHEMA TELEMETRY' AND status = 'succeeded'`
// TestSchemaTelemetrySchedule verifies that exactly one active schema
// telemetry schedule exists with the expected initial recurrence, and that
// changing the recurrence cluster setting is eventually reflected in that
// schedule.
func TestSchemaTelemetrySchedule(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	srv, sqlDB, _ := serverutils.StartServer(t, makeTestServerArgs())
	defer srv.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(sqlDB)

	// The initial recurrence is "@weekly" after it has been passed through
	// MaybeRewriteCronExpr together with the logical cluster ID.
	execCfg := srv.ExecutorConfig().(sql.ExecutorConfig)
	wantInitial := scheduledjobs.MaybeRewriteCronExpr(
		execCfg.NodeInfo.LogicalClusterID(), "@weekly",
	)
	runner.CheckQueryResultsRetry(t, qExists, [][]string{{wantInitial, "1"}})

	// Change the recurrence through the cluster setting and wait until the
	// single active schedule reports the new expression.
	runner.ExecSucceedsSoon(t, qSet)
	runner.CheckQueryResultsRetry(t, qExists, [][]string{{"* * * * *", "1"}})
}
// TestSchemaTelemetryJob corrupts several catalog entries, runs a schema
// telemetry job through the SQL builtin, and checks that the job reports the
// invalid objects both via its InvalidObjects gauge and via one log line per
// invalid object.
func TestSchemaTelemetryJob(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// wantInvalidObjects is the number of invalid objects the job is expected
	// to report after the corruption below has been applied.
	const wantInvalidObjects = 9
	// bogusID is the ID that the namespace entry of t.test is redirected to.
	const bogusID = 12345

	ctx := context.Background()
	srv, sqlDB, _ := serverutils.StartServer(t, makeTestServerArgs())
	defer srv.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(sqlDB)
	runner.Exec(t, `SET CLUSTER SETTING server.eventlog.enabled = true`)

	// Pause the pre-existing schema telemetry schedule so that it cannot
	// interfere with the job started explicitly further down.
	rows := runner.QueryStr(t, qID)
	require.NotEmpty(t, rows)
	require.NotEmpty(t, rows[0])
	scheduleID := rows[0][0]
	pauseStmt := fmt.Sprintf("PAUSE SCHEDULE %s", scheduleID)
	runner.ExecSucceedsSoon(t, pauseStmt)
	// No schema telemetry job has succeeded yet.
	runner.CheckQueryResults(t, qHasJob, [][]string{{"0"}})

	// NB: The setup below is copied directly from pkg/sql/crdb_internal_test.go.
	// A shared utility for generating descriptor corruption would be preferable
	// to duplicating this block.
	//
	// Create the database and tables whose descriptors get corrupted.
	runner.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test (k INT8);
CREATE TABLE fktbl (id INT8 PRIMARY KEY);
CREATE TABLE tbl (
customer INT8 NOT NULL REFERENCES fktbl (id)
);
CREATE TABLE nojob (k INT8);
`)

	// Look up the IDs of the objects that are about to be tampered with.
	dbID := int(sqlutils.QueryDatabaseID(t, sqlDB, "t"))
	testTableID := int(sqlutils.QueryTableID(t, sqlDB, "t", "public", "test"))
	fkTableID := int(sqlutils.QueryTableID(t, sqlDB, "defaultdb", "public", "fktbl"))
	noJobTableID := int(sqlutils.QueryTableID(t, sqlDB, "defaultdb", "public", "nojob"))

	// Introduce the inconsistencies:
	//   - insert a 'node' row into system.users and grant it to root;
	//   - delete the descriptors of database t and of table fktbl;
	//   - give table nojob a mutation that refers to job 123456;
	//   - repoint the namespace entry of t.test at bogusID.
	corruptStmt := fmt.Sprintf(`
INSERT INTO system.users VALUES ('node', NULL, true, 3);
GRANT node TO root;
DELETE FROM system.descriptor WHERE id = %d;
DELETE FROM system.descriptor WHERE id = %d;
SELECT
crdb_internal.unsafe_upsert_descriptor(
id,
crdb_internal.json_to_pb(
'cockroach.sql.sqlbase.Descriptor',
json_set(
json_set(
crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor, false),
ARRAY['table', 'mutationJobs'],
jsonb_build_array(jsonb_build_object('job_id', 123456, 'mutation_id', 1))
),
ARRAY['table', 'mutations'],
jsonb_build_array(jsonb_build_object('mutation_id', 1))
)
),
true
)
FROM
system.descriptor
WHERE
id = %d;
UPDATE system.namespace SET id = %d WHERE id = %d;
`, dbID, fkTableID, noJobTableID, bogusID, testTableID)
	runner.Exec(t, corruptStmt)

	// Grab a handle to the schema telemetry job metrics before running the job.
	registry := srv.JobRegistry().(*jobs.Registry)
	jobMetrics := registry.MetricsStruct().
		JobSpecificMetrics[jobspb.TypeAutoSchemaTelemetry].(schematelemetry.Metrics)

	// Start a schema telemetry job and wait until one has succeeded.
	runner.Exec(t, qJob)
	runner.CheckQueryResultsRetry(t, qHasJob, [][]string{{"1"}})

	// The InvalidObjects gauge must equal the expected number of invalid
	// objects.
	require.Equal(t, int64(wantInvalidObjects), jobMetrics.InvalidObjects.Value())

	// One log line must have been emitted for each invalid object.
	invalidObjectRE := regexp.MustCompile("found invalid object with ID.*")
	logEntries, err := log.FetchEntriesFromFiles(
		0, math.MaxInt64, 1000, invalidObjectRE, log.SelectEditMode(false, false),
	)
	require.NoError(t, err)
	require.Len(t, logEntries, wantInvalidObjects)
}