109781: spanconfig: pass SpanConfig by pointer r=arulajmani a=andrewbaptist

Note: Only the last 2 commits are new in this PR.

SpanConfig can be a large object, and it is more efficient to pass it by
pointer now that it is threaded through multiple layers of code.
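
For illustration, a minimal Go sketch (hypothetical names, not the actual spanconfig types) of why a pointer is cheaper once a large struct is threaded through several layers:

```go
package main

import "fmt"

// bigConfig is a stand-in for a wide config struct such as SpanConfig
// (hypothetical; the real type has many more fields).
type bigConfig struct {
	RangeMinBytes, RangeMaxBytes int64
	GCTTLSeconds                 int32
	NumReplicas, NumVoters       int32
	Constraints                  []string
}

// applyByValue copies the entire struct at every call boundary it crosses.
func applyByValue(cfg bigConfig) int32 { return cfg.NumReplicas }

// applyByPointer copies only a single machine word regardless of how many
// fields the struct grows, which adds up when the value passes through
// multiple layers of code.
func applyByPointer(cfg *bigConfig) int32 { return cfg.NumReplicas }

func main() {
	cfg := bigConfig{NumReplicas: 3, NumVoters: 3}
	fmt.Println(applyByValue(cfg), applyByPointer(&cfg))
}
```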

Epic: none

Release note: None


111090: kvcoord: fix flake in `TestTransactionUnexpectedlyCommitted` r=AlexTalks a=AlexTalks

The `TestTransactionUnexpectedlyCommitted/recovery_after_transfer_lease` test, introduced to test #107658, has been flaky (particularly under deadlock builds) due to a race condition between a retry of a write and intent resolution. While both orderings in this test result in a correct `AmbiguousResultError` for the client, when intent resolution wins the race, the retried write will attempt to push away the current lockholder; since it is illegal for a committed transaction to perform a
push, this results in a different secondary error attached to the `AmbiguousResultError`. This change ensures a predefined ordering of these operations so that the secondary error is consistent across runs of the test.
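
As a rough sketch of the synchronization idea (hypothetical names; the real test intercepts requests in dist_sender_ambiguous_test.go), holding the racing operation on a channel until the other side finishes pins the ordering:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	txn1Done := make(chan struct{}) // closed once txn1's final EndTxn has returned
	var wg sync.WaitGroup

	// Stand-in for intent resolution: hold it back until txn1 is finished, so
	// the retried write never ends up pushing a lockholder on behalf of an
	// already-committed pusher.
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-txn1Done
		fmt.Println("intent resolution proceeds after txn1 is done")
	}()

	// Stand-in for txn1: retry the write, observe the ambiguous commit, finish.
	fmt.Println("txn1 retries its write and returns AmbiguousResultError")
	close(txn1Done)

	wg.Wait()
}
```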

Fixes: #110187

Release note: None

111369: clusterversion: fix formatting of versions comment r=RaduBerinde a=RaduBerinde

Some of the formatting in this big comment was getting mangled by
gofmt. This change fixes it: the trick is to make sure the "sub
lists" are all indented by a tab so the whole thing is treated as one
big code block.
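
For example (a contrived doc comment, not the one in this change), gofmt leaves indented lines alone because it treats them as a preformatted block:

```go
// Package example shows the doc-comment trick: once every line of the
// sub-list below starts with a tab, gofmt treats the whole thing as a
// code block and stops reflowing it.
//
//	(a) first case, with a nested snippet:
//
//	    if cond {
//	        // ...
//	    }
//
//	(b) second case.
package example
```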

Epic: none
Release note: None

111372: roachtest: fix leaked goroutines in c2c roachtests r=lidorcarmel a=lidorcarmel

Without this PR, c2c roachtests have almost 30 messages like these at the end:
```
18:04:19 leaktest.go:161: Leaked goroutine: goroutine 1879 [select, 2 minutes]:
database/sql.(*DB).connectionOpener(0xc003f2bc70, {0x10812e000, 0xc00059d220})
	GOROOT/src/database/sql/sql.go:1218 +0x8d
created by database/sql.OpenDB
	GOROOT/src/database/sql/sql.go:791 +0x18d
```

This PR cleans that up.
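
The leaked goroutine above is database/sql's internal connectionOpener, which runs until the *sql.DB is closed, so the fix boils down to closing every handle the test opens. A minimal sketch (connection string and driver are illustrative):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // illustrative driver choice
)

func main() {
	// sql.Open returns a *sql.DB whose background goroutines (e.g. the
	// connectionOpener seen in the leaktest output) live until Close is called.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close() // tears down the pool and stops its goroutines

	if err := db.Ping(); err != nil {
		log.Printf("ping failed (cluster may not be running): %v", err)
	}
}
```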

Epic: none

Release note: None

Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Lidor Carmel <[email protected]>
5 people committed Sep 27, 2023
5 parents 81a33ed + 3cbfa22 + fdb6e86 + b670d20 + 5615637 commit f085187
Showing 34 changed files with 249 additions and 214 deletions.
106 changes: 52 additions & 54 deletions pkg/clusterversion/cockroach_versions.go
@@ -26,70 +26,68 @@ type Key int
//
// You'll want to add a new one in the following cases:
//
//	(a) When introducing a backwards incompatible feature. Broadly, by this we
//	mean code that's structured as follows:
//
//	    if (specific-version is active) {
//	        // Implies that all nodes in the cluster are running binaries that
//	        // have this code. We can "enable" the new feature knowing that
//	        // outbound RPCs, requests, etc. will be handled by nodes that know
//	        // how to do so.
//	    } else {
//	        // There may be some nodes running older binaries without this code.
//	        // To be safe, we'll want to behave as we did before introducing
//	        // this feature.
//	    }
//
//	    Authors of migrations need to be careful in ensuring that end-users
//	    aren't able to enable feature gates before they're active. This is fine:
//
//	    func handleSomeNewStatement() error {
//	        if !(specific-version is active) {
//	            return errors.New("cluster version needs to be bumped")
//	        }
//	        // ...
//	    }
//
//	    At the same time, with requests/RPCs originating at other crdb nodes, the
//	    initiator of the request gets to decide what's supported. A node should
//	    not refuse functionality on the grounds that its view of the version gate
//	    is as yet inactive. Consider the sender:
//
//	    func invokeSomeRPC(req) {
//	        if (specific-version is active) {
//	            // Like mentioned above, this implies that all nodes in the
//	            // cluster are running binaries that can handle this new
//	            // feature. We may have learned about this fact before the
//	            // node on the other end. This is due to the fact that migration
//	            // manager informs each node about the specific-version being
//	            // activated active concurrently. See BumpClusterVersion for
//	            // where that happens. Still, it's safe for us to enable the new
//	            // feature flags as we trust the recipient to know how to deal
//	            // with it.
//	            req.NewFeatureFlag = true
//	        }
//	        send(req)
//	    }
//
//	    And consider the recipient:
//
//	    func someRPC(req) {
//	        if !req.NewFeatureFlag {
//	            // Legacy behavior...
//	        }
//	        // There's no need to even check if the specific-version is active.
//	        // If the flag is enabled, the specific-version must have been
//	        // activated, even if we haven't yet heard about it (we will pretty
//	        // soon).
//	    }
//
//	    See clusterversion.Handle.IsActive and usage of some existing versions
//	    below for more clues on the matter.
//
//	(b) When cutting a major release branch. When cutting release-20.2 for
//	example, you'll want to introduce the following to `master`.
//
//	    (i) V20_2 (keyed to v20.2.0-0})
//	    (ii) V21_1Start (keyed to v20.2.0-1})
25 changes: 19 additions & 6 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -484,7 +484,9 @@ func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) *
}
}

func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluster.Cluster) {
func (rd *replicationDriver) setupC2C(
ctx context.Context, t test.Test, c cluster.Cluster,
) (cleanup func()) {
if len(rd.rs.multiregion.srcLocalities) != 0 {
nodeCount := rd.rs.srcNodes + rd.rs.dstNodes
localityCount := len(rd.rs.multiregion.srcLocalities) + len(rd.rs.multiregion.destLocalities)
@@ -585,6 +587,10 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg))
rd.t.L().Printf("Prom has started")
}
return func() {
srcDB.Close()
destDB.Close()
}
}

func (rd *replicationDriver) crdbNodes() option.NodeListOption {
@@ -776,7 +782,9 @@ func (rd *replicationDriver) onFingerprintMismatch(
) {
rd.t.L().Printf("conducting table level fingerprints")
srcTenantConn := rd.c.Conn(ctx, rd.t.L(), 1, option.TenantName(rd.setup.src.name))
defer srcTenantConn.Close()
dstTenantConn := rd.c.Conn(ctx, rd.t.L(), rd.rs.srcNodes+1, option.TenantName(rd.setup.dst.name))
defer dstTenantConn.Close()
fingerprintBisectErr := replicationutils.InvestigateFingerprints(ctx, srcTenantConn, dstTenantConn,
startTime,
endTime)
@@ -957,7 +965,8 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.t.L().Printf("starting the destination tenant")
startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
conn := startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
conn.Close()

rd.metrics.export(rd.t, len(rd.setup.src.nodes))

@@ -1030,7 +1039,8 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster
suites: registry.Suites("nightly"),
}
rd := makeReplicationDriver(t, c, sp)
rd.setupC2C(ctx, t, c)
cleanup := rd.setupC2C(ctx, t, c)
defer cleanup()

// Spin up a monitor to capture any node deaths.
m := rd.newMonitor(ctx)
@@ -1227,7 +1237,8 @@ func registerClusterToCluster(r registry.Registry) {
c2cRegisterWrapper(r, sp,
func(ctx context.Context, t test.Test, c cluster.Cluster) {
rd := makeReplicationDriver(t, c, sp)
rd.setupC2C(ctx, t, c)
cleanup := rd.setupC2C(ctx, t, c)
defer cleanup()
// Spin up a monitor to capture any node deaths.
m := rd.newMonitor(ctx)
m.Go(func(ctx context.Context) error {
@@ -1499,7 +1510,8 @@ func registerClusterReplicationResilience(r registry.Registry) {

rrd := makeReplShutdownDriver(t, c, rsp)
rrd.t.L().Printf("Planning to shut down node during %s phase", rrd.phase)
rrd.setupC2C(ctx, t, c)
cleanup := rrd.setupC2C(ctx, t, c)
defer cleanup()

shutdownSetupDone := make(chan struct{})

@@ -1609,7 +1621,8 @@ func registerClusterReplicationDisconnect(r registry.Registry) {
}
c2cRegisterWrapper(r, sp, func(ctx context.Context, t test.Test, c cluster.Cluster) {
rd := makeReplicationDriver(t, c, sp)
rd.setupC2C(ctx, t, c)
cleanup := rd.setupC2C(ctx, t, c)
defer cleanup()

shutdownSetupDone := make(chan struct{})

3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/tests/multitenant_tpch.go
@@ -93,7 +93,8 @@ func runMultiTenantTPCH(

// Now we create a tenant and run all TPCH queries within it.
if sharedProcess {
db := createInMemoryTenant(ctx, t, c, appTenantName, c.All(), true /* secure */)
db := createInMemoryTenantWithConn(ctx, t, c, appTenantName, c.All(), true /* secure */)
defer db.Close()
url := fmt.Sprintf("{pgurl:1:%s}", appTenantName)
runTPCH(db, url, 1 /* setupIdx */)
} else {
29 changes: 24 additions & 5 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
@@ -327,18 +327,34 @@ func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.S
const appTenantName = "app"

// createInMemoryTenant runs through the necessary steps to create an in-memory
// tenant without resource limits and full dbconsole viewing privileges. As a
// convenience, it also returns a connection to the tenant (on a random node in
// the cluster).
// tenant without resource limits and full dbconsole viewing privileges.
func createInMemoryTenant(
ctx context.Context,
t test.Test,
c cluster.Cluster,
tenantName string,
nodes option.NodeListOption,
secure bool,
) {
db := createInMemoryTenantWithConn(ctx, t, c, tenantName, nodes, secure)
db.Close()
}

// createInMemoryTenantWithConn runs through the necessary steps to create an
// in-memory tenant without resource limits and full dbconsole viewing
// privileges. As a convenience, it also returns a connection to the tenant (on
// a random node in the cluster).
func createInMemoryTenantWithConn(
ctx context.Context,
t test.Test,
c cluster.Cluster,
tenantName string,
nodes option.NodeListOption,
secure bool,
) *gosql.DB {
sysSQL := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), nodes.RandNode()[0]))
sysDB := c.Conn(ctx, t.L(), nodes.RandNode()[0])
defer sysDB.Close()
sysSQL := sqlutils.MakeSQLRunner(sysDB)
sysSQL.Exec(t, "CREATE TENANT $1", tenantName)

tenantConn := startInMemoryTenant(ctx, t, c, tenantName, nodes)
@@ -360,7 +376,9 @@ func startInMemoryTenant(
tenantName string,
nodes option.NodeListOption,
) *gosql.DB {
sysSQL := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), nodes.RandNode()[0]))
sysDB := c.Conn(ctx, t.L(), nodes.RandNode()[0])
defer sysDB.Close()
sysSQL := sqlutils.MakeSQLRunner(sysDB)
sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName)
sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName)
@@ -384,6 +402,7 @@ func startInMemoryTenant(
return err
}
if err = tenantConn.Ping(); err != nil {
tenantConn.Close()
return err
}
return nil
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/tpcc.go
@@ -1431,7 +1431,7 @@ func runTPCCBench(ctx context.Context, t test.Test, c cluster.Cluster, b tpccBen

var db *gosql.DB
if b.SharedProcessMT {
db = createInMemoryTenant(ctx, t, c, appTenantName, roachNodes, false /* secure */)
db = createInMemoryTenantWithConn(ctx, t, c, appTenantName, roachNodes, false /* secure */)
} else {
db = c.Conn(ctx, t.L(), 1)
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/tpchvec.go
@@ -601,7 +601,7 @@ func runTPCHVec(
if _, err := singleTenantConn.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false;"); err != nil {
t.Fatal(err)
}
conn = createInMemoryTenant(ctx, t, c, appTenantName, c.All(), false /* secure */)
conn = createInMemoryTenantWithConn(ctx, t, c, appTenantName, c.All(), false /* secure */)
} else {
conn = c.Conn(ctx, t.L(), 1)
disableMergeQueue = true
14 changes: 10 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
@@ -274,7 +274,6 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
// several seconds, and requires maintaining expected leases.
skip.UnderShort(t)
skip.UnderStressRace(t)
skip.WithIssue(t, 110187, "flaky test")

succeedsSoonDuration := testutils.DefaultSucceedsSoonDuration
if util.RaceEnabled {
@@ -1050,7 +1049,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
// 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts
// recovery.
// 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed, but
// pauses before returning so that txn1's intents don't get cleaned up.
// intent resolution on txn1 needs to be paused until after txn1 finishes.
if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
close(recoverComplete)
}
@@ -1077,8 +1076,15 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
// 12. txn1->n1: EndTxn(commit) -- Recovery has already completed, so this
// request fails with "transaction unexpectedly committed".

// <allow (8) recovery to return and txn2 to continue>
if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
// <allow intent resolution after (8) recovery so txn2 can continue>
// If the intent on (b) were resolved and txn2 could grab the lock prior
// to txn1's retry of the Put(b), the retry will cause a PushTxn to txn2.
// Given that the recovery at (8) has already completed, a PushTxn
// request where the pusher is a committed transaction results in an
// "already committed" TransactionStatusError from the txnwait queue.
// While this still results in AmbiguousResultError from the DistSender,
// the reason will be distinct; as such we pause the intent resolution.
if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == BeforeSending {
req.pauseUntil(t, txn1Done, cp)
t.Logf("%s - complete, resp={%s}", req.prefix, resp)
}