Skip to content

Commit

Permalink
Merge pull request #140 from DataDog/fricounet/upstream/otel-tracing-…
Browse files Browse the repository at this point in the history
…grpc

Add support for otel tracing of grpc calls
  • Loading branch information
k8s-ci-robot authored Aug 25, 2023
2 parents 0911089 + 47cfaa9 commit 7b02e8d
Show file tree
Hide file tree
Showing 96 changed files with 20,513 additions and 12 deletions.
59 changes: 50 additions & 9 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -56,6 +57,8 @@ func SetMaxGRPCLogLength(characterCount int) {
//
// The function tries to connect for 30 seconds, and returns an error if no connection has been established at that point.
// The function automatically disables TLS and adds an interceptor for logging of all gRPC messages at level 5.
// If the metricsManager is 'nil', no metrics will be recorded on the gRPC calls.
// The function behaviour can be tweaked with options.
//
// For a connection to a Unix Domain socket, the behavior after
// losing the connection is configurable. The default is to
Expand All @@ -70,12 +73,20 @@ func SetMaxGRPCLogLength(characterCount int) {
// For other connections, the default behavior from gRPC is used and
// loss of connection is not detected reliably.
func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
return connect(address, metricsManager, []grpc.DialOption{grpc.WithTimeout(time.Second * 30)}, options)
// Prepend the defaults: connect applies the options in slice order, so
// caller-supplied options come later and override these defaults
// (for example a custom WithTimeout replaces the 30 second one).
options = append([]Option{WithTimeout(time.Second * 30)}, options...)
// Only install the metrics option when a manager was actually provided;
// a nil manager means no metrics are recorded.
if metricsManager != nil {
options = append([]Option{WithMetrics(metricsManager)}, options...)
}
return connect(address, options)
}

// ConnectWithoutMetrics behaves exactly like Connect except no metrics are recorded.
//
// Deprecated: Prefer using Connect with `nil` as the metricsManager.
func ConnectWithoutMetrics(address string, options ...Option) (*grpc.ClientConn, error) {
return connect(address, nil, []grpc.DialOption{grpc.WithTimeout(time.Second * 30)}, options)
// Prepend the default timeout so that caller-supplied options, which are
// applied later, can override it.
options = append([]Option{WithTimeout(time.Second * 30)}, options...)
return connect(address, options)
}

// Option is the type of all optional parameters for Connect.
Expand Down Expand Up @@ -105,29 +116,59 @@ func ExitOnConnectionLoss() func() bool {
}
}

// WithTimeout sets how long establishing the connection may take.
// A value greater than zero is handed to grpc.WithTimeout as a dial option,
// so it bounds the initial (blocking) dial rather than individual gRPC calls.
func WithTimeout(timeout time.Duration) Option {
	return func(cfg *options) {
		cfg.timeout = timeout
	}
}

// WithMetrics makes the connection record metrics for its gRPC calls through
// the given CSIMetricsManager. Passing a nil manager leaves metrics disabled.
func WithMetrics(metricsManager metrics.CSIMetricsManager) Option {
	return func(cfg *options) {
		cfg.metricsManager = metricsManager
	}
}

// WithOtelTracing turns on tracing of the gRPC calls by adding the
// OpenTelemetry gRPC unary client interceptor to the connection.
func WithOtelTracing() Option {
	return func(cfg *options) {
		cfg.enableOtelTracing = true
	}
}

// options collects the settings written by Option functions and read by connect.
type options struct {
// reconnect is presumably consulted when the connection is lost
// (see ExitOnConnectionLoss) — TODO confirm against the rest of connect.
reconnect func() bool
reconnect func() bool
// timeout, when greater than zero, is passed to grpc.WithTimeout as a dial option.
timeout time.Duration
// metricsManager, when non-nil, adds the metrics-recording client interceptor.
metricsManager metrics.CSIMetricsManager
// enableOtelTracing, when true, adds the otelgrpc unary client interceptor.
enableOtelTracing bool
}

// connect is the internal implementation of Connect. It has more options to enable testing.
func connect(
address string,
metricsManager metrics.CSIMetricsManager,
dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
connectOptions []Option) (*grpc.ClientConn, error) {
var o options
for _, option := range connectOptions {
option(&o)
}

dialOptions = append(dialOptions,
dialOptions := []grpc.DialOption{
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
grpc.WithBlock(), // Block until connection succeeds.
)
}

if o.timeout > 0 {
dialOptions = append(dialOptions, grpc.WithTimeout(o.timeout))
}

interceptors := []grpc.UnaryClientInterceptor{LogGRPC}
if metricsManager != nil {
interceptors = append(interceptors, ExtendedCSIMetricsManager{metricsManager}.RecordMetricsClientInterceptor)
if o.metricsManager != nil {
interceptors = append(interceptors, ExtendedCSIMetricsManager{o.metricsManager}.RecordMetricsClientInterceptor)
}
if o.enableOtelTracing {
interceptors = append(interceptors, otelgrpc.UnaryClientInterceptor())
}
dialOptions = append(dialOptions, grpc.WithChainUnaryInterceptor(interceptors...))

Expand Down
61 changes: 58 additions & 3 deletions connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -138,13 +139,38 @@ func TestConnectWithoutMetrics(t *testing.T) {
addr, stopServer := startServer(t, tmp, nil, nil, nil)
defer stopServer()

conn, err := ConnectWithoutMetrics("unix:///" + addr)
// With Connect
conn, err := Connect("unix:///"+addr, nil)
if assert.NoError(t, err, "connect with unix:/// prefix") &&
assert.NotNil(t, conn, "got a connection") {
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
err = conn.Close()
assert.NoError(t, err, "closing connection")
}

// With ConnectWithoutMetrics
conn, err = ConnectWithoutMetrics("unix:///" + addr)
if assert.NoError(t, err, "connect with unix:/// prefix") &&
assert.NotNil(t, conn, "got a connection") {
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
err = conn.Close()
assert.NoError(t, err, "closing connection")
}
}

// TestConnectWithOtelTracing checks that Connect still yields a ready
// connection when a metrics manager and the OpenTelemetry tracing option
// are supplied together.
func TestConnectWithOtelTracing(t *testing.T) {
	dir := tmpDir(t)
	defer os.RemoveAll(dir)
	socket, stop := startServer(t, dir, nil, nil, nil)
	defer stop()

	manager := metrics.NewCSIMetricsManager("fake.csi.driver.io")
	conn, err := Connect(socket, manager, WithOtelTracing())
	// Nothing further can be checked without a usable connection.
	if !assert.NoError(t, err, "connect via absolute path") ||
		!assert.NotNil(t, conn, "got a connection") {
		return
	}

	assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
	err = conn.Close()
	assert.NoError(t, err, "closing connection")
}

func TestWaitForServer(t *testing.T) {
Expand Down Expand Up @@ -191,13 +217,13 @@ func TestWaitForServer(t *testing.T) {
}
}

func TestTimout(t *testing.T) {
func TestTimeout(t *testing.T) {
tmp := tmpDir(t)
defer os.RemoveAll(tmp)

startTime := time.Now()
timeout := 5 * time.Second
conn, err := connect(path.Join(tmp, "no-such.sock"), metrics.NewCSIMetricsManager("fake.csi.driver.io"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil)
conn, err := connect(path.Join(tmp, "no-such.sock"), []Option{WithTimeout(timeout)})
endTime := time.Now()
if assert.Error(t, err, "connection should fail") {
assert.InEpsilon(t, timeout, endTime.Sub(startTime), 1, "connection timeout")
Expand Down Expand Up @@ -491,3 +517,32 @@ func verifyMetricsError(t *testing.T, err error, metricToIgnore string) error {

return nil
}

// TestConnectWithOtelGrpcInterceptorTraces connects with WithOtelTracing and
// issues a gRPC call so that the unary client interceptor chain is exercised.
func TestConnectWithOtelGrpcInterceptorTraces(t *testing.T) {
	t.Logf("Running regular connection test")
	dir := tmpDir(t)
	defer os.RemoveAll(dir)
	// A real implementation of the gRPC call is required, otherwise the trace
	// interceptor is not called. The CSI identity service is used because
	// it is simple.
	socket, stop := startServer(t, dir, &identityServer{}, nil, nil)
	defer stop()

	conn, err := Connect(socket, nil, WithOtelTracing())
	// Nothing further can be checked without a usable connection.
	if !assert.NoError(t, err, "connect via absolute path") ||
		!assert.NotNil(t, conn, "got a connection") {
		return
	}
	defer conn.Close()
	assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")

	client := csi.NewIdentityClient(conn)
	ctx := context.Background()
	_, callErr := client.GetPluginInfo(ctx, &csi.GetPluginInfoRequest{})
	if assert.Error(t, callErr) {
		errStatus, _ := status.FromError(callErr)
		assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
	}

	// NOTE(review): ctx is the background context created above and is never
	// replaced by the call, so this reads the all-zero TraceID regardless of
	// whether the tracing interceptor ran — confirm whether a stronger check
	// (e.g. an in-memory span recorder) is wanted here.
	expectedTraceID := "00000000000000000000000000000000"
	assert.Equal(t, expectedTraceID, trace.SpanContextFromContext(ctx).TraceID().String())
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
github.com/container-storage-interface/spec v1.8.0
github.com/golang/protobuf v1.5.3
github.com/stretchr/testify v1.8.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.41.0
go.opentelemetry.io/otel/trace v1.15.0
golang.org/x/net v0.13.0
google.golang.org/grpc v1.54.0
k8s.io/api v0.28.0
Expand All @@ -21,6 +23,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
Expand All @@ -43,6 +46,8 @@ require (
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/otel v1.15.0 // indirect
go.opentelemetry.io/otel/metric v0.38.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
Expand All @@ -14,8 +16,11 @@ github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhF
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
Expand Down Expand Up @@ -93,6 +98,14 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.41.0 h1:pWlIooxHVVdetyXFDsuzfqV42lXVIDmVGBCHeaXzDyI=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.41.0/go.mod h1:YjmsSWM1VTcWXFSgyrmLADPMZZohioz9onjgkikk59w=
go.opentelemetry.io/otel v1.15.0 h1:NIl24d4eiLJPM0vKn4HjLYM+UZf6gSfi9Z+NmCxkWbk=
go.opentelemetry.io/otel v1.15.0/go.mod h1:qfwLEbWhLPk5gyWrne4XnF0lC8wtywbuJbgfAE3zbek=
go.opentelemetry.io/otel/metric v0.38.0 h1:vv/Nv/44S3GzMMmeUhaesBKsAenE6xLkTVWL+zuv30w=
go.opentelemetry.io/otel/metric v0.38.0/go.mod h1:uAtxN5hl8aXh5irD8afBtSwQU5Zjg64WWSz6KheZxBg=
go.opentelemetry.io/otel/trace v1.15.0 h1:5Fwje4O2ooOxkfyqI/kJwxWotggDLix4BSAvpE1wlpo=
go.opentelemetry.io/otel/trace v1.15.0/go.mod h1:CUsmE2Ht1CRkvE8OsMESvraoZrrcgD1J2W8GV1ev0Y4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
Loading

0 comments on commit 7b02e8d

Please sign in to comment.