diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index dcf348b5dac6..5f20b5ad262e 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -17,9 +17,11 @@ import (
 	"net/url"
 	"os"
 	"path/filepath"
+	"sort"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
@@ -30,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -53,8 +56,8 @@ type clusterInfo struct {
 	// db provides a connection to the system tenant
 	db *gosql.DB
 
-	// sql provides a sql connection to the host cluster
-	sql *sqlutils.SQLRunner
+	// sysSQL provides a sql connection to the system tenant
+	sysSQL *sqlutils.SQLRunner
 
 	// nodes indicates the roachprod nodes running the cluster's nodes
 	nodes option.NodeListOption
@@ -67,11 +70,69 @@ type c2cSetup struct {
 	metrics      c2cMetrics
 }
 
+var c2cPromMetrics = map[string]clusterstats.ClusterStat{
+	"LogicalMegabytes": {
+		LabelName: "node",
+		Query:     "replication_logical_bytes / 1e6"},
+	"PhysicalMegabytes": {
+		LabelName: "node",
+		Query:     "replication_sst_bytes / 1e6"},
+	"PhysicalReplicatedMegabytes": {
+		LabelName: "node",
+		Query:     "capacity_used / 1e6"},
+}
+
+func sumOverLabel(stats map[string]map[string]clusterstats.StatPoint, label string) float64 {
+	var sum float64
+	for _, stat := range stats[label] {
+		sum += stat.Value
+	}
+	return sum
+}
+
+func (cc *c2cSetup) startStatsCollection(
+	ctx context.Context, t test.Test, c cluster.Cluster,
+) func(time.Time) map[string]float64 {
+
+	if c.IsLocal() {
+		// Grafana does not run locally.
+		return func(snapTime time.Time) map[string]float64 {
+			return map[string]float64{}
+		}
+	}
+	// TODO(msbutler): pass a proper cluster replication dashboard and figure out why we need to
+	// pass a grafana dashboard for this to work.
+	cfg := (&prometheus.Config{}).
+		WithPrometheusNode(cc.workloadNode.InstallNodes()[0]).
+		WithCluster(cc.dst.nodes.InstallNodes()).
+		WithNodeExporter(cc.dst.nodes.InstallNodes()).
+		WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard")
+
+	require.NoError(t, c.StartGrafana(ctx, t.L(), cfg))
+	t.L().Printf("Prom has started")
+
+	client, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), cfg)
+	require.NoError(t, err, "error creating prometheus client for stats collector")
+	collector := clusterstats.NewStatsCollector(ctx, client)
+
+	return func(snapTime time.Time) map[string]float64 {
+		metricSnap := make(map[string]float64)
+		for name, stat := range c2cPromMetrics {
+			point, err := collector.CollectPoint(ctx, t.L(), snapTime, stat.Query)
+			if err != nil {
+				t.L().Errorf("Could not query prom %s", err.Error())
+			}
+			// TODO(msbutler): update the CollectPoint api to conduct the sum in Prom instead.
+			metricSnap[name] = sumOverLabel(point, stat.LabelName)
+			t.L().Printf("%s: %.2f", name, metricSnap[name])
+		}
+		return metricSnap
+	}
+}
+
 // DiskUsageTracker can grab the disk usage of the provided cluster.
 //
-// TODO(msbutler): move DiskUsageTracker, exportedMetric,
-// SizeTime and helper methods to an external package that all
-// roachtests can use.
+// TODO(msbutler): deprecate this, once restore roachtests also use prom setup.
 type DiskUsageTracker struct {
 	c cluster.Cluster
 	l *logger.Logger
@@ -101,88 +162,94 @@ func NewDiskUsageTracker(
 	return &DiskUsageTracker{c: c, l: diskLogger}, nil
 }
 
-// exportedMetric describes a measurement created in the roachtest process that will export to
-// roachperf or a prom/grafana instance.
-//
-// TODO(msbutler): currently, the exported metrics are merely printed at end of
-// the roachtest. Refactor these methods to play nice with a roachtest prom endpoint,
-// once it exists.
-type exportedMetric struct {
-	metric float64
-	unit   string
-}
-
-// newMetric creates a new exportedMetric
-func newMetric(metric float64, unit string) exportedMetric {
-	return exportedMetric{metric, unit}
-}
-
-func (em exportedMetric) StringWithUnits() string {
-	return fmt.Sprintf("%.2f %s", em.metric, em.unit)
-}
-
-func (em exportedMetric) String() string {
-	return fmt.Sprintf("%.2f", em.metric)
+type metricSnapshot struct {
+	metrics map[string]float64
+	time    time.Time
 }
 
-// sizeTime captures the disk size of the nodes at some moment in time
-type sizeTime struct {
-	// size is the megabytes of the objects
-	size      int
-	time      time.Time
-	nodeCount int
-}
-
-func newSizeTime(ctx context.Context, du *DiskUsageTracker, nodes option.NodeListOption) sizeTime {
-	return sizeTime{
-		size:      du.GetDiskUsage(ctx, nodes),
-		time:      timeutil.Now(),
-		nodeCount: len(nodes),
+func newMetricSnapshot(
+	metricSnapper func(time.Time) map[string]float64, ts time.Time,
+) metricSnapshot {
+	snap := metricSnapshot{
+		time:    ts,
+		metrics: metricSnapper(ts),
 	}
+	return snap
 }
 
-// diskDiffThroughput estimates throughput between two time intervals as mb/s/node by assuming
-// that the total bytes written between the time intervals is diskUsage_End - diskUsage_Start.
-func diskDiffThroughput(start sizeTime, end sizeTime) float64 {
-	if start.nodeCount != end.nodeCount {
-		panic("node count cannot change while measuring throughput")
-	}
-	return (float64(end.size-start.size) / end.time.Sub(start.time).Seconds()) / float64(start.nodeCount)
+// calcThroughput estimates throughput between two time intervals as metric_unit/s/node
+// for the provided metric, assuming the cluster had the same number of nodes
+// over the interval.
+func calcThroughput(
+	startMetric float64, endMetric float64, interval time.Duration, nodeCount int,
+) float64 {
+	return (endMetric - startMetric) / (interval.Seconds() * float64(nodeCount))
 }
 
 type c2cMetrics struct {
-	start sizeTime
+	initialScanStart metricSnapshot
 
-	initialScanEnd sizeTime
+	initialScanEnd metricSnapshot
 
-	cutoverStart sizeTime
+	// cutoverTo records stats at the system time to which the dst cluster cuts over.
+	cutoverTo metricSnapshot
 
-	cutoverEnd sizeTime
+	cutoverStart metricSnapshot
+
+	cutoverEnd metricSnapshot
 
 	fingerprintingStart time.Time
 
 	fingerprintingEnd time.Time
 }
 
-func (m c2cMetrics) export() map[string]exportedMetric {
-	metrics := map[string]exportedMetric{}
+// export summarizes all metrics gathered throughout the test.
+func (m c2cMetrics) export(t test.Test, nodeCount int) {
+
+	// aggregate aggregates metric snapshots across two time periods. A non-zero
+	// durationOverride will be used instead of the duration between the two
+	// passed-in snapshots.
+	aggregate := func(
+		start metricSnapshot,
+		end metricSnapshot,
+		label string,
+		durationOverride time.Duration) {
+		if start.metrics == nil || end.metrics == nil {
+			return
+		}
 
-	populate := func(start sizeTime, end sizeTime, label string) {
-		metrics[label+"Duration"] = newMetric(end.time.Sub(start.time).Minutes(), "Minutes")
+		metrics := map[string]float64{}
+		duration := durationOverride
+		if duration == 0 {
+			duration = end.time.Sub(start.time)
+		}
+		metrics["Duration Minutes"] = duration.Minutes()
 
-		// Describes the cluster size difference between two timestamps.
-		metrics[label+"Size"] = newMetric(float64(end.size-start.size), "MB")
-		metrics[label+"Throughput"] = newMetric(diskDiffThroughput(start, end), "MB/S/Node")
+		for metricName := range start.metrics {
+			metrics["Size_"+metricName] = end.metrics[metricName] - start.metrics[metricName]
+			metrics["Throughput_"+metricName+"_MB/S/Node"] = calcThroughput(
+				start.metrics[metricName], end.metrics[metricName], duration, nodeCount)
+		}
+		// Print all the metrics for now while we wait for prom/grafana to visualize perf over time.
+		// Sort the metrics for pretty printing.
+		metricNames := make([]string, 0, len(metrics))
+		for name := range metrics {
+			metricNames = append(metricNames, name)
+		}
+		sort.Strings(metricNames)
+		t.L().Printf("%s Perf:", label)
+		for _, name := range metricNames {
+			t.L().Printf("\t%s : %.2f", name, metrics[name])
+		}
 	}
 
-	if m.initialScanEnd.nodeCount != 0 {
-		populate(m.start, m.initialScanEnd, "InitialScan")
-	}
+	aggregate(m.initialScanStart, m.initialScanEnd, "InitialScan", 0)
 
-	if m.cutoverEnd.nodeCount != 0 {
-		populate(m.cutoverStart, m.cutoverEnd, "Cutover")
-	}
-	return metrics
+	aggregate(m.initialScanEnd, m.cutoverStart, "Workload", 0)
+
+	// The _amount_ of data processed during cutover should be the data ingested between the
+	// timestamp we cut over to and the start of the cutover process.
+	aggregate(m.cutoverTo, m.cutoverStart, "Cutover", m.cutoverEnd.time.Sub(m.cutoverStart.time))
 }
 
 func setupC2C(
@@ -195,12 +262,14 @@ func setupC2C(
 	workloadNode := c.Node(srcKVNodes + dstKVNodes + 1)
 	c.Put(ctx, t.DeprecatedWorkload(), "./workload", workloadNode)
 
-	srcStartOps := option.DefaultStartOpts()
+	// TODO(msbutler): allow for backups once this test stabilizes a bit more.
+	srcStartOps := option.DefaultStartOptsNoBackups()
 	srcStartOps.RoachprodOpts.InitTarget = 1
 	srcClusterSetting := install.MakeClusterSettings(install.SecureOption(true))
 	c.Start(ctx, t.L(), srcStartOps, srcClusterSetting, srcCluster)
 
-	dstStartOps := option.DefaultStartOpts()
+	// TODO(msbutler): allow for backups once this test stabilizes a bit more.
+	dstStartOps := option.DefaultStartOptsNoBackups()
 	dstStartOps.RoachprodOpts.InitTarget = srcKVNodes + 1
 	dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true))
 	c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster)
@@ -232,24 +301,26 @@ func setupC2C(
 	require.NoError(t, err)
 
 	srcTenantInfo := clusterInfo{
-		name:  srcTenantName,
-		ID:    srcTenantID,
-		pgURL: pgURL,
-		sql:   srcSQL,
-		db:    srcDB,
-		nodes: srcCluster}
+		name:   srcTenantName,
+		ID:     srcTenantID,
+		pgURL:  pgURL,
+		sysSQL: srcSQL,
+		db:     srcDB,
+		nodes:  srcCluster}
 	destTenantInfo := clusterInfo{
-		name:  destTenantName,
-		ID:    destTenantID,
-		sql:   destSQL,
-		db:    destDB,
-		nodes: dstCluster}
+		name:   destTenantName,
+		ID:     destTenantID,
+		sysSQL: destSQL,
+		db:     destDB,
+		nodes:  dstCluster}
 
-	return &c2cSetup{
+	setup := &c2cSetup{
 		src:          srcTenantInfo,
 		dst:          destTenantInfo,
 		workloadNode: workloadNode,
 		metrics:      c2cMetrics{}}
+
+	return setup
 }
 
 type streamingWorkload interface {
@@ -338,6 +409,21 @@ func registerClusterToCluster(r registry.Registry) {
 			additionalDuration: 10 * time.Minute,
 			cutover:            5 * time.Minute,
 		},
+		{
+			name:     "c2c/tpcc/warehouses=1000/duration=60/cutover=30",
+			srcNodes: 4,
+			dstNodes: 4,
+			cpus:     8,
+			pdSize:   1000,
+			// 500 warehouses adds 30 GB to source
+			//
+			// TODO(msbutler): increase default test to 1000 warehouses once fingerprinting
+			// job speeds up.
+			workload:           replicateTPCC{warehouses: 1000},
+			timeout:            3 * time.Hour,
+			additionalDuration: 60 * time.Minute,
+			cutover:            30 * time.Minute,
+		},
 		{
 			name:     "c2c/kv0",
 			srcNodes: 3,
@@ -366,23 +452,16 @@ func registerClusterToCluster(r registry.Registry) {
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
 			setup := setupC2C(ctx, t, c, sp.srcNodes, sp.dstNodes)
 			m := c.NewMonitor(ctx, setup.src.nodes.Merge(setup.dst.nodes))
-			du, err := NewDiskUsageTracker(c, t.L())
-			require.NoError(t, err)
-			var initDuration time.Duration
+			metricSnapper := setup.startStatsCollection(ctx, t, c)
 			if initCmd := sp.workload.sourceInitCmd(setup.src.name, setup.src.nodes); initCmd != "" {
 				t.Status("populating source cluster before replication")
-				setup.metrics.start = newSizeTime(ctx, du, setup.src.nodes)
+				initStart := timeutil.Now()
 				c.Run(ctx, setup.workloadNode, initCmd)
-				setup.metrics.initialScanEnd = newSizeTime(ctx, du, setup.src.nodes)
-
-				initDuration = setup.metrics.initialScanEnd.time.Sub(setup.metrics.start.time)
-				t.L().Printf("src cluster workload initialization took %s minutes", initDuration)
+				t.L().Printf("src cluster workload initialization took %.2f minutes", timeutil.Since(initStart).Minutes())
 			}
-
 			t.L().Printf("begin workload on src cluster")
 			workloadCtx, workloadCancel := context.WithCancel(ctx)
 			defer workloadCancel()
-
 			workloadDoneCh := make(chan struct{})
 			m.Go(func(ctx context.Context) error {
 				err := c.RunE(workloadCtx, setup.workloadNode,
@@ -399,10 +478,14 @@ func registerClusterToCluster(r registry.Registry) {
 			})
 
 			t.Status("starting replication stream")
+			setup.metrics.initialScanStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+			// There's no need to remove the tenant limiters for this new app tenant, as
+			// all replication traffic flows through the system tenant.
 			streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
 				setup.dst.name, setup.src.name, setup.src.pgURL)
-			setup.dst.sql.Exec(t, streamReplStmt)
-			ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name)
+			setup.dst.sysSQL.Exec(t, streamReplStmt)
+			ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name)
 
 			// TODO(ssd): The job doesn't record the initial
 			// statement time, so we can't correctly measure the
@@ -416,12 +499,12 @@ func registerClusterToCluster(r registry.Registry) {
 
 			t.L().Printf("waiting for replication stream to finish ingesting initial scan")
 			waitForHighWatermark(t, setup.dst.db, ingestionJobID, sp.timeout/2)
-
+			setup.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 			t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
 				sp.additionalDuration))
 
 			var currentTime time.Time
-			setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
+			setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime)
 			cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover)
 			t.Status("cutover time chosen: ", cutoverTime.String())
 
@@ -434,10 +517,13 @@ func registerClusterToCluster(r registry.Registry) {
 				return
 			}
 			t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String()))
-			retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name))
-			setup.metrics.cutoverStart = newSizeTime(ctx, du, setup.dst.nodes)
-			stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime)
-			setup.metrics.cutoverEnd = newSizeTime(ctx, du, setup.dst.nodes)
+			retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name))
+			setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime)
+			setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now())
+			stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime)
+			setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
+
+			setup.metrics.export(t, len(setup.src.nodes))
 
 			t.Status("comparing fingerprints")
 			compareTenantFingerprintsAtTimestamp(
@@ -449,20 +535,6 @@ func registerClusterToCluster(r registry.Registry) {
 			)
 			lv.assertValid(t)
 
-			// TODO(msbutler): export metrics to roachperf or prom/grafana
-			exportedMetrics := setup.metrics.export()
-			t.L().Printf(`Initial Scan: Duration, Size, Throughput; Cutover: Duration, Size, Throughput`)
-			t.L().Printf(`%s %s %s %s %s %s`,
-				exportedMetrics["InitialScanDuration"].String(),
-				exportedMetrics["InitialScanSize"].String(),
-				exportedMetrics["InitialScanThroughput"].String(),
-				exportedMetrics["CutoverDuration"].String(),
-				exportedMetrics["CutoverSize"].String(),
-				exportedMetrics["CutoverThroughput"].String(),
-			)
-			for key, metric := range exportedMetrics {
-				t.L().Printf("%s: %s", key, metric.String())
-			}
 		},
 	})
 }
@@ -495,14 +567,14 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost)
 	var srcFingerprint int64
 	m.Go(func(ctx context.Context) error {
-		setup.src.sql.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint)
+		setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint)
 		return nil
 	})
 	var destFingerprint int64
 	m.Go(func(ctx context.Context) error {
 		// TODO(adityamaru): Measure and record fingerprinting throughput.
 		setup.metrics.fingerprintingStart = timeutil.Now()
-		setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint)
+		setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint)
 		setup.metrics.fingerprintingEnd = timeutil.Now()
 		fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String()
 		t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration)
@@ -611,7 +683,8 @@ func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
 }
 
 func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
-	db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`)
+	db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
+		`SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
 }
 
 func copyPGCertsAndMakeURL(
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 35603c449a53..f1cf7a0ae4e8 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -313,6 +313,8 @@ func createInMemoryTenant(
 	sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
 	sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
 
+	removeTenantRateLimiters(t, sysSQL, tenantName)
+
 	// Opening a SQL session to a newly created in-process tenant may require a
 	// few retries. Unfortunately, the c.ConnE and MakeSQLRunner APIs do not make
 	// it clear if they eagerly open a session with the tenant or wait until the
@@ -331,14 +333,22 @@ func createInMemoryTenant(
 		return nil
 	})
 
-	// Currently, a tenant has by default a 10m RU burst limit, which can be
-	// reached during these tests. To prevent RU limit throttling, add 10B RUs to
-	// the tenant.
-	var tenantID int
-	sysSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID)
-	sysSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0,
-10000000000, now(), 0);`, tenantID)
 	if secure {
 		createTenantAdminRole(t, tenantName, tenantSQL)
 	}
 }
+
+// removeTenantRateLimiters ensures the tenant is not throttled by limiters.
+func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) {
+	var tenantID int
+	systemSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID)
+	systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0,
+10000000000, now(), 0);`, tenantID)
+	systemSQL.ExecMultiple(t,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`,
+		`SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`)
+}
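
Reviewer note: the sketch below is a minimal, self-contained illustration of the measurement pattern this patch introduces (snapshot metric values at interesting moments, then derive size deltas and per-node throughput over each phase, printed in sorted order). It is not part of the patch: the snapshot/aggregate helpers are simplified stand-ins for metricSnapshot and c2cMetrics.export, and fakeScrape with its hard-coded values substitutes for the Prometheus queries that clusterstats performs in the real roachtest.

package main

import (
	"fmt"
	"sort"
	"time"
)

// snapshot pairs a set of metric values with the time they were observed,
// mirroring the metricSnapshot struct added in cluster_to_cluster.go.
type snapshot struct {
	metrics map[string]float64
	time    time.Time
}

// calcThroughput mirrors the helper in the diff: metric_unit/s/node over an interval.
func calcThroughput(start, end float64, interval time.Duration, nodeCount int) float64 {
	return (end - start) / (interval.Seconds() * float64(nodeCount))
}

// aggregate mimics c2cMetrics.export for a single phase: compute the duration,
// per-metric size delta, and per-node throughput, then print them sorted by name.
func aggregate(start, end snapshot, label string, nodeCount int) {
	duration := end.time.Sub(start.time)
	out := map[string]float64{"Duration Minutes": duration.Minutes()}
	for name := range start.metrics {
		out["Size_"+name] = end.metrics[name] - start.metrics[name]
		out["Throughput_"+name+"_MB/S/Node"] =
			calcThroughput(start.metrics[name], end.metrics[name], duration, nodeCount)
	}
	names := make([]string, 0, len(out))
	for name := range out {
		names = append(names, name)
	}
	sort.Strings(names)
	fmt.Printf("%s Perf:\n", label)
	for _, name := range names {
		fmt.Printf("\t%s : %.2f\n", name, out[name])
	}
}

func main() {
	// fakeScrape stands in for the Prometheus scrape performed by the metric
	// snapper in the roachtest; the values are invented for illustration.
	fakeScrape := func(logical, physical float64) map[string]float64 {
		return map[string]float64{
			"LogicalMegabytes":  logical,
			"PhysicalMegabytes": physical,
		}
	}
	start := snapshot{metrics: fakeScrape(0, 0), time: time.Now()}
	end := snapshot{metrics: fakeScrape(30000, 42000), time: start.time.Add(45 * time.Minute)}
	aggregate(start, end, "InitialScan", 4)
}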