Skip to content

Commit

Permalink
tracing,tracingservice: adds a trace service to pull clusterwide trac…
Browse files Browse the repository at this point in the history
…e spans

Previously, every node in the cluster had a local inflight span
registry that was aware of all the spans that were rooted on that
particular node. Child spans of a given traceID executing on a remote
node would only become visible to the local registry once execution
completes, and the span pushes its recordings over gRPC to the
"client" node.

This change introduces a `tracingservice` package.
Package tracingservice contains a gRPC service to be used for
remote inflight span access.

It is used for pulling inflight spans from all CockroachDB nodes.
Each node will run a trace service, which serves the inflight spans from the
local span registry on that node. Each node will also have a trace client
dialer, which uses the nodedialer to connect to another node's trace service,
and access its inflight spans. The trace client dialer is backed by a remote
trace client or a local trace client, which serve as the point of entry to this
service. Both clients support the `TraceClient` interface, which includes the
following functionalities:
  - GetSpanRecordings

The spans for a traceID are sorted by `StartTime` before they are
returned. The per-node trace dialer is not hooked up anywhere yet; where it
gets wired in will depend on where we intend to use it.

Release note: None
  • Loading branch information
adityamaru committed May 26, 2021
1 parent 35789fc commit d67bb84
Show file tree
Hide file tree
Showing 11 changed files with 1,190 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ ALL_TESTS = [
"//pkg/util/timeutil/pgdate:pgdate_test",
"//pkg/util/timeutil:timeutil_test",
"//pkg/util/tracing:tracing_test",
"//pkg/util/tracingservice:tracingservice_test",
"//pkg/util/treeprinter:treeprinter_test",
"//pkg/util/uint128:uint128_test",
"//pkg/util/ulid:ulid_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracingservice",
"//pkg/util/tracingservice/tracingservicepb:tracingservicepb_go_proto",
"//pkg/util/uuid",
"@com_github_cenkalti_backoff//:backoff",
"@com_github_cockroachdb_apd_v2//:apd",
Expand Down
8 changes: 8 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracingservice"
"github.com/cockroachdb/cockroach/pkg/util/tracingservice/tracingservicepb"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
"google.golang.org/grpc"
Expand All @@ -99,6 +101,7 @@ type SQLServer struct {
internalExecutor *sql.InternalExecutor
leaseMgr *lease.Manager
blobService *blobs.Service
traceService *tracingservice.Service
tenantConnect kvtenant.Connector
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
Expand Down Expand Up @@ -260,6 +263,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
blobspb.RegisterBlobServer(cfg.grpcServer, blobService)

// Create trace service for inter-node sharing of inflight trace spans.
traceService := tracingservice.NewTraceService(cfg.Settings.Tracer)
tracingservicepb.RegisterTraceServer(cfg.grpcServer, traceService)

jobRegistry := cfg.circularJobRegistry
{
regLiveness := cfg.nodeLiveness
Expand Down Expand Up @@ -730,6 +737,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
internalExecutor: cfg.circularInternalExecutor,
leaseMgr: leaseMgr,
blobService: blobService,
traceService: traceService,
tenantConnect: cfg.tenantConnect,
sessionRegistry: cfg.sessionRegistry,
jobRegistry: jobRegistry,
Expand Down
49 changes: 49 additions & 0 deletions pkg/util/tracingservice/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the tracingservice package. It is built from client.go
# and service.go and is importable under the importpath given below.
go_library(
name = "tracingservice",
srcs = [
"client.go",
"service.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/util/tracingservice",
visibility = ["//visibility:public"],
deps = [
"//pkg/migration/migrationcluster",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracingservice/tracingservicepb:tracingservicepb_go_proto",
],
)

# Test target for the tracingservice package. It is built from client_test.go
# and service_test.go and embeds the :tracingservice library target.
go_test(
name = "tracingservice_test",
srcs = [
"client_test.go",
"service_test.go",
],
embed = [":tracingservice"],
deps = [
"//pkg/migration/nodelivenesstest",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/netutil",
"//pkg/util/stop",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracingservice/tracingservicepb:tracingservicepb_go_proto",
"@com_github_gogo_protobuf//types",
"@com_github_pkg_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
173 changes: 173 additions & 0 deletions pkg/util/tracingservice/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tracingservice

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/migration/migrationcluster"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/tracingservice/tracingservicepb"
)

// TraceClient provides an interface for accessing the recordings of inflight
// spans for a given traceID on a node's inflight registry.
type TraceClient interface {
// GetSpanRecordings returns the recordings of the inflight spans that belong
// to the given traceID, wrapped in a SpanRecordingResponse.
GetSpanRecordings(ctx context.Context, traceID uint64) (*tracingservicepb.SpanRecordingResponse, error)
}

// Compile-time assertion that *localTraceClient satisfies TraceClient.
var _ TraceClient = &localTraceClient{}

// localTraceClient executes the local tracing service's code to fetch inflight
// span recordings.
type localTraceClient struct {
// tracer is the tracer whose spans are visited when collecting recordings.
tracer *tracing.Tracer
}

// newLocalTraceClient builds a TraceClient that reads span recordings
// directly from the supplied tracer.
func newLocalTraceClient(tracer *tracing.Tracer) TraceClient {
	client := new(localTraceClient)
	client.tracer = tracer
	return client
}

// GetSpanRecordings implements the TraceClient interface.
//
// It walks the spans exposed by the tracer's VisitSpans and collects the
// recordings of every span whose trace ID equals traceID. The context
// argument is ignored. The returned response is always non-nil; the error is
// whatever VisitSpans reports.
func (c *localTraceClient) GetSpanRecordings(
	_ context.Context, traceID uint64,
) (*tracingservicepb.SpanRecordingResponse, error) {
	var resp tracingservicepb.SpanRecordingResponse
	err := c.tracer.VisitSpans(func(span *tracing.Span) error {
		if span.TraceID() != traceID {
			// The span belongs to a different trace; nothing to collect.
			return nil
		}
		// Append the whole recording in one call instead of looping over its
		// elements one at a time.
		resp.SpanRecordings = append(resp.SpanRecordings, span.GetRecording()...)
		return nil
	})
	return &resp, err
}

// Compile-time assertion that *remoteTraceClient satisfies TraceClient.
var _ TraceClient = &remoteTraceClient{}

// remoteTraceClient uses the node dialer and tracing service clients to fetch
// span recordings from other nodes.
type remoteTraceClient struct {
// traceClient is the generated Trace service client that
// GetSpanRecordings requests are forwarded to.
traceClient tracingservicepb.TraceClient
}

// newRemoteTraceClient wraps the given Trace service client in a TraceClient
// that forwards requests to it.
func newRemoteTraceClient(traceClient tracingservicepb.TraceClient) TraceClient {
	client := new(remoteTraceClient)
	client.traceClient = traceClient
	return client
}

// GetSpanRecordings implements the TraceClient interface by issuing a
// GetSpanRecordings request for traceID through the wrapped Trace service
// client and returning its response and error unchanged.
func (c *remoteTraceClient) GetSpanRecordings(
	ctx context.Context, traceID uint64,
) (*tracingservicepb.SpanRecordingResponse, error) {
	req := &tracingservicepb.SpanRecordingRequest{TraceID: traceID}
	return c.traceClient.GetSpanRecordings(ctx, req)
}

// TraceClientDialer can be used to extract recordings from inflight spans for a
// given traceID, from all nodes of the cluster.
type TraceClientDialer struct {
// tracer is the local tracer; recordings for the local node are read from it.
tracer *tracing.Tracer
// localNodeID identifies the local node, which is skipped when dialing
// remote nodes.
localNodeID roachpb.NodeID
// dialer is used to open connections to the other nodes.
dialer *nodedialer.Dialer
// nodeliveness is the source of the list of nodes to query.
nodeliveness migrationcluster.NodeLiveness
}

// NewTraceClientDialer constructs a TraceClientDialer for the node identified
// by localNodeID. The dialer is used to reach other nodes, nodeliveness
// supplies the list of nodes, and tracer provides the local spans.
func NewTraceClientDialer(
	localNodeID roachpb.NodeID,
	dialer *nodedialer.Dialer,
	nodeliveness migrationcluster.NodeLiveness,
	tracer *tracing.Tracer,
) *TraceClientDialer {
	d := new(TraceClientDialer)
	d.tracer = tracer
	d.localNodeID = localNodeID
	d.dialer = dialer
	d.nodeliveness = nodeliveness
	return d
}

// GetSpanRecordingsFromCluster returns the inflight span recordings for the
// given traceID from all nodes in the cluster, sorted by StartTime.
//
// The list of nodes is obtained from node liveness. Recordings on the local
// node are read directly from the local tracer; every other node is dialed
// and queried in its own goroutine, with the number of outstanding RPCs
// bounded by a quota pool.
//
// On failure the error is returned together with whatever recordings had been
// gathered so far (unsorted). Every goroutine started by this method has
// finished by the time it returns, so the returned slice is never mutated
// concurrently with the caller.
func (t *TraceClientDialer) GetSpanRecordingsFromCluster(
	ctx context.Context, traceID uint64,
) ([]tracingpb.RecordedSpan, error) {
	var res []tracingpb.RecordedSpan
	ns, err := migrationcluster.NodesFromNodeLiveness(ctx, t.nodeliveness)
	if err != nil {
		return res, err
	}

	// Collect spans from the local client.
	localClient := newLocalTraceClient(t.tracer)
	localSpanRecordings, err := localClient.GetSpanRecordings(ctx, traceID)
	if err != nil {
		return res, err
	}
	// mu guards res once the remote goroutines below start appending to it.
	var mu syncutil.Mutex
	res = append(res, localSpanRecordings.SpanRecordings...)

	// Collect spans from remote clients.
	// We'll want to rate limit outgoing RPCs (limit pulled out of thin air).
	qp := quotapool.NewIntPool("every-node", 25)
	log.Infof(ctx, "executing GetSpanRecordings on nodes %s", ns)
	grp := ctxgroup.WithContext(ctx)

	// acquireErr records a quota acquisition failure. We must not return
	// straight from the loop in that case: goroutines launched in earlier
	// iterations may still be running and appending to res.
	var acquireErr error
	for _, node := range ns {
		if node.ID == t.localNodeID {
			continue
		}
		id := node.ID // copy out of the loop variable
		alloc, err := qp.Acquire(ctx, 1)
		if err != nil {
			acquireErr = err
			break
		}

		grp.GoCtx(func(ctx context.Context) error {
			defer alloc.Release()

			conn, err := t.dialer.Dial(ctx, id, rpc.DefaultClass)
			if err != nil {
				return err
			}
			remoteClient := newRemoteTraceClient(tracingservicepb.NewTraceClient(conn))
			resp, err := remoteClient.GetSpanRecordings(ctx, traceID)
			if err != nil {
				return err
			}
			mu.Lock()
			res = append(res, resp.SpanRecordings...)
			mu.Unlock()
			return nil
		})
	}

	// Always wait for the group so that no goroutine outlives this call and
	// res is no longer being written when it is handed back to the caller.
	waitErr := grp.Wait()
	if acquireErr != nil {
		// The acquisition failure is what stopped the fan-out, so it takes
		// precedence over any error reported by the already-running RPCs.
		return res, acquireErr
	}
	if waitErr != nil {
		return res, waitErr
	}

	sort.SliceStable(res, func(i, j int) bool {
		return res[i].StartTime.Before(res[j].StartTime)
	})

	return res, nil
}
Loading

0 comments on commit d67bb84

Please sign in to comment.