120480: roachtest: deprecate multitenant_utils r=DarrylWong a=herkolategan

Previously, `multitenant_utils.go` provided convenience functions for starting virtual clusters. These utilities served as a good interim solution, but they lacked proper integration with roachprod and the cluster interfaces. Now that roachprod and roachtest expose virtual cluster APIs, those interfaces should be used instead.

The functions exposed in `multitenant_utils.go` have been prefixed with `deprecated` and given doc comments that discourage further use and point future implementations to the new API (a sketch of the convention follows below).

See: cockroachdb#115867

Epic: None
Release Note: None
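
For reference, the Go convention applied here is a `deprecated` name prefix plus a doc comment beginning with `Deprecated:`, which tools such as staticcheck and gopls surface at call sites. A minimal sketch follows; the helper name and body are illustrative, not an actual function from multitenant_utils.go:

package example

import "fmt"

// deprecatedStartTenant is an illustrative stand-in for one of the renamed
// helpers in multitenant_utils.go, not the real implementation.
//
// Deprecated: use the roachtest virtual cluster APIs
// (e.g. Cluster.StartServiceForVirtualCluster) instead.
func deprecatedStartTenant(name string) {
	fmt.Println("starting tenant", name)
}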

120631: pcr: quantize PCR frontier timestamps to 5s r=dt a=dt

Quantizing resolved timestamps down to 5-second boundaries yields fewer distinct timestamps, which makes it easier to merge adjacent spans in the frontier (see the sketch after this message).

Release note: none.
Epic: none.
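
As a standalone sketch of the quantization arithmetic (the function below is illustrative; the production change applies the same rounding to HLC wall times inside the stream ingestion processor, gated on the new physical_replication.consumer.timestamp_granularity setting):

package main

import (
	"fmt"
	"time"
)

// quantizeWallTime rounds a nanosecond wall time down to an even multiple of
// the granularity, so spans resolved at slightly different times end up with
// the same resolved timestamp and can be merged into fewer, larger spans.
func quantizeWallTime(wallNanos int64, granularity time.Duration) int64 {
	if granularity <= 0 {
		return wallNanos // quantization disabled
	}
	return wallNanos - wallNanos%int64(granularity)
}

func main() {
	ts := time.Date(2024, 3, 18, 12, 0, 7, 250_000_000, time.UTC)
	quantized := quantizeWallTime(ts.UnixNano(), 5*time.Second)
	// Prints 2024-03-18 12:00:05 +0000 UTC; a span resolved at 12:00:09
	// would quantize to the same 5s boundary and share this timestamp.
	fmt.Println(time.Unix(0, quantized).UTC())
}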

120632: pcr: use the bulk oracle r=dt a=dt

Release note: none.
Epic: none.

Co-authored-by: Herko Lategan <[email protected]>
Co-authored-by: David Taylor <[email protected]>
3 people committed Mar 18, 2024
4 parents 93c9087 + e45e337 + 230cef6 + 58be95d commit 1c258e7
Showing 8 changed files with 86 additions and 44 deletions.
17 changes: 17 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -94,6 +94,13 @@ var cutoverSignalPollInterval = settings.RegisterDurationSetting(
settings.WithName("physical_replication.consumer.cutover_signal_poll_interval"),
)

var quantize = settings.RegisterDurationSettingWithExplicitUnit(
settings.SystemOnly,
"physical_replication.consumer.timestamp_granularity",
"the granularity at which replicated times are quantized to make tracking more efficient",
5*time.Second,
)

var streamIngestionResultTypes = []*types.T{
types.Bytes, // jobspb.ResolvedSpans
}
@@ -874,7 +881,17 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) erro

lowestTimestamp := hlc.MaxTimestamp
highestTimestamp := hlc.MinTimestamp
d := quantize.Get(&sip.EvalCtx.Settings.SV)
for _, resolvedSpan := range resolvedSpans {
// If quantizing is enabled, round the timestamp down to an even multiple of
// the quantization amount, to maximize the number of spans that share the
// same resolved timestamp -- even if they were individually resolved to
// _slightly_ different/newer timestamps -- to allow them to merge into
// fewer and larger spans in the frontier.
if d > 0 {
resolvedSpan.Timestamp.Logical = 0
resolvedSpan.Timestamp.WallTime -= resolvedSpan.Timestamp.WallTime % int64(d)
}
if resolvedSpan.Timestamp.Less(lowestTimestamp) {
lowestTimestamp = resolvedSpan.Timestamp
}
pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -203,6 +203,9 @@ func TestStreamIngestionProcessor(t *testing.T) {
},
})
defer tc.Stopper().Stop(ctx)
st := cluster.MakeTestingClusterSettings()
quantize.Override(ctx, &st.SV, 0)

db := tc.Server(0).InternalDB().(descs.DB)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)

@@ -290,7 +293,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}
out, err := runStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

emittedRows := readRows(out)
@@ -324,12 +327,13 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

g := ctxgroup.WithContext(ctx)
sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)

require.NoError(t, err)
minimumFlushInterval.Override(ctx, &st.SV, 5*time.Millisecond)
quantize.Override(ctx, &st.SV, 0)
out := &execinfra.RowChannel{}
out.InitWithNumSenders(sip.OutputTypes(), 1)
out.Start(ctx)
@@ -367,12 +371,13 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

g := ctxgroup.WithContext(ctx)
sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

minimumFlushInterval.Override(ctx, &st.SV, 50*time.Minute)
quantize.Override(ctx, &st.SV, 0)
maxKVBufferSize.Override(ctx, &st.SV, 1)
out := &execinfra.RowChannel{}
out.InitWithNumSenders(sip.OutputTypes(), 1)
@@ -416,13 +421,15 @@ func TestStreamIngestionProcessor(t *testing.T) {
}

g := ctxgroup.WithContext(ctx)
sip, st, err := getStreamIngestionProcessor(ctx, t, registry, db,
sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient,
nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

minimumFlushInterval.Override(ctx, &st.SV, 50*time.Minute)
maxRangeKeyBufferSize.Override(ctx, &st.SV, 1)
quantize.Override(ctx, &st.SV, 0)

out := &execinfra.RowChannel{}
out.InitWithNumSenders(sip.OutputTypes(), 1)
out.Start(ctx)
@@ -472,7 +479,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}}
out, err := runStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, checkpoint, tenantRekey, mockClient,
nil /* cutoverProvider */, streamingTestingKnobs)
nil /* cutoverProvider */, streamingTestingKnobs, st)
require.NoError(t, err)

emittedRows := readRows(out)
@@ -499,7 +506,7 @@ func TestStreamIngestionProcessor(t *testing.T) {
}
out, err := runStreamIngestionProcessor(ctx, t, registry, db,
topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, &errorStreamClient{},
nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
nil /* cutoverProvider */, nil /* streamingTestingKnobs */, st)
require.NoError(t, err)

// Expect no rows, and just the error.
@@ -680,6 +687,7 @@ func TestRandomClientGeneration(t *testing.T) {
})
defer srv.Stopper().Stop(ctx)

quantize.Override(ctx, &srv.SystemLayer().ClusterSettings().SV, 0)
ts := srv.SystemLayer()

registry := ts.JobRegistry().(*jobs.Registry)
@@ -725,9 +733,11 @@ func TestRandomClientGeneration(t *testing.T) {
randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))

st := cluster.MakeTestingClusterSettings()
quantize.Override(ctx, &st.SV, 0)
out, err := runStreamIngestionProcessor(ctx, t, registry, ts.InternalDB().(descs.DB),
topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/)
randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/, st)
require.NoError(t, err)

numResolvedEvents := 0
@@ -811,9 +821,10 @@ func runStreamIngestionProcessor(
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
streamingTestingKnobs *sql.StreamingTestingKnobs,
st *cluster.Settings,
) (*distsqlutils.RowBuffer, error) {
sip, _, err := getStreamIngestionProcessor(ctx, t, registry, db,
partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs)
sip, err := getStreamIngestionProcessor(ctx, t, registry, db,
partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs, st)
require.NoError(t, err)

out := &distsqlutils.RowBuffer{}
@@ -841,11 +852,11 @@ func getStreamIngestionProcessor(
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
streamingTestingKnobs *sql.StreamingTestingKnobs,
) (*streamIngestionProcessor, *cluster.Settings, error) {
st := cluster.MakeTestingClusterSettings()
st *cluster.Settings,
) (*streamIngestionProcessor, error) {
evalCtx := eval.MakeTestingEvalContext(st)
if mockClient == nil {
return nil, nil, errors.AssertionFailedf("non-nil streamclient required")
return nil, errors.AssertionFailedf("non-nil streamclient required")
}

testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
@@ -893,7 +904,7 @@ func getStreamIngestionProcessor(
sip.cutoverProvider = cutoverProvider
}

return sip, st, err
return sip, err
}

func resolvedSpansMinTS(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp {
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/kvccl/kvfollowerreadsccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/ccl/utilccl",
8 changes: 6 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
@@ -12,6 +12,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -294,8 +295,11 @@ func buildReplicationStreamSpec(

// Partition the spans with SQLPlanner
dsp := jobExecCtx.DistSQLPlanner()
planCtx := dsp.NewPlanningCtx(
ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution,
noLoc := roachpb.Locality{}
oracle := kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc)

planCtx := dsp.NewPlanningCtxWithOracle(
ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc,
)

spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans)
8 changes: 4 additions & 4 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -531,14 +531,14 @@ func (rd *replicationDriver) setupC2C(

overrideSrcAndDestTenantTTL(t, srcSQL, destSQL, rd.rs.overrideTenantTTL)

createTenantAdminRole(t, "src-system", srcSQL)
createTenantAdminRole(t, "dst-system", destSQL)
deprecatedCreateTenantAdminRole(t, "src-system", srcSQL)
deprecatedCreateTenantAdminRole(t, "dst-system", destSQL)

srcTenantID, destTenantID := 2, 2
srcTenantName := "src-tenant"
destTenantName := "destination-tenant"

createInMemoryTenant(ctx, t, c, srcTenantName, srcCluster, true)
deprecatedCreateInMemoryTenant(ctx, t, c, srcTenantName, srcCluster, true)

pgURL, err := copyPGCertsAndMakeURL(ctx, t, c, srcNode, srcClusterSetting.PGUrlCertsDir, addr[0])
require.NoError(t, err)
@@ -983,7 +983,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.t.L().Printf("starting the destination tenant")
conn := startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
conn := deprecatedStartInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
conn.Close()

rd.metrics.export(rd.t, len(rd.setup.src.nodes))
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/multitenant.go
@@ -120,7 +120,7 @@ func runAcceptanceMultitenantMultiRegion(ctx context.Context, t test.Test, c clu
for i, node := range c.All() {
region := regions[i]
regionInfo := fmt.Sprintf("cloud=%s,region=%s,zone=%s", c.Cloud(), regionOnly(region), region)
tenant := createTenantNode(ctx, t, c, c.All(), tenantID, node, tenantHTTPPort, tenantSQLPort, createTenantRegion(regionInfo))
tenant := deprecatedCreateTenantNode(ctx, t, c, c.All(), tenantID, node, tenantHTTPPort, tenantSQLPort, createTenantRegion(regionInfo))
tenant.start(ctx, t, c, "./cockroach")
tenants = append(tenants, tenant)

12 changes: 6 additions & 6 deletions pkg/cmd/roachtest/tests/multitenant_upgrade.go
@@ -113,13 +113,13 @@ func runMultiTenantUpgrade(
// Create two instances of tenant 11 so that we can test with two pods
// running during migration.
const tenantNode = 2
tenant11a := createTenantNode(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11aHTTPPort, tenant11aSQLPort)
tenant11a := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11aHTTPPort, tenant11aSQLPort)
tenant11a.start(ctx, t, c, predecessorBinary)
defer tenant11a.stop(ctx, t, c)

// Since the certs are created with the createTenantNode call above, we
// Since the certs are created with the deprecatedCreateTenantNode call above, we
// call the "no certs" version of create tenant here.
tenant11b := createTenantNodeNoCerts(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11bHTTPPort, tenant11bSQLPort)
tenant11b := deprecatedCreateTenantNodeNoCerts(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11bHTTPPort, tenant11bSQLPort)
tenant11b.start(ctx, t, c, predecessorBinary)
defer tenant11b.stop(ctx, t, c)

@@ -174,7 +174,7 @@ func runMultiTenantUpgrade(
withResults([][]string{{"1", "bar"}}))

t.Status("starting tenant 12 server with older binary")
tenant12 := createTenantNode(ctx, t, c, kvNodes, tenant12ID, tenantNode, tenant12HTTPPort, tenant12SQLPort)
tenant12 := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant12ID, tenantNode, tenant12HTTPPort, tenant12SQLPort)
tenant12.start(ctx, t, c, predecessorBinary)
defer tenant12.stop(ctx, t, c)

@@ -196,7 +196,7 @@ func runMultiTenantUpgrade(
runner.Exec(t, `SELECT crdb_internal.create_tenant($1::INT)`, tenant13ID)

t.Status("starting tenant 13 server with new binary")
tenant13 := createTenantNode(ctx, t, c, kvNodes, tenant13ID, tenantNode, tenant13HTTPPort, tenant13SQLPort)
tenant13 := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant13ID, tenantNode, tenant13HTTPPort, tenant13SQLPort)
tenant13.start(ctx, t, c, currentBinary)
defer tenant13.stop(ctx, t, c)

@@ -356,7 +356,7 @@ func runMultiTenantUpgrade(
runner.Exec(t, `SELECT crdb_internal.create_tenant($1::INT)`, tenant14ID)

t.Status("verifying that the tenant 14 server works and has the proper version")
tenant14 := createTenantNode(ctx, t, c, kvNodes, tenant14ID, tenantNode, tenant14HTTPPort, tenant14SQLPort)
tenant14 := deprecatedCreateTenantNode(ctx, t, c, kvNodes, tenant14ID, tenantNode, tenant14HTTPPort, tenant14SQLPort)
tenant14.start(ctx, t, c, currentBinary)
defer tenant14.stop(ctx, t, c)
verifySQL(t, tenant14.pgURL,
39 changes: 24 additions & 15 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -107,7 +107,8 @@ func createTenantNodeInternal(
return tn
}

func createTenantNode(
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedCreateTenantNode(
ctx context.Context,
t test.Test,
c cluster.Cluster,
@@ -118,7 +119,8 @@ func createTenantNode(
return createTenantNodeInternal(ctx, t, c, kvnodes, tenantID, node, httpPort, sqlPort, true /* certs */, opts...)
}

func createTenantNodeNoCerts(
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedCreateTenantNodeNoCerts(
ctx context.Context,
t test.Test,
c cluster.Cluster,
@@ -189,7 +191,7 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
randomSeed := rand.Int63()
c.SetRandomSeed(randomSeed)
require.NoError(t, err)
tn.errCh = startTenantServer(
tn.errCh = deprecatedStartTenantServer(
ctx, c, c.Node(tn.node), internalIPs[0], binary, tn.kvAddrs, tn.tenantID,
tn.httpPort, tn.sqlPort, tn.envVars, randomSeed,
extraArgs...,
@@ -251,7 +253,8 @@ func (tn *tenantNode) start(ctx context.Context, t test.Test, c cluster.Cluster,
t.L().Printf("sql server for tenant %d (instance %d) now running", tn.tenantID, tn.instanceID)
}

func startTenantServer(
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedStartTenantServer(
tenantCtx context.Context,
c cluster.Cluster,
node option.NodeListOption,
@@ -289,8 +292,11 @@ func startTenantServer(
return errCh
}

// createTenantAdminRole creates a role that can be used to log into a secure cluster's db console.
func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.SQLRunner) {
// deprecatedCreateTenantAdminRole creates a role that can be used to log into a secure cluster's db console.
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedCreateTenantAdminRole(
t test.Test, tenantName string, tenantSQL *sqlutils.SQLRunner,
) {
tenantSQL.Exec(t, fmt.Sprintf(`CREATE ROLE IF NOT EXISTS %s WITH LOGIN PASSWORD '%s'`, install.DefaultUser, install.DefaultPassword))
tenantSQL.Exec(t, fmt.Sprintf(`GRANT ADMIN TO %s`, install.DefaultUser))
t.L().Printf(`Log into %s db console with username "%s" and password "%s"`,
@@ -299,25 +305,27 @@ func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.S

const appTenantName = "app"

// createInMemoryTenant runs through the necessary steps to create an in-memory
// deprecatedCreateInMemoryTenant runs through the necessary steps to create an in-memory
// tenant without resource limits and full dbconsole viewing privileges.
func createInMemoryTenant(
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedCreateInMemoryTenant(
ctx context.Context,
t test.Test,
c cluster.Cluster,
tenantName string,
nodes option.NodeListOption,
secure bool,
) {
db := createInMemoryTenantWithConn(ctx, t, c, tenantName, nodes, secure)
db := deprecatedCreateInMemoryTenantWithConn(ctx, t, c, tenantName, nodes, secure)
db.Close()
}

// createInMemoryTenantWithConn runs through the necessary steps to create an
// deprecatedCreateInMemoryTenantWithConn runs through the necessary steps to create an
// in-memory tenant without resource limits and full dbconsole viewing
// privileges. As a convenience, it also returns a connection to the tenant (on
// a random node in the cluster).
func createInMemoryTenantWithConn(
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedCreateInMemoryTenantWithConn(
ctx context.Context,
t test.Test,
c cluster.Cluster,
@@ -330,19 +338,20 @@ func createInMemoryTenantWithConn(
sysSQL := sqlutils.MakeSQLRunner(sysDB)
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)

tenantConn := startInMemoryTenant(ctx, t, c, tenantName, nodes)
tenantConn := deprecatedStartInMemoryTenant(ctx, t, c, tenantName, nodes)
tenantSQL := sqlutils.MakeSQLRunner(tenantConn)
if secure {
createTenantAdminRole(t, tenantName, tenantSQL)
deprecatedCreateTenantAdminRole(t, tenantName, tenantSQL)
}
return tenantConn
}

// startInMemoryTenant starts an in memory tenant that has already been created.
// deprecatedStartInMemoryTenant starts an in memory tenant that has already been created.
// This function also removes tenant rate limiters and sets a few cluster
// settings on the tenant. As a convenience, it also returns a connection to
// the tenant (on a random node in the cluster).
func startInMemoryTenant(
// Deprecated: use Cluster.StartServiceForVirtualCluster instead.
func deprecatedStartInMemoryTenant(
ctx context.Context,
t test.Test,
c cluster.Cluster,