release-23.1: server: fix crdb_internal.cluster_inflight_traces in shared process mode #106457

Merged
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go

@@ -904,7 +904,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 	// The collector requires nodeliveness to get a list of all the nodes in the
 	// cluster.
 	var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
-	if isMixedSQLAndKVNode {
+	if isMixedSQLAndKVNode && hasNodeLiveness {
 		// TODO(dt): any reason not to just always use the instance reader? And just
 		// pass it directly instead of making a new closure here?
 		getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
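
For orientation, the condition being tightened above can be read on its own. The sketch below is not the surrounding server_sql.go code: pickNodeLister, fromLiveness, and fallback are made-up names, and it only illustrates the selection the fix performs — the liveness-backed node lister is installed only when node liveness actually exists; otherwise another source (such as the instance reader mentioned in the TODO) has to supply the node list, which is what a shared-process SQL server needs.

	// Sketch with hypothetical names; the real server_sql.go builds getNodes inline.
	package main

	import (
		"context"
		"fmt"
	)

	// NodeID stands in for roachpb.NodeID.
	type NodeID int32

	// pickNodeLister chooses how the trace collector enumerates nodes: via node
	// liveness only when a liveness instance is present, otherwise via a fallback.
	func pickNodeLister(
		isMixedSQLAndKVNode, hasNodeLiveness bool,
		fromLiveness, fallback func(context.Context) ([]NodeID, error),
	) func(context.Context) ([]NodeID, error) {
		if isMixedSQLAndKVNode && hasNodeLiveness {
			return fromLiveness
		}
		return fallback
	}

	func main() {
		fromLiveness := func(context.Context) ([]NodeID, error) { return []NodeID{1, 2, 3}, nil }
		fallback := func(context.Context) ([]NodeID, error) { return []NodeID{1, 2}, nil }

		// A shared-process-style server: co-located with KV, but without liveness.
		getNodes := pickNodeLister(true, false, fromLiveness, fallback)
		nodes, _ := getNodes(context.Background())
		fmt.Println(nodes) // [1 2]: the fallback supplies the node list
	}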
2 changes: 1 addition & 1 deletion pkg/util/tracing/collector/BUILD.bazel

@@ -35,11 +35,11 @@ go_test(
         "//pkg/security/securitytest",
         "//pkg/security/username",
         "//pkg/server",
-        "//pkg/testutils",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
+        "//pkg/util/log",
         "//pkg/util/randutil",
         "//pkg/util/tracing",
         "//pkg/util/tracing/tracingpb",
207 changes: 130 additions & 77 deletions pkg/util/tracing/collector/collector_test.go

@@ -23,11 +23,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
-	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
@@ -210,101 +210,154 @@ func TestTracingCollectorGetSpanRecordings(t *testing.T) {
 // mixed nodes.
 func TestClusterInflightTraces(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
 	ccl.TestingEnableEnterprise() // We'll create tenants.
 	defer ccl.TestingDisableEnterprise()
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	args := base.TestClusterArgs{
 		ServerArgs: base.TestServerArgs{
-			// We'll create our own tenants, to ensure they exist as opposed to them
-			// being created randomly.
+			// The test itself creates tenants however necessary.
 			DisableDefaultTestTenant: true,
 		},
 	}
 
-	testutils.RunTrueAndFalse(t, "tenant", func(t *testing.T, tenant bool) {
-		tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
-		defer tc.Stopper().Stop(ctx)
-
-		type testCase struct {
-			servers []serverutils.TestTenantInterface
-			// otherServers, if set, represents the servers corresponding to other
-			// tenants (or to the system tenant) than the ones being tested.
-			otherServers []serverutils.TestTenantInterface
-		}
-		var testCases []testCase
-		if tenant {
-			tenantID := roachpb.MustMakeTenantID(10)
-			tenants := make([]serverutils.TestTenantInterface, len(tc.Servers))
-			for i := range tc.Servers {
-				tenant, err := tc.Servers[i].StartTenant(ctx, base.TestTenantArgs{TenantID: tenantID})
-				require.NoError(t, err)
-				tenants[i] = tenant
-			}
-			testCases = []testCase{
-				{
-					servers:      tenants,
-					otherServers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
-				},
-				{
-					servers:      []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
-					otherServers: tenants,
-				},
-			}
-		} else {
-			testCases = []testCase{{
-				servers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
-			}}
-		}
+	getDB := func(sqlAddr, prefix string) (_ *gosql.DB, cleanup func()) {
+		pgURL, cleanupPGUrl := sqlutils.PGUrl(t, sqlAddr, prefix, url.User(username.RootUser))
+		db, err := gosql.Open("postgres", pgURL.String())
+		require.NoError(t, err)
+		return db, func() {
+			require.NoError(t, db.Close())
+			cleanupPGUrl()
+		}
+	}
 
-		for _, tc := range testCases {
-			// Setup the traces we're going to look for.
-			localTraceID, _, cleanup := setupTraces(tc.servers[0].Tracer(), tc.servers[1].Tracer())
-			defer cleanup()
-
-			// Create some other spans on tc.otherServers, that we don't expect to
-			// find.
-			const otherServerSpanName = "other-server-span"
-			for _, s := range tc.otherServers {
-				sp := s.Tracer().StartSpan(otherServerSpanName)
-				defer sp.Finish()
-			}
-
-			// We're going to query the cluster_inflight_traces through every node.
-			for _, s := range tc.servers {
-				pgURL, cleanupPGUrl := sqlutils.PGUrl(t, s.SQLAddr(), "Tenant", url.User(username.RootUser))
-				defer cleanupPGUrl()
-				db, err := gosql.Open("postgres", pgURL.String())
-				defer func() {
-					require.NoError(t, db.Close())
-				}()
-				require.NoError(t, err)
-
-				rows, err := db.Query(
-					"SELECT node_id, trace_str FROM crdb_internal.cluster_inflight_traces "+
-						"WHERE trace_id=$1 ORDER BY node_id",
-					localTraceID)
-				require.NoError(t, err)
-
-				expSpans := map[int][]string{
-					1: {"root", "root.child", "root.child.remotechilddone"},
-					2: {"root.child.remotechild"},
-				}
-				for rows.Next() {
-					var nodeID int
-					var trace string
-					require.NoError(t, rows.Scan(&nodeID, &trace))
-					exp, ok := expSpans[nodeID]
-					require.True(t, ok)
-					delete(expSpans, nodeID) // Consume this entry; we'll check that they were all consumed.
-					for _, span := range exp {
-						require.Contains(t, trace, "=== operation:"+span)
-					}
-					require.NotContains(t, trace, "=== operation:"+otherServerSpanName)
-				}
-				require.Len(t, expSpans, 0)
-			}
-		}
-	})
+	for _, config := range []string{
+		"single-tenant",
+		"shared-process",
+		"separate-process",
+	} {
+		t.Run(config, func(t *testing.T) {
+			tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
+			defer tc.Stopper().Stop(ctx)
+
+			systemServers := []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]}
+			systemDBs := make([]*gosql.DB, len(tc.Servers))
+			for i, s := range tc.Servers {
+				db, cleanup := getDB(s.SQLAddr(), "System" /* prefix */)
+				defer cleanup()
+				systemDBs[i] = db
+			}
+
+			type testCase struct {
+				servers []serverutils.TestTenantInterface
+				dbs     []*gosql.DB
+				// otherServers, if set, represents the servers corresponding to
+				// other tenants (or to the system tenant) than the ones being
+				// tested.
+				otherServers []serverutils.TestTenantInterface
+			}
+			var testCases []testCase
+			switch config {
+			case "single-tenant":
+				testCases = []testCase{{
+					servers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
+					dbs:     systemDBs,
+				}}
+
+			case "shared-process":
+				tenants := make([]serverutils.TestTenantInterface, len(tc.Servers))
+				dbs := make([]*gosql.DB, len(tc.Servers))
+				for i, s := range tc.Servers {
+					tenant, db, err := s.StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{TenantName: "app"})
+					require.NoError(t, err)
+					defer func() {
+						require.NoError(t, db.Close())
+					}()
+					tenants[i] = tenant
+					dbs[i] = db
+				}
+				testCases = []testCase{
+					{
+						servers:      tenants,
+						dbs:          dbs,
+						otherServers: systemServers,
+					},
+					{
+						servers:      systemServers,
+						dbs:          systemDBs,
+						otherServers: tenants,
+					},
+				}
+
+			case "separate-process":
+				tenantID := roachpb.MustMakeTenantID(10)
+				tenants := make([]serverutils.TestTenantInterface, len(tc.Servers))
+				dbs := make([]*gosql.DB, len(tc.Servers))
+				for i := range tc.Servers {
+					tenant, err := tc.Servers[i].StartTenant(ctx, base.TestTenantArgs{TenantID: tenantID})
+					require.NoError(t, err)
+					tenants[i] = tenant
+					db, cleanup := getDB(tenant.SQLAddr(), "Tenant" /* prefix */)
+					defer cleanup()
+					dbs[i] = db
+				}
+				testCases = []testCase{
+					{
+						servers:      tenants,
+						dbs:          dbs,
+						otherServers: systemServers,
+					},
+					{
+						servers:      systemServers,
+						dbs:          systemDBs,
+						otherServers: tenants,
+					},
+				}
+			}
+
+			for _, tc := range testCases {
+				// Set up the traces we're going to look for.
+				localTraceID, _, cleanup := setupTraces(tc.servers[0].Tracer(), tc.servers[1].Tracer())
+				defer cleanup()
+
+				// Create some other spans on tc.otherServers, that we don't
+				// expect to find.
+				const otherServerSpanName = "other-server-span"
+				for _, s := range tc.otherServers {
+					sp := s.Tracer().StartSpan(otherServerSpanName)
+					defer sp.Finish()
+				}
+
+				// We're going to query the cluster_inflight_traces through
+				// every SQL instance.
+				for _, db := range tc.dbs {
+					rows, err := db.Query(
+						"SELECT node_id, trace_str FROM crdb_internal.cluster_inflight_traces "+
+							"WHERE trace_id=$1 ORDER BY node_id",
+						localTraceID)
+					require.NoError(t, err)
+
+					expSpans := map[int][]string{
+						1: {"root", "root.child", "root.child.remotechilddone"},
+						2: {"root.child.remotechild"},
+					}
+					for rows.Next() {
+						var nodeID int
+						var trace string
+						require.NoError(t, rows.Scan(&nodeID, &trace))
+						exp, ok := expSpans[nodeID]
+						require.True(t, ok)
+						delete(expSpans, nodeID) // Consume this entry; we'll check that they were all consumed.
+						for _, span := range exp {
+							require.Contains(t, trace, "=== operation:"+span)
+						}
+						require.NotContains(t, trace, "=== operation:"+otherServerSpanName)
+					}
+					require.Len(t, expSpans, 0)
+				}
+			}
+		})
+	}
 }
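
As a usage note, the query the test drives through each SQL instance can also be issued on its own against the fixed virtual table. A minimal sketch, assuming an already-open database/sql connection and a trace ID obtained elsewhere (the function name fetchInflightTrace and its parameters are illustrative, not part of this PR):

	import (
		"context"
		"database/sql"
	)

	// fetchInflightTrace returns the per-node recordings of one in-flight trace,
	// keyed by node ID, mirroring what the test reads from
	// crdb_internal.cluster_inflight_traces.
	func fetchInflightTrace(ctx context.Context, db *sql.DB, traceID int64) (map[int]string, error) {
		rows, err := db.QueryContext(ctx,
			"SELECT node_id, trace_str FROM crdb_internal.cluster_inflight_traces "+
				"WHERE trace_id=$1 ORDER BY node_id",
			traceID)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		recordings := make(map[int]string)
		for rows.Next() {
			var nodeID int
			var trace string
			if err := rows.Scan(&nodeID, &trace); err != nil {
				return nil, err
			}
			// Each recording is a formatted string; spans appear as "=== operation:<name>" lines.
			recordings[nodeID] = trace
		}
		return recordings, rows.Err()
	}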