Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
114326: roachtest: add online restore roachtest r=dt a=msbutler

This patch adds the following roachtest:
```
restore/online/tpce/400GB/aws/inc-count=1/nodes=4/cpus=8
```

which restores the TPCE database from a full backup without revision history
using online restore and restarts the tpce workload during the download phase.

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Nov 14, 2023
2 parents b3d6b39 + 0a7d912 commit 7ba9725
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ go_library(
"nodejs_postgres.go",
"npgsql.go",
"npgsql_blocklist.go",
"online_restore.go",
"orm_helpers.go",
"parsing_helpers.go",
"pebble_write_throughput.go",
Expand Down
141 changes: 141 additions & 0 deletions pkg/cmd/roachtest/tests/online_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func registerOnlineRestore(r registry.Registry) {
durationGauge := r.PromFactory().NewGaugeVec(prometheus.GaugeOpts{Namespace: registry.
PrometheusNameSpace, Subsystem: "online_restore", Name: "duration"}, []string{"test_name"})

for _, sp := range []restoreSpecs{
{
namePrefix: "online",
hardware: makeHardwareSpecs(hardwareSpecs{ebsThroughput: 250 /* MB/s */}),
backup: makeRestoringBackupSpecs(backupSpecs{nonRevisionHistory: true, version: "v23.1.11"}),
timeout: 5 * time.Hour,
clouds: registry.AllClouds,
suites: registry.Suites(registry.Nightly),
tags: registry.Tags("aws"),
restoreUptoIncremental: 1,
skip: "used for ad hoc experiments",
},
} {
sp := sp
sp.initTestName()
r.Add(registry.TestSpec{
Name: sp.testName,
Owner: registry.OwnerDisasterRecovery,
Benchmark: true,
Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud),
Timeout: sp.timeout,
// These tests measure performance. To ensure consistent perf,
// disable metamorphic encryption.
EncryptionSupport: registry.EncryptionAlwaysDisabled,
CompatibleClouds: sp.clouds,
Suites: sp.suites,
Tags: sp.tags,
Skip: sp.skip,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {

rd := makeRestoreDriver(t, c, sp)
rd.prepareCluster(ctx)

m := c.NewMonitor(ctx)
dul := roachtestutil.NewDiskUsageLogger(t, c)
m.Go(dul.Runner)
m.Go(func(ctx context.Context) error {
defer dul.Done()
t.Status(`running setup statements`)
db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
if err != nil {
return errors.Wrapf(err, "failure to run setup statements")
}
defer db.Close()

t.Status(`Running online restore: linking phase`)
metricCollector := rd.initRestorePerfMetrics(ctx, durationGauge)
restoreCmd := rd.restoreCmd("DATABASE tpce", "WITH EXPERIMENTAL DEFERRED COPY")
t.L().Printf("Running %s", restoreCmd)
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
return err
}
metricCollector()
return nil
})
m.Wait()

t.Status(`Running online restore: download phase`)
workloadCtx, workloadCancel := context.WithCancel(ctx)
mDownload := c.NewMonitor(workloadCtx)
defer func() {
workloadCancel()
mDownload.Wait()
}()
// TODO(msbutler): add foreground query latency tracker

mDownload.Go(func(ctx context.Context) error {
err := sp.backup.workload.run(ctx, t, c, sp.hardware)
// We expect the workload to return a context cancelled error because
// the roachtest driver cancels the monitor's context after the download job completes
if err != nil && ctx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
rd.t.L().Printf("workload successfully finished")
return nil
})
mDownload.Go(func(ctx context.Context) error {
defer workloadCancel()
// Wait for the job to succeed.
succeededJobTick := time.NewTicker(time.Minute * 1)
defer succeededJobTick.Stop()
done := ctx.Done()
conn, err := c.ConnE(ctx, t.L(), c.Node(1)[0])
require.NoError(t, err)
defer conn.Close()
for {
select {
case <-done:
return ctx.Err()
case <-succeededJobTick.C:
var status string
if err := conn.QueryRow(`SELECT status FROM [SHOW JOBS] WHERE job_type = 'RESTORE' ORDER BY created DESC LIMIT 1`).Scan(&status); err != nil {
return err
}
if status == string(jobs.StatusSucceeded) {
return nil
} else if status == string(jobs.StatusRunning) {
rd.t.L().Printf("Download job still running")
} else {
return errors.Newf("job unexpectedly found in %s state", status)
}
}
}
})
mDownload.Wait()
},
})
}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func RegisterTests(r registry.Registry) {
registerRestart(r)
registerRestore(r)
registerRestoreNodeShutdown(r)
registerOnlineRestore(r)
registerRoachmart(r)
registerRoachtest(r)
registerRubyPG(r)
Expand Down
30 changes: 24 additions & 6 deletions pkg/cmd/roachtest/tests/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,14 @@ type backupSpecs struct {

// workload defines the backed up workload.
workload backupWorkload

// nonRevisionHistory is true if the backup is not a revision history backup
// (note that the default backup in the restore roachtests contains revision
// history).
//
// TODO(msbutler): if another fixture requires a different backup option,
// create a new backupOpts struct.
nonRevisionHistory bool
}

func (bs backupSpecs) storagePrefix() string {
Expand All @@ -670,13 +678,17 @@ func (bs backupSpecs) storagePrefix() string {
func (bs backupSpecs) backupCollection() string {
// N.B. AWS buckets are _regional_ whereas GCS buckets are _multi-regional_. Thus, in order to avoid egress (cost),
// we use us-east-2 for AWS, which is the default region for all roachprod clusters. (See roachprod/vm/aws/aws.go)
properties := ""
if bs.nonRevisionHistory {
properties = "/rev-history=false"
}
switch bs.storagePrefix() {
case "s3":
return fmt.Sprintf(`'s3://cockroach-fixtures-us-east-2/backups/%s/%s/inc-count=%d?AUTH=implicit'`,
bs.workload.fixtureDir(), bs.version, bs.numBackupsInChain)
return fmt.Sprintf(`'s3://cockroach-fixtures-us-east-2/backups/%s/%s/inc-count=%d%s?AUTH=implicit'`,
bs.workload.fixtureDir(), bs.version, bs.numBackupsInChain, properties)
case "gs":
return fmt.Sprintf(`'gs://cockroach-fixtures/backups/%s/%s/inc-count=%d?AUTH=implicit'`,
bs.workload.fixtureDir(), bs.version, bs.numBackupsInChain)
return fmt.Sprintf(`'gs://cockroach-fixtures/backups/%s/%s/inc-count=%d%s?AUTH=implicit'`,
bs.workload.fixtureDir(), bs.version, bs.numBackupsInChain, properties)
default:
panic(fmt.Sprintf("unknown storage prefix: %s", bs.storagePrefix()))
}
Expand Down Expand Up @@ -708,6 +720,10 @@ func makeBackupSpecs(override backupSpecs, specs backupSpecs) backupSpecs {
specs.numBackupsInChain = override.numBackupsInChain
}

if override.nonRevisionHistory != specs.nonRevisionHistory {
specs.nonRevisionHistory = override.nonRevisionHistory
}

if override.workload != nil {
specs.workload = override.workload
}
Expand Down Expand Up @@ -825,11 +841,13 @@ type restoreSpecs struct {
// restored user space tables.
fingerprint int

testName string
setUpStmts []string

// skip, if non-empty, skips the test with the given reason.
skip string

// testname is set automatically.
testName string
}

func (sp *restoreSpecs) initTestName() {
Expand Down Expand Up @@ -902,7 +920,7 @@ func (rd *restoreDriver) getAOST(ctx context.Context) {
conn := rd.c.Conn(ctx, rd.t.L(), 1)
defer conn.Close()
err := conn.QueryRowContext(ctx, rd.sp.getAostCmd()).Scan(&aost)
require.NoError(rd.t, err)
require.NoError(rd.t, err, fmt.Sprintf("aost cmd failed: %s", rd.sp.getAostCmd()))
rd.aost = aost
}

Expand Down

0 comments on commit 7ba9725

Please sign in to comment.