server: fix crdb_internal.cluster_inflight_traces in shared process mode
This commit fixes a panic that would previously occur when querying the
`crdb_internal.cluster_inflight_traces` virtual table while running in
shared-process multi-tenant mode. In particular, the problem was that we
tried to access node liveness, which isn't available in that mode; we now
fall back to the multi-tenant code path (using the instances reader).
Additionally, this commit extends the existing test to also run in the
shared-process multi-tenant config, which serves as a regression test for
this bug. There is no release note since the bug is not user-visible.

Release note: None
yuzefovich committed Jul 6, 2023
1 parent f788417 commit f65fd0b
Showing 3 changed files with 132 additions and 79 deletions.
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
@@ -910,7 +910,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// The collector requires nodeliveness to get a list of all the nodes in the
// cluster.
var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
-if isMixedSQLAndKVNode {
+if isMixedSQLAndKVNode && hasNodeLiveness {
// TODO(dt): any reason not to just always use the instance reader? And just
// pass it directly instead of making a new closure here?
getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
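To make the fix concrete: the hunk above only selects the liveness-based node listing when node liveness actually exists, so a shared-process tenant server leaves getNodes unset and, per the commit message, falls back to the instances reader. The following is a minimal standalone Go sketch of that fallback pattern, not CockroachDB source; NodeID, livenessNodes, and instanceReaderNodes are placeholder stand-ins for roachpb.NodeID and the real liveness- and sqlinstance-reader-backed lookups.

package main

import (
	"context"
	"fmt"
)

// NodeID is a placeholder for roachpb.NodeID.
type NodeID int32

// livenessNodes is a placeholder for the node-liveness-backed listing that
// is only usable when node liveness is actually available.
func livenessNodes(ctx context.Context) ([]NodeID, error) {
	return []NodeID{1, 2}, nil
}

// instanceReaderNodes is a placeholder for the sqlinstance-reader-backed
// listing that tenant servers (including shared-process ones) can use.
func instanceReaderNodes(ctx context.Context) ([]NodeID, error) {
	return []NodeID{1, 2}, nil
}

func main() {
	ctx := context.Background()

	// In shared-process multi-tenant mode the SQL server can report itself
	// as a mixed SQL+KV node while node liveness is unavailable; this is
	// the combination the commit handles.
	isMixedSQLAndKVNode := true
	hasNodeLiveness := false

	// The fix: pick the liveness-based listing only when liveness exists.
	// Before the change the condition was just isMixedSQLAndKVNode, which
	// chose a listing that could not work in shared-process mode.
	var getNodes func(ctx context.Context) ([]NodeID, error)
	if isMixedSQLAndKVNode && hasNodeLiveness {
		getNodes = livenessNodes
	}

	// Leaving getNodes unset means "fall back to the instances reader".
	if getNodes == nil {
		getNodes = instanceReaderNodes
	}

	nodes, err := getNodes(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println("nodes to collect inflight traces from:", nodes)
}
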
2 changes: 1 addition & 1 deletion pkg/util/tracing/collector/BUILD.bazel
@@ -34,11 +34,11 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
207 changes: 130 additions & 77 deletions pkg/util/tracing/collector/collector_test.go
@@ -23,11 +23,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
@@ -210,101 +210,154 @@ func TestTracingCollectorGetSpanRecordings(t *testing.T) {
// mixed nodes.
func TestClusterInflightTraces(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ccl.TestingEnableEnterprise() // We'll create tenants.
defer ccl.TestingDisableEnterprise()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// We'll create our own tenants, to ensure they exist as opposed to them
// being created randomly.
// The test itself creates tenants however necessary.
DefaultTestTenant: base.TestTenantDisabled,
},
}

testutils.RunTrueAndFalse(t, "tenant", func(t *testing.T, tenant bool) {
tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
defer tc.Stopper().Stop(ctx)

type testCase struct {
servers []serverutils.TestTenantInterface
// otherServers, if set, represents the servers corresponding to other
// tenants (or to the system tenant) than the ones being tested.
otherServers []serverutils.TestTenantInterface
getDB := func(sqlAddr, prefix string) (_ *gosql.DB, cleanup func()) {
pgURL, cleanupPGUrl := sqlutils.PGUrl(t, sqlAddr, prefix, url.User(username.RootUser))
db, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
return db, func() {
require.NoError(t, db.Close())
cleanupPGUrl()
}
var testCases []testCase
if tenant {
tenantID := roachpb.MustMakeTenantID(10)
tenants := make([]serverutils.TestTenantInterface, len(tc.Servers))
for i := range tc.Servers {
tenant, err := tc.Servers[i].StartTenant(ctx, base.TestTenantArgs{TenantID: tenantID})
require.NoError(t, err)
tenants[i] = tenant
}

for _, config := range []string{
"single-tenant",
"shared-process",
"separate-process",
} {
t.Run(config, func(t *testing.T) {
tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
defer tc.Stopper().Stop(ctx)

systemServers := []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]}
systemDBs := make([]*gosql.DB, len(tc.Servers))
for i, s := range tc.Servers {
db, cleanup := getDB(s.SQLAddr(), "System" /* prefix */)
defer cleanup()
systemDBs[i] = db
}
testCases = []testCase{
{
servers: tenants,
otherServers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
},
{
servers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
otherServers: tenants,
},

type testCase struct {
servers []serverutils.TestTenantInterface
dbs []*gosql.DB
// otherServers, if set, represents the servers corresponding to
// other tenants (or to the system tenant) than the ones being
// tested.
otherServers []serverutils.TestTenantInterface
}
} else {
testCases = []testCase{{
servers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
}}
}
var testCases []testCase
switch config {
case "single-tenant":
testCases = []testCase{{
servers: []serverutils.TestTenantInterface{tc.Servers[0], tc.Servers[1]},
dbs: systemDBs,
}}

case "shared-process":
tenants := make([]serverutils.TestTenantInterface, len(tc.Servers))
dbs := make([]*gosql.DB, len(tc.Servers))
for i, s := range tc.Servers {
tenant, db, err := s.StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{TenantName: "app"})
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
tenants[i] = tenant
dbs[i] = db
}
testCases = []testCase{
{
servers: tenants,
dbs: dbs,
otherServers: systemServers,
},
{
servers: systemServers,
dbs: systemDBs,
otherServers: tenants,
},
}

for _, tc := range testCases {
// Setup the traces we're going to look for.
localTraceID, _, cleanup := setupTraces(tc.servers[0].Tracer(), tc.servers[1].Tracer())
defer cleanup()

// Create some other spans on tc.otherServers, that we don't expect to
// find.
const otherServerSpanName = "other-server-span"
for _, s := range tc.otherServers {
sp := s.Tracer().StartSpan(otherServerSpanName)
defer sp.Finish()
case "separate-process":
tenantID := roachpb.MustMakeTenantID(10)
tenants := make([]serverutils.TestTenantInterface, len(tc.Servers))
dbs := make([]*gosql.DB, len(tc.Servers))
for i := range tc.Servers {
tenant, err := tc.Servers[i].StartTenant(ctx, base.TestTenantArgs{TenantID: tenantID})
require.NoError(t, err)
tenants[i] = tenant
db, cleanup := getDB(tenant.SQLAddr(), "Tenant" /* prefix */)
defer cleanup()
dbs[i] = db
}
testCases = []testCase{
{
servers: tenants,
dbs: dbs,
otherServers: systemServers,
},
{
servers: systemServers,
dbs: systemDBs,
otherServers: tenants,
},
}
}

// We're going to query the cluster_inflight_traces through every node.
for _, s := range tc.servers {
pgURL, cleanupPGUrl := sqlutils.PGUrl(t, s.SQLAddr(), "Tenant", url.User(username.RootUser))
defer cleanupPGUrl()
db, err := gosql.Open("postgres", pgURL.String())
defer func() {
require.NoError(t, db.Close())
}()
require.NoError(t, err)

rows, err := db.Query(
"SELECT node_id, trace_str FROM crdb_internal.cluster_inflight_traces "+
"WHERE trace_id=$1 ORDER BY node_id",
localTraceID)
require.NoError(t, err)

expSpans := map[int][]string{
1: {"root", "root.child", "root.child.remotechilddone"},
2: {"root.child.remotechild"},
for _, tc := range testCases {
// Set up the traces we're going to look for.
localTraceID, _, cleanup := setupTraces(tc.servers[0].Tracer(), tc.servers[1].Tracer())
defer cleanup()

// Create some other spans on tc.otherServers, that we don't
// expect to find.
const otherServerSpanName = "other-server-span"
for _, s := range tc.otherServers {
sp := s.Tracer().StartSpan(otherServerSpanName)
defer sp.Finish()
}
for rows.Next() {
var nodeID int
var trace string
require.NoError(t, rows.Scan(&nodeID, &trace))
exp, ok := expSpans[nodeID]
require.True(t, ok)
delete(expSpans, nodeID) // Consume this entry; we'll check that they were all consumed.
for _, span := range exp {
require.Contains(t, trace, "=== operation:"+span)

// We're going to query the cluster_inflight_traces through
// every SQL instance.
for _, db := range tc.dbs {
rows, err := db.Query(
"SELECT node_id, trace_str FROM crdb_internal.cluster_inflight_traces "+
"WHERE trace_id=$1 ORDER BY node_id",
localTraceID)
require.NoError(t, err)

expSpans := map[int][]string{
1: {"root", "root.child", "root.child.remotechilddone"},
2: {"root.child.remotechild"},
}
require.NotContains(t, trace, "=== operation:"+otherServerSpanName)
for rows.Next() {
var nodeID int
var trace string
require.NoError(t, rows.Scan(&nodeID, &trace))
exp, ok := expSpans[nodeID]
require.True(t, ok)
delete(expSpans, nodeID) // Consume this entry; we'll check that they were all consumed.
for _, span := range exp {
require.Contains(t, trace, "=== operation:"+span)
}
require.NotContains(t, trace, "=== operation:"+otherServerSpanName)
}
require.Len(t, expSpans, 0)
}
require.Len(t, expSpans, 0)
}
}
})
})
}
}
