diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index a135d99c0b45..bb9d57b040e8 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -94,6 +94,13 @@ var cutoverSignalPollInterval = settings.RegisterDurationSetting(
 	settings.WithName("physical_replication.consumer.cutover_signal_poll_interval"),
 )
 
+var quantize = settings.RegisterDurationSettingWithExplicitUnit(
+	settings.SystemOnly,
+	"physical_replication.consumer.timestamp_granularity",
+	"the granularity at which replicated times are quantized to make tracking more efficient",
+	5*time.Second,
+)
+
 var streamIngestionResultTypes = []*types.T{
 	types.Bytes, // jobspb.ResolvedSpans
 }
@@ -874,7 +881,17 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) erro
 
 	lowestTimestamp := hlc.MaxTimestamp
 	highestTimestamp := hlc.MinTimestamp
+	d := quantize.Get(&sip.EvalCtx.Settings.SV)
 	for _, resolvedSpan := range resolvedSpans {
+		// If quantizing is enabled, round the timestamp down to an even multiple of
+		// the quantization amount, to maximize the number of spans that share the
+		// same resolved timestamp -- even if they were individually resolved to
+		// _slightly_ different/newer timestamps -- to allow them to merge into
+		// fewer and larger spans in the frontier.
+		if d > 0 {
+			resolvedSpan.Timestamp.Logical = 0
+			resolvedSpan.Timestamp.WallTime -= resolvedSpan.Timestamp.WallTime % int64(d)
+		}
 		if resolvedSpan.Timestamp.Less(lowestTimestamp) {
 			lowestTimestamp = resolvedSpan.Timestamp
 		}
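The quantization itself is plain integer arithmetic on the HLC wall time: clear the logical component and round the wall clock down to an exact multiple of the configured granularity. A minimal, self-contained sketch of that rounding, using a stand-in struct rather than the real hlc.Timestamp (the names below are illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

// ts is a stand-in for an HLC timestamp: wall time in nanoseconds plus a
// logical counter. It is not the real hlc.Timestamp type.
type ts struct {
	WallTime int64
	Logical  int32
}

// quantizeDown rounds t down to an exact multiple of granularity and clears
// the logical component, so nearby resolved timestamps collapse to one value.
func quantizeDown(t ts, granularity time.Duration) ts {
	if granularity <= 0 {
		return t // quantization disabled
	}
	t.Logical = 0
	t.WallTime -= t.WallTime % int64(granularity)
	return t
}

func main() {
	g := 5 * time.Second
	a := ts{WallTime: time.Date(2024, 3, 1, 12, 0, 7, 250, time.UTC).UnixNano(), Logical: 3}
	b := ts{WallTime: time.Date(2024, 3, 1, 12, 0, 9, 900, time.UTC).UnixNano()}
	// Both inputs land on 12:00:05.000000000 after rounding down to the 5s
	// boundary, so a frontier tracking them sees a single timestamp.
	fmt.Println(quantizeDown(a, g), quantizeDown(b, g))
}
```

With the default 5s granularity, spans whose resolved times differ by fractions of a second all report the same quantized timestamp, which is what lets the frontier coalesce them into fewer, larger entries, as the comment in the hunk above describes.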
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index f87489ee0542..e343dbf9b0dc 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -203,6 +203,9 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		},
 	})
 	defer tc.Stopper().Stop(ctx)
 
+	st := cluster.MakeTestingClusterSettings()
+	quantize.Override(ctx, &st.SV, 0)
+
 	db := tc.Server(0).InternalDB().(descs.DB)
 	registry := tc.Server(0).JobRegistry().(*jobs.Registry)
@@ -290,7 +293,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		}
 		out, err := runStreamIngestionProcessor(ctx, t, registry, db,
 			topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
-			mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+			mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
 		require.NoError(t, err)
 
 		emittedRows := readRows(out)
@@ -324,12 +327,13 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		}
 
 		g := ctxgroup.WithContext(ctx)
-		sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
+		sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
 			topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
-			nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+			nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
 		require.NoError(t, err)
 
 		minimumFlushInterval.Override(ctx, &st.SV, 5*time.Millisecond)
+		quantize.Override(ctx, &st.SV, 0)
 		out := &execinfra.RowChannel{}
 		out.InitWithNumSenders(sip.OutputTypes(), 1)
 		out.Start(ctx)
@@ -367,12 +371,13 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		}
 
 		g := ctxgroup.WithContext(ctx)
-		sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
+		sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
 			topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
-			nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+			nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
 		require.NoError(t, err)
 
 		minimumFlushInterval.Override(ctx, &st.SV, 50*time.Minute)
+		quantize.Override(ctx, &st.SV, 0)
 		maxKVBufferSize.Override(ctx, &st.SV, 1)
 		out := &execinfra.RowChannel{}
 		out.InitWithNumSenders(sip.OutputTypes(), 1)
@@ -416,13 +421,15 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		}
 
 		g := ctxgroup.WithContext(ctx)
-		sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
+		sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
 			topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
-			nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+			nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
 		require.NoError(t, err)
 
 		minimumFlushInterval.Override(ctx, &st.SV, 50*time.Minute)
 		maxRangeKeyBufferSize.Override(ctx, &st.SV, 1)
+		quantize.Override(ctx, &st.SV, 0)
+
 		out := &execinfra.RowChannel{}
 		out.InitWithNumSenders(sip.OutputTypes(), 1)
 		out.Start(ctx)
@@ -472,7 +479,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		}}
 		out, err := runStreamIngestionProcessor(ctx, t, registry, db,
 			topology, initialScanTimestamp, checkpoint, tenantRekey, mockClient,
-			nil /* cutoverProvider */, streamingTestingKnobs)
+			nil /* cutoverProvider */, streamingTestingKnobs, st)
 		require.NoError(t, err)
 
 		emittedRows := readRows(out)
@@ -499,7 +506,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
 		}
 		out, err := runStreamIngestionProcessor(ctx, t, registry, db,
 			topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, &errorStreamClient{},
-			nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+			nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
 		require.NoError(t, err)
 
 		// Expect no rows, and just the error.
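Because the new setting defaults to 5s, tests that assert on exact resolved timestamps now build their own *cluster.Settings and zero out the granularity before handing the settings to the processor, as the hunks above do. A sketch of that override pattern with a hypothetical setting (the real quantize var is unexported and lives in the streamingest package, so it is only reachable from that package's own tests):

```go
package example

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// myGranularity is a hypothetical duration setting standing in for the
// package-private quantize setting added in this patch.
var myGranularity = settings.RegisterDurationSetting(
	settings.SystemOnly,
	"example.timestamp_granularity",
	"illustrative granularity setting",
	5*time.Second,
)

// testingSettings returns cluster settings with quantization disabled, the
// same shape the updated tests use before wiring up the processor.
func testingSettings(ctx context.Context) *cluster.Settings {
	st := cluster.MakeTestingClusterSettings()
	myGranularity.Override(ctx, &st.SV, 0) // 0 disables the rounding entirely
	return st
}
```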
@@ -680,6 +687,7 @@ func TestRandomClientGeneration(t *testing.T) {
 	})
 	defer srv.Stopper().Stop(ctx)
+	quantize.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, 0)
 
 	ts := srv.SystemLayer()
 	registry := ts.JobRegistry().(*jobs.Registry)
 
@@ -725,9 +733,11 @@
 	randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
 	randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))
 
+	st := cluster.MakeTestingClusterSettings()
+	quantize.Override(ctx, &st.SV, 0)
 	out, err := runStreamIngestionProcessor(ctx, t, registry, ts.InternalDB().(descs.DB),
 		topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
-		randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/)
+		randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/, st)
 	require.NoError(t, err)
 
 	numResolvedEvents := 0
@@ -811,9 +821,10 @@ func runStreamIngestionProcessor(
 	mockClient streamclient.Client,
 	cutoverProvider cutoverProvider,
 	streamingTestingKnobs *sql.StreamingTestingKnobs,
+	st *cluster.Settings,
 ) (*distsqlutils.RowBuffer, error) {
-	sip, _, err := getStreamIngestionProcessor(ctx, t, registry, db,
-		partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs)
+	sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
+		partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs, st)
 	require.NoError(t, err)
 
 	out := &distsqlutils.RowBuffer{}
@@ -841,11 +852,11 @@ func getStreamIngestionProcessor(
 	mockClient streamclient.Client,
 	cutoverProvider cutoverProvider,
 	streamingTestingKnobs *sql.StreamingTestingKnobs,
-) (*streamIngestionProcessor, *cluster.Settings, error) {
-	st := cluster.MakeTestingClusterSettings()
+	st *cluster.Settings,
+) (*streamIngestionProcessor, error) {
 	evalCtx := eval.MakeTestingEvalContext(st)
 	if mockClient == nil {
-		return nil, nil, errors.AssertionFailedf("non-nil streamclient required")
+		return nil, errors.AssertionFailedf("non-nil streamclient required")
 	}
 
 	testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
@@ -893,7 +904,7 @@ func getStreamIngestionProcessor(
 		sip.cutoverProvider = cutoverProvider
 	}
 
-	return sip, st, err
+	return sip, err
 }
 
 func resolvedSpansMinTS(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp {
diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
index 6c9f192a602c..cfd42c0d92db 100644
--- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/ccl/kvccl/kvfollowerreadsccl",
        "//pkg/ccl/streamingccl",
        "//pkg/ccl/streamingccl/replicationutils",
        "//pkg/ccl/utilccl",
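The new //pkg/ccl/kvccl/kvfollowerreadsccl dependency is consumed in the stream_lifetime.go hunk that follows: the spec builder swaps the default planning context for one whose replica choices go through the bulk oracle. A condensed sketch of that call shape, lifted from the hunk below; the parameter names and types here are assumptions standing in for values buildReplicationStreamSpec already has in scope, and an empty locality means "no locality filter":

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
)

// bulkOraclePlanningCtx mirrors the stream_lifetime.go change: build a
// planning context whose span partitioning uses the bulk oracle from
// kvfollowerreadsccl rather than the planner's default oracle.
func bulkOraclePlanningCtx(
	ctx context.Context, jobExecCtx sql.JobExecContext, evalCtx *eval.Context,
) *sql.PlanningCtx {
	dsp := jobExecCtx.DistSQLPlanner()
	noLoc := roachpb.Locality{}
	oracle := kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc)
	return dsp.NewPlanningCtxWithOracle(
		ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc,
	)
}
```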
diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
index 99383bdaac65..fb31a31c6b01 100644
--- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
+++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -294,8 +295,11 @@ func buildReplicationStreamSpec(
 
 	// Partition the spans with SQLPlanner
 	dsp := jobExecCtx.DistSQLPlanner()
-	planCtx := dsp.NewPlanningCtx(
-		ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution,
+	noLoc := roachpb.Locality{}
+	oracle := kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc)
+
+	planCtx := dsp.NewPlanningCtxWithOracle(
+		ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc,
 	)
 
 	spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans)
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index c66063653635..41b39bd0ec06 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -531,14 +531,14 @@ func (rd *replicationDriver) setupC2C(
 
 	overrideSrcAndDestTenantTTL(t, srcSQL, destSQL, rd.rs.overrideTenantTTL)
 
-	createTenantAdminRole(t, "src-system", srcSQL)
-	createTenantAdminRole(t, "dst-system", destSQL)
+	deprecatedCreateTenantAdminRole(t, "src-system", srcSQL)
+	deprecatedCreateTenantAdminRole(t, "dst-system", destSQL)
 
 	srcTenantID, destTenantID := 2, 2
 	srcTenantName := "src-tenant"
 	destTenantName := "destination-tenant"
 
-	createInMemoryTenant(ctx, t, c, srcTenantName, srcCluster, true)
+	deprecatedCreateInMemoryTenant(ctx, t, c, srcTenantName, srcCluster, true)
 
 	pgURL, err := copyPGCertsAndMakeURL(ctx, t, c, srcNode, srcClusterSetting.PGUrlCertsDir, addr[0])
 	require.NoError(t, err)
@@ -983,7 +983,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.t.L().Printf("starting the destination tenant")
-	conn := startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
+	conn := deprecatedStartInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
 	conn.Close()
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
diff --git a/pkg/cmd/roachtest/tests/multitenant.go b/pkg/cmd/roachtest/tests/multitenant.go
index 5f5b84ad1979..c255bf922ab3 100644
--- a/pkg/cmd/roachtest/tests/multitenant.go
+++ b/pkg/cmd/roachtest/tests/multitenant.go
@@ -120,7 +120,7 @@ func runAcceptanceMultitenantMultiRegion(ctx context.Context, t test.Test, c clu
 	for i, node := range c.All() {
 		region := regions[i]
 		regionInfo := fmt.Sprintf("cloud=%s,region=%s,zone=%s", c.Cloud(), regionOnly(region), region)
-		tenant := createTenantNode(ctx, t, c, c.All(), tenantID, node, tenantHTTPPort, tenantSQLPort, createTenantRegion(regionInfo))
+		tenant := deprecatedCreateTenantNode(ctx, t, c, c.All(), tenantID, node, tenantHTTPPort, tenantSQLPort, createTenantRegion(regionInfo))
 		tenant.start(ctx, t, c, "./cockroach")
 		tenants = append(tenants, tenant)
 
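The roachtest helper renames above and below all add a doc-comment paragraph beginning with "Deprecated:". That is the standard Go convention, so gopls and staticcheck flag new callers and point them at Cluster.StartServiceForVirtualCluster. A minimal illustration with a hypothetical pair of functions, not part of the patch:

```go
package example

// NewHelper is the supported replacement.
func NewHelper() string { return "use me" }

// OldHelper does the same thing the old way.
//
// Deprecated: use NewHelper instead.
func OldHelper() string { return NewHelper() }
```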
diff --git a/pkg/cmd/roachtest/tests/multitenant_upgrade.go b/pkg/cmd/roachtest/tests/multitenant_upgrade.go
index 99f9e3a2b3c6..585f295907ce 100644
--- a/pkg/cmd/roachtest/tests/multitenant_upgrade.go
+++ b/pkg/cmd/roachtest/tests/multitenant_upgrade.go
@@ -113,13 +113,13 @@ func runMultiTenantUpgrade(
 	// Create two instances of tenant 11 so that we can test with two pods
 	// running during migration.
 	const tenantNode = 2
-	tenant11a := createTenantNode(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11aHTTPPort, tenant11aSQLPort)
+	tenant11a := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11aHTTPPort, tenant11aSQLPort)
 	tenant11a.start(ctx, t, c, predecessorBinary)
 	defer tenant11a.stop(ctx, t, c)
 
-	// Since the certs are created with the createTenantNode call above, we
+	// Since the certs are created with the deprecatedCreateTenantNode call above, we
 	// call the "no certs" version of create tenant here.
-	tenant11b := createTenantNodeNoCerts(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11bHTTPPort, tenant11bSQLPort)
+	tenant11b := deprecatedCreateTenantNodeNoCerts(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11bHTTPPort, tenant11bSQLPort)
 	tenant11b.start(ctx, t, c, predecessorBinary)
 	defer tenant11b.stop(ctx, t, c)
 
@@ -174,7 +174,7 @@ func runMultiTenantUpgrade(
 		withResults([][]string{{"1", "bar"}}))
 
 	t.Status("starting tenant 12 server with older binary")
-	tenant12 := createTenantNode(ctx, t, c, kvNodes, tenant12ID, tenantNode, tenant12HTTPPort, tenant12SQLPort)
+	tenant12 := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant12ID, tenantNode, tenant12HTTPPort, tenant12SQLPort)
 	tenant12.start(ctx, t, c, predecessorBinary)
 	defer tenant12.stop(ctx, t, c)
 
@@ -196,7 +196,7 @@ func runMultiTenantUpgrade(
 	runner.Exec(t, `SELECT crdb_internal.create_tenant($1::INT)`, tenant13ID)
 
 	t.Status("starting tenant 13 server with new binary")
-	tenant13 := createTenantNode(ctx, t, c, kvNodes, tenant13ID, tenantNode, tenant13HTTPPort, tenant13SQLPort)
+	tenant13 := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant13ID, tenantNode, tenant13HTTPPort, tenant13SQLPort)
 	tenant13.start(ctx, t, c, currentBinary)
 	defer tenant13.stop(ctx, t, c)
 
@@ -356,7 +356,7 @@ func runMultiTenantUpgrade(
 	runner.Exec(t, `SELECT crdb_internal.create_tenant($1::INT)`, tenant14ID)
 
 	t.Status("verifying that the tenant 14 server works and has the proper version")
-	tenant14 := createTenantNode(ctx, t, c, kvNodes, tenant14ID, tenantNode, tenant14HTTPPort, tenant14SQLPort)
+	tenant14 := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant14ID, tenantNode, tenant14HTTPPort, tenant14SQLPort)
 	tenant14.start(ctx, t, c, currentBinary)
 	defer tenant14.stop(ctx, t, c)
 	verifySQL(t, tenant14.pgURL,
diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go
index 39ac17bf1ba5..81e63de4e4fc 100644
--- a/pkg/cmd/roachtest/tests/multitenant_utils.go
+++ b/pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -107,7 +107,8 @@ func createTenantNodeInternal(
 	return tn
 }
 
-func createTenantNode(
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedCreateTenantNode(
 	ctx context.Context,
 	t test.Test,
 	c cluster.Cluster,
@@ -118,7 +119,8 @@ func createTenantNode(
 	return createTenantNodeInternal(ctx, t, c, kvnodes, tenantID, node, httpPort, sqlPort, true /* certs */, opts...)
 }
 
-func createTenantNodeNoCerts(
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedCreateTenantNodeNoCerts(
 	ctx context.Context,
 	t test.Test,
 	c cluster.Cluster,
@@ -189,7 +191,7 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
 	randomSeed := rand.Int63()
 	c.SetRandomSeed(randomSeed)
 	require.NoError(t, err)
-	tn.errCh = startTenantServer(
+	tn.errCh = deprecatedStartTenantServer(
 		ctx, c, c.Node(tn.node), internalIPs[0], binary, tn.kvAddrs, tn.tenantID,
 		tn.httpPort, tn.sqlPort, tn.envVars, randomSeed,
 		extraArgs...,
@@ -251,7 +253,8 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
 	t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID)
 }
 
-func startTenantServer(
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedStartTenantServer(
 	tenantCtx context.Context,
 	c cluster.Cluster,
 	node option.NodeListOption,
@@ -289,8 +292,11 @@ func startTenantServer(
 	return errCh
 }
 
-// createTenantAdminRole creates a role that can be used to log into a secure cluster's db console.
-func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.SQLRunner) {
+// deprecatedCreateTenantAdminRole creates a role that can be used to log into a secure cluster's db console.
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedCreateTenantAdminRole(
+	t test.Test, tenantName string, tenantSQL *sqlutils.SQLRunner,
+) {
 	tenantSQL.Exec(t, fmt.Sprintf(`CREATE ROLE IF NOT EXISTS %s WITH LOGIN PASSWORD '%s'`, install.DefaultUser, install.DefaultPassword))
 	tenantSQL.Exec(t, fmt.Sprintf(`GRANT ADMIN TO %s`, install.DefaultUser))
 	t.L().Printf(`Log into %s db console with username "%s" and password "%s"`,
@@ -299,9 +305,10 @@ func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.S
 
 const appTenantName = "app"
 
-// createInMemoryTenant runs through the necessary steps to create an in-memory
+// deprecatedCreateInMemoryTenant runs through the necessary steps to create an in-memory
 // tenant without resource limits and full dbconsole viewing privileges.
-func createInMemoryTenant(
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedCreateInMemoryTenant(
 	ctx context.Context,
 	t test.Test,
 	c cluster.Cluster,
@@ -309,15 +316,16 @@
 	nodes option.NodeListOption,
 	secure bool,
 ) {
-	db := createInMemoryTenantWithConn(ctx, t, c, tenantName, nodes, secure)
+	db := deprecatedCreateInMemoryTenantWithConn(ctx, t, c, tenantName, nodes, secure)
 	db.Close()
 }
 
-// createInMemoryTenantWithConn runs through the necessary steps to create an
+// deprecatedCreateInMemoryTenantWithConn runs through the necessary steps to create an
 // in-memory tenant without resource limits and full dbconsole viewing
 // privileges. As a convenience, it also returns a connection to the tenant (on
 // a random node in the cluster).
-func createInMemoryTenantWithConn(
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedCreateInMemoryTenantWithConn(
 	ctx context.Context,
 	t test.Test,
 	c cluster.Cluster,
@@ -330,19 +338,20 @@ func createInMemoryTenantWithConn(
 	sysSQL := sqlutils.MakeSQLRunner(sysDB)
 	sysSQL.Exec(t, "CREATE TENANT $1", tenantName)
 
-	tenantConn := startInMemoryTenant(ctx, t, c, tenantName, nodes)
+	tenantConn := deprecatedStartInMemoryTenant(ctx, t, c, tenantName, nodes)
 	tenantSQL := sqlutils.MakeSQLRunner(tenantConn)
 	if secure {
-		createTenantAdminRole(t, tenantName, tenantSQL)
+		deprecatedCreateTenantAdminRole(t, tenantName, tenantSQL)
 	}
 	return tenantConn
 }
 
-// startInMemoryTenant starts an in memory tenant that has already been created.
+// deprecatedStartInMemoryTenant starts an in memory tenant that has already been created.
 // This function also removes tenant rate limiters and sets a few cluster
 // settings on the tenant. As a convenience, it also returns a connection to
 // the tenant (on a random node in the cluster).
-func startInMemoryTenant(
+// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
+func deprecatedStartInMemoryTenant(
 	ctx context.Context,
 	t test.Test,
 	c cluster.Cluster,