-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathschemachange.go
465 lines (417 loc) · 14.5 KB
/
schemachange.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package main
import (
"context"
gosql "database/sql"
"fmt"
"time"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)
// registerSchemaChangeKV registers the `schemachange/mixed/kv` test. The test
// restores the tpch database from a fixture, initializes the kv workload, runs
// that workload in the background once per node, and then runs and validates
// schema changes via waitForSchemaChanges.
func registerSchemaChangeKV(r *testRegistry) {
	r.Add(testSpec{
		Name:    `schemachange/mixed/kv`,
		Cluster: makeClusterSpec(5),
		Run: func(ctx context.Context, t *test, c *cluster) {
			const fixturePath = `gs://cockroach-fixtures/workload/tpch/scalefactor=10/backup`

			c.Put(ctx, cockroach, "./cockroach")
			c.Put(ctx, workload, "./workload")

			c.Start(ctx, t, c.All())
			db := c.Conn(ctx, 1)
			defer db.Close()

			m := newMonitor(ctx, c, c.All())
			m.Go(func(ctx context.Context) error {
				t.Status("loading fixture")
				// Issue the restore with the monitor's context so it is
				// abandoned if that context is canceled.
				if _, err := db.ExecContext(ctx, `RESTORE DATABASE tpch FROM $1`, fixturePath); err != nil {
					t.Fatal(err)
				}
				return nil
			})
			m.Wait()

			c.Run(ctx, c.Node(1), `./workload init kv --drop --db=test`)
			for node := 1; node <= c.spec.NodeCount; node++ {
				node := node
				// TODO(dan): Ideally, the test would fail if this queryload failed,
				// but we can't put it in monitor as-is because the test deadlocks.
				go func() {
					const cmd = `./workload run kv --tolerate-errors --min-block-bytes=8 --max-block-bytes=127 --db=test`
					l, err := t.l.ChildLogger(fmt.Sprintf(`kv-%d`, node))
					if err != nil {
						t.Fatal(err)
					}
					defer l.close()
					// Send the workload's output to the per-node child logger
					// created above rather than to the test's main logger. The
					// error is deliberately dropped; see the TODO above.
					_ = execCmd(ctx, l, roachprod, "ssh", c.makeNodes(c.Node(node)), "--", cmd)
				}()
			}

			m = newMonitor(ctx, c, c.All())
			m.Go(func(ctx context.Context) error {
				t.Status("running schema change tests")
				return waitForSchemaChanges(ctx, t.l, db)
			})
			m.Wait()
		},
	})
}
// waitForSchemaChanges runs two rounds of schema changes through db and
// validates each round with AS OF SYSTEM TIME queries: first over
// tpch.customer, then over test.kv. The first error encountered is returned.
func waitForSchemaChanges(ctx context.Context, l *logger, db *gosql.DB) error {
	began := timeutil.Now()

	// Round one targets a table that is not actively being updated.
	l.Printf("running schema changes over tpch.customer\n")
	customerDDL := []string{
		"ALTER TABLE tpch.customer ADD COLUMN newcol INT DEFAULT 23456",
		"CREATE INDEX foo ON tpch.customer (c_name)",
	}
	if err := runSchemaChanges(ctx, l, db, customerDDL); err != nil {
		return err
	}

	// TODO(vivek): Fix #21544.
	// if err := sqlutils.RunScrub(db, `test`, `kv`); err != nil {
	//   return err
	// }

	// Every one of these queries is expected to produce the same count.
	customerChecks := []string{
		"SELECT count(*) FROM tpch.customer AS OF SYSTEM TIME %s",
		"SELECT count(newcol) FROM tpch.customer AS OF SYSTEM TIME %s",
		"SELECT count(c_name) FROM tpch.customer@foo AS OF SYSTEM TIME %s",
	}
	if err := runValidationQueries(ctx, l, db, began, customerChecks, nil); err != nil {
		return err
	}

	// Round two runs later on purpose: the schema changes above take a
	// decent amount of time, which gives the load generator an opportunity
	// to populate test.kv. These schema changes therefore act upon a decent
	// sized table that is also being updated.
	l.Printf("running schema changes over test.kv\n")
	kvDDL := []string{
		"ALTER TABLE test.kv ADD COLUMN created_at TIMESTAMP DEFAULT now()",
		"CREATE INDEX foo ON test.kv (v)",
	}
	if err := runSchemaChanges(ctx, l, db, kvDDL); err != nil {
		return err
	}

	// TODO(vivek): Fix #21544.
	// if err := sqlutils.RunScrub(db, `test`, `kv`); err != nil {
	//   return err
	// }

	// Every one of these queries is expected to produce the same count.
	kvChecks := []string{
		"SELECT count(*) FROM test.kv AS OF SYSTEM TIME %s",
		"SELECT count(v) FROM test.kv AS OF SYSTEM TIME %s",
		"SELECT count(v) FROM test.kv@foo AS OF SYSTEM TIME %s",
	}
	// Time-bounded queries used to home in on index validation problems.
	kvIndexChecks := []string{
		"SELECT count(k) FROM test.kv@primary AS OF SYSTEM TIME %s WHERE created_at > $1 AND created_at <= $2",
		"SELECT count(v) FROM test.kv@foo AS OF SYSTEM TIME %s WHERE created_at > $1 AND created_at <= $2",
	}
	return runValidationQueries(ctx, l, db, began, kvChecks, kvIndexChecks)
}
// runSchemaChanges executes the statements in schemaChanges one after another
// through db, logging the start, the completion and the elapsed time of each.
// It stops at the first failing statement and returns its error. Statements
// are issued with ctx so that they are abandoned when ctx is canceled.
func runSchemaChanges(ctx context.Context, l *logger, db *gosql.DB, schemaChanges []string) error {
	for _, cmd := range schemaChanges {
		start := timeutil.Now()
		l.Printf("starting schema change: %s\n", cmd)
		if _, err := db.ExecContext(ctx, cmd); err != nil {
			l.Errorf("hit schema change error: %s, for %s, in %s\n", err, cmd, timeutil.Since(start))
			return err
		}
		l.Printf("completed schema change: %s, in %s\n", cmd, timeutil.Since(start))
		// TODO(vivek): Monitor progress of schema changes and log progress.
	}

	return nil
}
// runValidationQueries checks that every query in validationQueries returns
// the same, non-zero row count when run at a single cluster timestamp.
//
// Each entry of validationQueries is a format string with one %s verb, which
// is replaced by the timestamp for its AS OF SYSTEM TIME clause. When
// indexValidationQueries is non-empty, findIndexProblem is additionally run
// over the span [start, timestamp] right after the first validation query
// succeeds.
//
// It returns an error if ctx is canceled while waiting, if any query fails,
// if a query finds zero rows, or if two queries disagree on the count.
func runValidationQueries(
	ctx context.Context,
	l *logger,
	db *gosql.DB,
	start time.Time,
	validationQueries []string,
	indexValidationQueries []string,
) error {
	// Wait for a bit before validating the schema changes to
	// accommodate for time differences between nodes. Some of the
	// schema change backfill transactions might use a timestamp a bit
	// into the future. This is not a problem normally because a read
	// of schema data written into the impending future gets pushed,
	// but the reads being done here are at a specific timestamp through
	// AS OF SYSTEM TIME. The wait is abandoned if ctx is canceled.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(5 * time.Second):
	}

	var nowString string
	if err := db.QueryRowContext(ctx, "SELECT cluster_logical_timestamp()").Scan(&nowString); err != nil {
		return err
	}
	// %d consumes only the leading integer digits of the timestamp string;
	// that integer is used as the nanosecond argument of timeutil.Unix.
	var nowInNanos int64
	if _, err := fmt.Sscanf(nowString, "%d", &nowInNanos); err != nil {
		return err
	}
	now := timeutil.Unix(0, nowInNanos)

	// Validate the different schema changes. eCount holds the count seen by
	// the first query; it stays zero only until that query has succeeded,
	// because a zero count is rejected below.
	var eCount int64
	for _, tmpl := range validationQueries {
		var count int64
		q := fmt.Sprintf(tmpl, nowString)
		if err := db.QueryRowContext(ctx, q).Scan(&count); err != nil {
			return err
		}
		l.Printf("query: %s, found %d rows\n", q, count)
		if count == 0 {
			return errors.Errorf("%s: %d rows found", q, count)
		}
		if eCount != 0 {
			if count != eCount {
				return errors.Errorf("%s: %d rows found, expected %d rows", q, count, eCount)
			}
			continue
		}

		eCount = count
		// Investigate index creation problems. Always run this so we know
		// it works.
		if len(indexValidationQueries) > 0 {
			sp := timeSpan{start: start, end: now}
			if err := findIndexProblem(
				ctx, l, db, sp, nowString, indexValidationQueries,
			); err != nil {
				return err
			}
		}
	}
	return nil
}
// timeSpan is a window of time delimited by a start and an end instant.
type timeSpan struct {
	// start is the instant at which the window begins.
	start time.Time
	// end is the instant at which the window finishes.
	end time.Time
}
// checkIndexOverTimeSpan checks for index inconsistencies over the timeSpan s
// and returns true when a problem is seen.
//
// The first two entries of indexValidationQueries are each formatted with
// nowString (for their AS OF SYSTEM TIME clause) and run with s.start and
// s.end as the $1 and $2 placeholders. A problem is reported when the two
// counts differ. An error is returned if fewer than two queries are supplied
// or if either query fails; the boolean is meaningless in that case.
func checkIndexOverTimeSpan(
	ctx context.Context,
	l *logger,
	db *gosql.DB,
	s timeSpan,
	nowString string,
	indexValidationQueries []string,
) (bool, error) {
	// Both queries are indexed below; fail cleanly instead of panicking.
	if len(indexValidationQueries) < 2 {
		return false, fmt.Errorf(
			"expected at least 2 index validation queries, got %d", len(indexValidationQueries))
	}

	var eCount int64
	q := fmt.Sprintf(indexValidationQueries[0], nowString)
	if err := db.QueryRowContext(ctx, q, s.start, s.end).Scan(&eCount); err != nil {
		return false, err
	}
	var count int64
	q = fmt.Sprintf(indexValidationQueries[1], nowString)
	if err := db.QueryRowContext(ctx, q, s.start, s.end).Scan(&count); err != nil {
		return false, err
	}
	l.Printf("counts seen %d, %d, over [%s, %s]\n", count, eCount, s.start, s.end)
	return count != eCount, nil
}
// findIndexProblem repeatedly bisects the span s and logs the sub-spans over
// which checkIndexOverTimeSpan reports an index inconsistency.
//
// Spans are processed in FIFO order. Each one is cut in half and both halves
// are checked; halves that show a problem are queued for further bisection.
// A span whose half would be shorter than 50ms is not queried at all and is
// simply logged as "problem seen". The only errors returned are those from
// checkIndexOverTimeSpan.
func findIndexProblem(
	ctx context.Context,
	l *logger,
	db *gosql.DB,
	s timeSpan,
	nowString string,
	indexValidationQueries []string,
) error {
	pending := []timeSpan{s}
	// Drain the work list; problematic halves are appended as we go.
	for len(pending) > 0 {
		cur := pending[0]
		pending = pending[1:]

		half := cur.end.Sub(cur.start) / 2
		if half < 50*time.Millisecond {
			// Too narrow to split any further.
			l.Printf("problem seen over [%s, %s]\n", cur.start, cur.end)
			continue
		}

		// Left half first, then right half, sharing the midpoint.
		mid := cur.start.Add(half)
		halves := [2]timeSpan{
			{start: cur.start, end: mid},
			{start: mid, end: cur.end},
		}
		anyBad := false
		for _, h := range halves {
			bad, err := checkIndexOverTimeSpan(
				ctx, l, db, h, nowString, indexValidationQueries)
			if err != nil {
				return err
			}
			if bad {
				pending = append(pending, h)
				anyBad = true
			}
		}
		if !anyBad {
			l.Printf("no problem seen over [%s, %s]\n", cur.start, cur.end)
		}
	}
	return nil
}
// registerSchemaChangeIndexTPCC1000 registers the index-addition TPC-C test
// built by makeIndexAddTpccTest with 1000 warehouses and a two hour length,
// on the cluster spec makeClusterSpec(5, cpu(16)).
func registerSchemaChangeIndexTPCC1000(r *testRegistry) {
	const warehouses = 1000
	spec := makeClusterSpec(5, cpu(16))
	r.Add(makeIndexAddTpccTest(spec, warehouses, 2*time.Hour))
}
// registerSchemaChangeIndexTPCC100 registers the index-addition TPC-C test
// built by makeIndexAddTpccTest with 100 warehouses and a fifteen minute
// length, on the cluster spec makeClusterSpec(5).
func registerSchemaChangeIndexTPCC100(r *testRegistry) {
	const warehouses = 100
	spec := makeClusterSpec(5)
	r.Add(makeIndexAddTpccTest(spec, warehouses, 15*time.Minute))
}
// makeIndexAddTpccTest builds the `schemachange/index/tpcc/w=N` test spec. The
// test runs TPC-C with the given number of warehouses for length and, while
// it runs, creates three indexes through runAndLogStmts. The spec's timeout
// is three times length.
func makeIndexAddTpccTest(spec clusterSpec, warehouses int, length time.Duration) testSpec {
	run := func(ctx context.Context, t *test, c *cluster) {
		opts := tpccOptions{
			Warehouses: warehouses,
			Extra:      "--wait=false --tolerate-errors",
			During: func(ctx context.Context) error {
				indexStmts := []string{
					`CREATE UNIQUE INDEX ON tpcc.order (o_entry_d, o_w_id, o_d_id, o_carrier_id, o_id);`,
					`CREATE INDEX ON tpcc.order (o_carrier_id);`,
					`CREATE INDEX ON tpcc.customer (c_last, c_first);`,
				}
				return runAndLogStmts(ctx, t, c, "addindex", indexStmts)
			},
			Duration: length,
		}
		runTPCC(ctx, t, c, opts)
	}

	return testSpec{
		Name:       fmt.Sprintf("schemachange/index/tpcc/w=%d", warehouses),
		Cluster:    spec,
		Timeout:    length * 3,
		Run:        run,
		MinVersion: "v19.1.0",
	}
}
// registerSchemaChangeBulkIngest registers the bulk-ingest schema change test
// built by makeSchemaChangeBulkIngestTest with 5 nodes, 100,000,000 rows and a
// twenty minute length.
func registerSchemaChangeBulkIngest(r *testRegistry) {
	const (
		numNodes = 5
		numRows  = 100000000
	)
	r.Add(makeSchemaChangeBulkIngestTest(numNodes, numRows, 20*time.Minute))
}
// makeSchemaChangeBulkIngestTest builds the `schemachange/bulkingest` test
// spec. The test imports the bulkingest fixture with numRows distinct values
// for column a, runs the bulkingest workload against the cockroach nodes for
// length, and creates an index on the table while that workload is running.
//
// The last of the numNodes nodes runs the workload; the others run cockroach.
// On a local cluster the row count and durations are reduced. The spec's
// timeout is twice length.
func makeSchemaChangeBulkIngestTest(numNodes, numRows int, length time.Duration) testSpec {
	return testSpec{
		Name:    "schemachange/bulkingest",
		Cluster: makeClusterSpec(numNodes),
		Timeout: length * 2,
		// `fixtures import` (with the workload paths) is not supported in 2.1
		MinVersion: "v19.1.0",
		Run: func(ctx context.Context, t *test, c *cluster) {
			// Configure column a to have sequential ascending values, and columns b and c to be constant.
			// The payload column will be randomized and thus uncorrelated with the primary key (a, b, c).
			aNum := numRows
			if c.isLocal() {
				aNum = 100000
			}
			bNum := 1
			cNum := 1
			payloadBytes := 4

			crdbNodes := c.Range(1, c.spec.NodeCount-1)
			workloadNode := c.Node(c.spec.NodeCount)

			c.Put(ctx, cockroach, "./cockroach")
			c.Put(ctx, workload, "./workload", workloadNode)
			// TODO (lucy): Remove flag once the faster import is enabled by default
			c.Start(ctx, t, crdbNodes, startArgs("--env=COCKROACH_IMPORT_WORKLOAD_FASTER=true"))

			// Don't add another index when importing.
			cmdWrite := fmt.Sprintf(
				// For fixtures import, use the version built into the cockroach binary
				// so the bulkingest workload-versions match on release branches.
				"./cockroach workload fixtures import bulkingest {pgurl:1} --a %d --b %d --c %d --payload-bytes %d --index-b-c-a=false",
				aNum, bNum, cNum, payloadBytes,
			)

			c.Run(ctx, workloadNode, cmdWrite)

			m := newMonitor(ctx, c, crdbNodes)

			indexDuration := length
			if c.isLocal() {
				indexDuration = time.Second * 30
			}
			cmdWriteAndRead := fmt.Sprintf(
				"./workload run bulkingest --duration %s {pgurl:1-%d} --a %d --b %d --c %d --payload-bytes %d",
				indexDuration.String(), c.spec.NodeCount-1, aNum, bNum, cNum, payloadBytes,
			)
			m.Go(func(ctx context.Context) error {
				c.Run(ctx, workloadNode, cmdWriteAndRead)
				return nil
			})

			m.Go(func(ctx context.Context) error {
				db := c.Conn(ctx, 1)
				defer db.Close()

				if !c.isLocal() {
					// Wait for the load generator to run for a few minutes before creating the index.
					sleepInterval := time.Minute * 5
					maxSleep := length / 2
					if sleepInterval > maxSleep {
						sleepInterval = maxSleep
					}
					// Give up on the wait as soon as ctx is canceled instead
					// of sleeping through the whole interval.
					select {
					case <-ctx.Done():
						return ctx.Err()
					case <-time.After(sleepInterval):
					}
				}

				c.l.Printf("Creating index")
				before := timeutil.Now()
				if _, err := db.ExecContext(ctx, `CREATE INDEX payload_a ON bulkingest.bulkingest (payload, a)`); err != nil {
					t.Fatal(err)
				}
				c.l.Printf("CREATE INDEX took %v\n", timeutil.Since(before))
				return nil
			})

			m.Wait()
		},
	}
}
// registerMixedSchemaChangesTPCC1000 registers the mixed schema change TPC-C
// test built by makeMixedSchemaChanges with 1000 warehouses and a three hour
// length, on the cluster spec makeClusterSpec(5, cpu(16)).
func registerMixedSchemaChangesTPCC1000(r *testRegistry) {
	const warehouses = 1000
	spec := makeClusterSpec(5, cpu(16))
	r.Add(makeMixedSchemaChanges(spec, warehouses, 3*time.Hour))
}
// makeMixedSchemaChanges builds the `schemachange/mixed/tpcc` test spec. The
// test runs TPC-C with the given number of warehouses for length and, while
// it runs, executes a mix of schema changes through runAndLogStmts: creating
// tables and indexes, adding columns and constraints, validating a
// constraint, and renaming, truncating and dropping a table. The spec's
// timeout is three times length.
func makeMixedSchemaChanges(spec clusterSpec, warehouses int, length time.Duration) testSpec {
	return testSpec{
		Name:    "schemachange/mixed/tpcc",
		Cluster: spec,
		Timeout: length * 3,
		Run: func(ctx context.Context, t *test, c *cluster) {
			runTPCC(ctx, t, c, tpccOptions{
				Warehouses: warehouses,
				Extra:      "--wait=false --tolerate-errors",
				During: func(ctx context.Context) error {
					// tpcc.orderpks is referenced by the statements further
					// down, so it has to exist whichever branch is taken here.
					if t.IsBuildVersion(`v19.2.0`) {
						if err := runAndLogStmts(ctx, t, c, "mixed-schema-changes-19.2", []string{
							// CREATE TABLE AS with a specified primary key was added in 19.2.
							`CREATE TABLE tpcc.orderpks (o_w_id, o_d_id, o_id, PRIMARY KEY(o_w_id, o_d_id, o_id)) AS select o_w_id, o_d_id, o_id from tpcc.order;`,
						}); err != nil {
							return err
						}
					} else {
						if err := runAndLogStmts(ctx, t, c, "mixed-schema-changes-19.1", []string{
							`CREATE TABLE tpcc.orderpks (o_w_id INT, o_d_id INT, o_id INT, PRIMARY KEY(o_w_id, o_d_id, o_id));`,
							// The table can't be populated with CREATE TABLE AS
							// here, so insert a bounded number of rows instead.
							`INSERT INTO tpcc.orderpks SELECT o_w_id, o_d_id, o_id FROM tpcc.order LIMIT 10000;`,
						}); err != nil {
							return err
						}
					}
					return runAndLogStmts(ctx, t, c, "mixed-schema-changes", []string{
						`CREATE INDEX ON tpcc.order (o_carrier_id);`,

						`CREATE TABLE tpcc.customerpks (c_w_id INT, c_d_id INT, c_id INT, FOREIGN KEY (c_w_id, c_d_id, c_id) REFERENCES tpcc.customer (c_w_id, c_d_id, c_id));`,

						`ALTER TABLE tpcc.order ADD COLUMN orderdiscount INT DEFAULT 0;`,
						`ALTER TABLE tpcc.order ADD CONSTRAINT nodiscount CHECK (orderdiscount = 0);`,

						`ALTER TABLE tpcc.orderpks ADD CONSTRAINT warehouse_id FOREIGN KEY (o_w_id) REFERENCES tpcc.warehouse (w_id);`,

						// The FK constraint on tpcc.district referencing tpcc.warehouse is
						// unvalidated, thus this operation will not be a noop.
						`ALTER TABLE tpcc.district VALIDATE CONSTRAINT fk_d_w_id_ref_warehouse;`,

						`ALTER TABLE tpcc.orderpks RENAME TO tpcc.readytodrop;`,
						`TRUNCATE TABLE tpcc.readytodrop CASCADE;`,
						`DROP TABLE tpcc.readytodrop CASCADE;`,
					})
				},
				Duration: length,
			})
		},
		MinVersion: "v19.1.0",
	}
}
// runAndLogStmts executes stmts one after another against node 1 of c,
// waiting a minute before each statement and logging, under the given prefix,
// how long each statement and the whole batch took.
//
// A failing statement fails the test through t.Fatal. The returned error is
// non-nil only when ctx is canceled during one of the waits.
func runAndLogStmts(ctx context.Context, t *test, c *cluster, prefix string, stmts []string) error {
	db := c.Conn(ctx, 1)
	defer db.Close()
	c.l.Printf("%s: running %d statements\n", prefix, len(stmts))
	start := timeutil.Now()
	for i, stmt := range stmts {
		// Let some traffic run before the schema change, but stop waiting
		// as soon as ctx is canceled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Minute):
		}
		c.l.Printf("%s: running statement %d...\n", prefix, i+1)
		before := timeutil.Now()
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			t.Fatal(err)
		}
		c.l.Printf("%s: statement %d: %q took %v\n", prefix, i+1, stmt, timeutil.Since(before))
	}
	c.l.Printf("%s: ran %d statements in %v\n", prefix, len(stmts), timeutil.Since(start))
	return nil
}