c2c: increase c2c roachtest workload flexibility #95191

Merged 1 commit on Jan 19, 2023
129 changes: 81 additions & 48 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -123,10 +123,14 @@ func newMetric(metric float64, unit string) exportedMetric {
return exportedMetric{metric, unit}
}

func (em exportedMetric) String() string {
func (em exportedMetric) StringWithUnits() string {
return fmt.Sprintf("%.2f %s", em.metric, em.unit)
}

func (em exportedMetric) String() string {
return fmt.Sprintf("%.2f", em.metric)
}

// sizeTime captures the disk size of the nodes at some moment in time
type sizeTime struct {
// size is the megabytes of the objects
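A minimal standalone sketch of how the two formatters above behave, using only the fields the hunk shows (the example value is made up):

```go
package main

import "fmt"

// exportedMetric mirrors the struct in cluster_to_cluster.go: a value plus its unit.
type exportedMetric struct {
	metric float64
	unit   string
}

// StringWithUnits is the human-readable form used in log lines.
func (em exportedMetric) StringWithUnits() string {
	return fmt.Sprintf("%.2f %s", em.metric, em.unit)
}

// String emits only the number, presumably so the value is easier to consume
// downstream (see the roachperf/grafana TODO later in the diff).
func (em exportedMetric) String() string {
	return fmt.Sprintf("%.2f", em.metric)
}

func main() {
	m := exportedMetric{metric: 1523.7, unit: "MB"}
	fmt.Println(m.StringWithUnits()) // 1523.70 MB
	fmt.Println(m.String())          // 1523.70
}
```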
@@ -263,8 +267,19 @@ func setupC2C(
srcTenantInfo.sql.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0,
10000000000, now(), 0);`, srcTenantInfo.ID)

createSystemRole(t, srcTenantInfo.name, srcTenantInfo.sql)
createSystemRole(t, destTenantInfo.name, destTenantInfo.sql)
createSystemRole(t, srcTenantInfo.name+" system tenant", srcTenantInfo.sql)
createSystemRole(t, destTenantInfo.name+" system tenant", destTenantInfo.sql)

tenantConn, err := srcTenant.conn()
require.NoError(t, err)
createSystemRole(t, srcTenantInfo.name+" app tenant", sqlutils.MakeSQLRunner(tenantConn))
t.L().Printf(`To open a sql session on the app tenant, ssh to the tenant node and run:
./cockroach sql --url="%s"`, srcTenant.secureURL())
t.L().Printf(`To open the app tenant's db console, run:
1. roachprod adminui $CLUSTER:%d
2. change the port in the url to %d
`, srcTenantNode, tenantHTTPPort)

return &c2cSetup{
src: srcTenantInfo,
dst: destTenantInfo,
@@ -277,7 +292,7 @@ func createSystemRole(t test.Test, name string, sql *sqlutils.SQLRunner) {
password := "roach"
sql.Exec(t, fmt.Sprintf(`CREATE ROLE %s WITH LOGIN PASSWORD '%s'`, username, password))
sql.Exec(t, fmt.Sprintf(`GRANT ADMIN TO %s`, username))
t.L().Printf(`Log into the %s system tenant db console with username "%s" and password "%s"`,
t.L().Printf(`Log into the %s db console with username "%s" and password "%s"`,
name, username, password)
}

@@ -286,9 +301,8 @@ type streamingWorkload interface {
// replication stream begins
sourceInitCmd(pgURL string) string

// sourceRunCmd returns a command that will run a workload for the given duration on the src
// cluster during the replication stream.
sourceRunCmd(pgURL string, duration time.Duration) string
// sourceRunCmd returns a command that runs a workload on the src cluster
// during the replication stream; the workload no longer bounds its own
// duration, so it runs until the caller stops it.
sourceRunCmd(pgURL string) string
}

type replicateTPCC struct {
@@ -300,10 +314,10 @@ func (tpcc replicateTPCC) sourceInitCmd(pgURL string) string {
tpcc.warehouses, pgURL)
}

func (tpcc replicateTPCC) sourceRunCmd(pgURL string, duration time.Duration) string {
func (tpcc replicateTPCC) sourceRunCmd(pgURL string) string {
// added the --tolerate-errors flag to prevent the test from flaking due to transaction retry errors
return fmt.Sprintf(`./workload run tpcc --warehouses %d --duration %dm --tolerate-errors '%s'`,
tpcc.warehouses, int(duration.Minutes()), pgURL)
return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors '%s'`,
tpcc.warehouses, pgURL)
}

type replicateKV struct {
@@ -314,10 +328,9 @@ func (kv replicateKV) sourceInitCmd(pgURL string) string {
return ""
}

func (kv replicateKV) sourceRunCmd(pgURL string, duration time.Duration) string {
func (kv replicateKV) sourceRunCmd(pgURL string) string {
// added the --tolerate-errors flag to prevent the test from flaking due to transaction retry errors
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent %d '%s'`,
int(duration.Minutes()),
return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent %d '%s'`,
kv.readPercent,
pgURL)
}
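To make the workload change concrete, a small sketch that prints the commands the two workloads now generate; the structs are trimmed to the fields shown in the hunks and the pgURL is a placeholder:

```go
package main

import "fmt"

type replicateTPCC struct{ warehouses int }

func (tpcc replicateTPCC) sourceRunCmd(pgURL string) string {
	return fmt.Sprintf(`./workload run tpcc --warehouses %d --tolerate-errors '%s'`,
		tpcc.warehouses, pgURL)
}

type replicateKV struct{ readPercent int }

func (kv replicateKV) sourceRunCmd(pgURL string) string {
	return fmt.Sprintf(`./workload run kv --tolerate-errors --init --read-percent %d '%s'`,
		kv.readPercent, pgURL)
}

func main() {
	const pgURL = "postgres://user@host:26257?sslmode=require" // placeholder
	// Neither command carries a --duration flag anymore; the test harness now
	// decides when the workload stops by cancelling its context.
	fmt.Println(replicateTPCC{warehouses: 500}.sourceRunCmd(pgURL))
	fmt.Println(replicateKV{readPercent: 0}.sourceRunCmd(pgURL))
}
```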
@@ -354,7 +367,7 @@ type replicationTestSpec struct {
func registerClusterToCluster(r registry.Registry) {
for _, sp := range []replicationTestSpec{
{
name: "c2c/tpcc",
name: "c2c/tpcc/warehouses=500/duration=10/cutover=5",
srcKVNodes: 4,
dstKVNodes: 4,
cpus: 8,
@@ -367,12 +380,13 @@ func registerClusterToCluster(r registry.Registry) {
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
cutover: 5 * time.Minute,
}, {
},
{
name: "c2c/kv0",
srcKVNodes: 3,
dstKVNodes: 3,
cpus: 8,
pdSize: 1000,
pdSize: 100,
workload: replicateKV{readPercent: 0},
timeout: 1 * time.Hour,
additionalDuration: 10 * time.Minute,
@@ -390,11 +404,16 @@ func registerClusterToCluster(r registry.Registry) {
Name: sp.name,
Owner: registry.OwnerDisasterRecovery,
Cluster: r.MakeClusterSpec(sp.dstKVNodes+sp.srcKVNodes+1, clusterOps...),
Timeout: sp.timeout,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {

setup, cleanup := setupC2C(ctx, t, c, sp.srcKVNodes, sp.dstKVNodes)
defer cleanup()

t.L().Printf("tenant secureURL: %s; http port: %s; sql port: %s",
setup.src.tenant.secureURL(), setup.src.tenant.httpPort, setup.src.tenant.sqlPort)

m := c.NewMonitor(ctx, setup.src.kvNodes.Merge(setup.dst.kvNodes))
du, err := NewDiskUsageTracker(c, t.L())
require.NoError(t, err)
@@ -406,33 +425,33 @@ func registerClusterToCluster(r registry.Registry) {
setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.kvNodes)

initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time)
t.L().Printf("src cluster workload initialization took %d minutes", int(initDuration.Minutes()))
t.L().Printf("src cluster workload initialization took %s minutes", initDuration)
}

t.Status("starting replication stream")
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
setup.dst.name, setup.src.name, setup.src.pgURL)
setup.dst.sql.Exec(t, streamReplStmt)
ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)
t.L().Printf("begin workload on src cluster")
workloadCtx, workloadCancel := context.WithCancel(ctx)
defer workloadCancel()

// The replication stream is expected to spend some time conducting an
// initial scan, ideally on the same order as the `initDuration`, the
// time taken to initially populate the source cluster. To ensure the
// latency verifier stabilizes, ensure the workload and the replication
// stream run for a significant amount of time after the initial scan
// ends. Explicitly, set the workload to run for the estimated initial scan
// runtime + the user specified workload duration.
workloadDuration := initDuration + sp.additionalDuration
workloadDoneCh := make(chan struct{})
m.Go(func(ctx context.Context) error {
defer close(workloadDoneCh)
cmd := sp.workload.sourceRunCmd(setup.src.tenant.secureURL(), workloadDuration)
c.Run(ctx, c.Node(setup.src.sqlNode), cmd)
err := c.RunE(workloadCtx, c.Node(setup.src.sqlNode),
sp.workload.sourceRunCmd(setup.src.tenant.secureURL()))
// The workload should only stop if the workloadCtx is cancelled once
// sp.additionalDuration has elapsed after the initial scan completes.
if workloadCtx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned on
// its own.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
workloadDoneCh <- struct{}{}
return nil
})

cutoverTime := chooseCutover(t, setup.dst.sql, workloadDuration, sp.cutover)
t.Status(fmt.Sprintf("cutover time chosen: %s", cutoverTime.String()))
t.Status("starting replication stream")
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
setup.dst.name, setup.src.name, setup.src.pgURL)
setup.dst.sql.Exec(t, streamReplStmt)
ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)

// TODO(ssd): The job doesn't record the initial
// statement time, so we can't correctly measure the
@@ -444,13 +463,26 @@ func registerClusterToCluster(r registry.Registry) {
return lv.pollLatency(ctx, setup.dst.db, ingestionJobID, time.Second, workloadDoneCh)
})

t.Status("waiting for replication stream to finish ingesting initial scan")
t.L().Printf("waiting for replication stream to finish ingesting initial scan")
waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2)

t.Status("waiting for src cluster workload to complete")
m.Wait()

t.Status("waiting for replication stream to cutover")
t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s`,
sp.additionalDuration))

var currentTime time.Time
setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover)
t.Status("cutover time chosen: ", cutoverTime.String())

select {
case <-time.After(sp.additionalDuration):
workloadCancel()
t.L().Printf("workload has finished after %s", sp.additionalDuration)
case <-ctx.Done():
t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
return
}
t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name))
setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.kvNodes)
stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
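The control flow spread across the hunks above — run the workload with no built-in duration, treat an early exit as a failure, and cancel the workload once the post-initial-scan window elapses — condenses to roughly the following pattern. The names and the stand-in runWorkload are illustrative, not the real roachtest API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWorkload stands in for c.RunE(workloadCtx, ...): it blocks until the
// workload exits or its context is cancelled.
func runWorkload(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx := context.Background()
	additionalDuration := 2 * time.Second // plays the role of sp.additionalDuration

	workloadCtx, workloadCancel := context.WithCancel(ctx)
	defer workloadCancel()

	workloadDoneCh := make(chan struct{})
	errCh := make(chan error, 1)
	go func() {
		defer close(workloadDoneCh) // consumers such as a latency poller wait on this
		err := runWorkload(workloadCtx)
		// With no --duration flag, the only acceptable way for the workload to
		// stop is the cancellation below; anything else is a failure.
		if workloadCtx.Err() == nil {
			errCh <- fmt.Errorf("workload exited on its own: %v", err)
			return
		}
		errCh <- nil
	}()

	// Let the workload run for the post-initial-scan window, then stop it.
	select {
	case <-time.After(additionalDuration):
		workloadCancel()
		fmt.Println("workload stopped after", additionalDuration)
	case <-ctx.Done():
		fmt.Println("outer context cancelled while waiting for the workload window")
		return
	}

	<-workloadDoneCh
	if err := <-errCh; err != nil {
		fmt.Println("error:", err)
	}
}
```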
@@ -468,6 +500,15 @@ func registerClusterToCluster(r registry.Registry) {

// TODO(msbutler): export metrics to roachperf or prom/grafana
exportedMetrics := setup.metrics.export()
t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`)
t.L().Printf(`%s %s %s %s %s %s`,
exportedMetrics["InitialScanDuration"].String(),
exportedMetrics["InitialScanSize"].String(),
exportedMetrics["InitialScanThroughput"].String(),
exportedMetrics["CutoverDuration"].String(),
exportedMetrics["CutoverSize"].String(),
exportedMetrics["CutoverThroughput"].String(),
)
for key, metric := range exportedMetrics {
t.L().Printf("%s: %s", key, metric.String())
}
@@ -484,14 +525,6 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st
return int(tenantInfo.TenantReplicationJobID)
}

func chooseCutover(
t test.Test, dstSQL *sqlutils.SQLRunner, workloadDuration time.Duration, cutover time.Duration,
) time.Time {
var currentTime time.Time
dstSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
return currentTime.Add(workloadDuration - cutover)
}

func compareTenantFingerprintsAtTimestamp(
t test.Test, m cluster.Monitor, setup *c2cSetup, startTime, endTime time.Time,
) {
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -251,6 +251,11 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID)
}

// conn returns a sql connection to the tenant
func (tn *tenantNode) conn() (*gosql.DB, error) {
return gosql.Open("postgres", tn.pgURL)
}

func startTenantServer(
tenantCtx context.Context,
c cluster.Cluster,
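A sketch of how the new conn helper could be exercised outside the roachtest harness, assuming only the pgURL field used above and a postgres-wire driver registered under the name "postgres" (the URL is a placeholder; the real test wraps the connection in sqlutils.MakeSQLRunner, as in the cluster_to_cluster.go hunk):

```go
package main

import (
	gosql "database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any driver that registers itself as "postgres"
)

// tenantNode is trimmed to the single field conn() needs; the real struct in
// multitenant_utils.go carries much more state.
type tenantNode struct {
	pgURL string
}

// conn mirrors the method added in this PR.
func (tn *tenantNode) conn() (*gosql.DB, error) {
	return gosql.Open("postgres", tn.pgURL)
}

func main() {
	tn := &tenantNode{pgURL: "postgres://roach:roach@localhost:26257?sslmode=require"} // placeholder
	db, err := tn.conn()
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// gosql.Open does not dial; the first query establishes the connection.
	var one int
	if err := db.QueryRow("SELECT 1").Scan(&one); err != nil {
		log.Fatal(err)
	}
	fmt.Println("connected to app tenant, SELECT 1 =", one)
}
```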