Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a gRPC interceptor for embedding payloads into trace spans #900

Merged
merged 7 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion charts/vald/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,15 @@ servers:
connection_timeout: {{ default .default.servers.grpc.server.grpc.connection_timeout .Values.servers.grpc.server.grpc.connection_timeout | quote }}
max_header_list_size: {{ default .default.servers.grpc.server.grpc.max_header_list_size .Values.servers.grpc.server.grpc.max_header_list_size }}
header_table_size: {{ default .default.servers.grpc.server.grpc.header_table_size .Values.servers.grpc.server.grpc.header_table_size }}
interceptors: {{ default .default.servers.grpc.server.grpc.interceptors .Values.servers.grpc.server.grpc.interceptors }}
{{- if .Values.servers.grpc.server.grpc.interceptors }}
interceptors:
{{- toYaml .Values.servers.grpc.server.grpc.interceptors | nindent 8 }}
{{- else if .default.servers.grpc.server.grpc.interceptors }}
interceptors:
{{- toYaml .default.servers.grpc.server.grpc.interceptors | nindent 8 }}
{{- else }}
interceptors: []
{{- end }}
{{- else }}
{{- toYaml .default.servers.grpc.server.grpc | nindent 6 }}
{{- end }}
Expand Down
2 changes: 1 addition & 1 deletion charts/vald/values.schema.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ defaults:
# @schema {"name": "defaults.server_config.servers.grpc.server.grpc.header_table_size", "type": "integer"}
# defaults.server_config.servers.grpc.server.grpc.header_table_size -- gRPC server header table size
header_table_size: 0
# @schema {"name": "defaults.server_config.servers.grpc.server.grpc.interceptors", "type": "array"}
# @schema {"name": "defaults.server_config.servers.grpc.server.grpc.interceptors", "type": "array", "items": {"type": "string", "enum": ["RecoverInterceptor", "TracePayloadInterceptor"]}}
# defaults.server_config.servers.grpc.server.grpc.interceptors -- gRPC server interceptors
interceptors: []
interceptors:
- "RecoverInterceptor"
# @schema {"name": "defaults.server_config.servers.grpc.server.restart", "type": "boolean"}
# defaults.server_config.servers.grpc.server.restart -- gRPC server restart
restart: true
Expand Down
37 changes: 6 additions & 31 deletions internal/net/grpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package grpc

import (
"context"

"github.com/vdaas/vald/internal/safety"
"google.golang.org/grpc"
)

type (
UnaryServerInterceptor = grpc.UnaryServerInterceptor
StreamServerInterceptor = grpc.StreamServerInterceptor

UnaryServerInfo = grpc.UnaryServerInfo
UnaryHandler = grpc.UnaryHandler

StreamServerInfo = grpc.StreamServerInfo
StreamHandler = grpc.StreamHandler
)

var (
Expand All @@ -35,31 +38,3 @@ var (
StreamInterceptor = grpc.StreamInterceptor
ChainStreamInterceptor = grpc.ChainStreamInterceptor
)

func RecoverInterceptor() UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
err = safety.RecoverWithoutPanicFunc(func() (err error) {
resp, err = handler(ctx, req)
return err
})()
return resp, err
}
}

func RecoverStreamInterceptor() StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
return safety.RecoverWithoutPanicFunc(func() (err error) {
return handler(srv, ss)
})()
}
}
53 changes: 53 additions & 0 deletions internal/net/grpc/interceptor/server/recover/recover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// Copyright (C) 2019-2021 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package recover provides gRPC interceptors for recovery
package recover
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
package name recover has same name as predeclared identifier (predeclared)


import (
"context"

"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
)

func RecoverInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
err = safety.RecoverWithoutPanicFunc(func() (err error) {
resp, err = handler(ctx, req)
return err
})()
return resp, err
}
}

func RecoverStreamInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
return safety.RecoverWithoutPanicFunc(func() (err error) {
return handler(srv, ss)
})()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,31 @@
// limitations under the License.
//

// Package grpc provides generic functionality for grpc
package grpc
// Package recover provides gRPC interceptors for recovery
package recover
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
package name recover has same name as predeclared identifier (predeclared)


import (
"reflect"
"testing"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"go.uber.org/goleak"
)

func TestRecoverInterceptor(t *testing.T) {
t.Parallel()
type want struct {
want UnaryServerInterceptor
want grpc.UnaryServerInterceptor
}
type test struct {
name string
want want
checkFunc func(want, UnaryServerInterceptor) error
checkFunc func(want, grpc.UnaryServerInterceptor) error
beforeFunc func()
afterFunc func()
}
defaultCheckFunc := func(w want, got UnaryServerInterceptor) error {
defaultCheckFunc := func(w want, got grpc.UnaryServerInterceptor) error {
if !reflect.DeepEqual(got, w.want) {
return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
}
Expand Down Expand Up @@ -84,23 +85,24 @@ func TestRecoverInterceptor(t *testing.T) {
if err := test.checkFunc(test.want, got); err != nil {
tt.Errorf("error = %v", err)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
File is not gofumpt-ed (gofumpt)

})
}
}

func TestRecoverStreamInterceptor(t *testing.T) {
t.Parallel()
type want struct {
want StreamServerInterceptor
want grpc.StreamServerInterceptor
}
type test struct {
name string
want want
checkFunc func(want, StreamServerInterceptor) error
checkFunc func(want, grpc.StreamServerInterceptor) error
beforeFunc func()
afterFunc func()
}
defaultCheckFunc := func(w want, got StreamServerInterceptor) error {
defaultCheckFunc := func(w want, got grpc.StreamServerInterceptor) error {
if !reflect.DeepEqual(got, w.want) {
return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
}
Expand Down Expand Up @@ -147,6 +149,7 @@ func TestRecoverStreamInterceptor(t *testing.T) {
if err := test.checkFunc(test.want, got); err != nil {
tt.Errorf("error = %v", err)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
File is not gofumpt-ed (gofumpt)

})
}
}
168 changes: 168 additions & 0 deletions internal/net/grpc/interceptor/server/trace/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//
// Copyright (C) 2019-2021 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package trace provides gRPC interceptors for traces
package trace

import (
"bytes"
"context"
"path"
"sync"

"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
)

const (
grpcKindUnary = "unary"
grpcKindStream = "stream"

traceAttrGRPCKind = "grpc.kind"
traceAttrGRPCService = "grpc.service"
traceAttrGRPCMethod = "grpc.method"

traceAttrGRPCRequestPayload = "grpc.request.payload"
traceAttrGRPCResponsePayload = "grpc.response.payload"
)

var (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
File is not gofumpt-ed (gofumpt)

bufferPool = sync.Pool{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
bufferPool is a global variable (gochecknoglobals)

New: func() interface{} {
return &bytes.Buffer{}
},
}
)

func TracePayloadInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
span := trace.FromContext(ctx)
if span == nil {
return handler(ctx, req)
}

service, method := parseMethod(info.FullMethod)
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCKind, grpcKindUnary),
trace.StringAttribute(traceAttrGRPCService, service),
trace.StringAttribute(traceAttrGRPCMethod, method),
)

if reqj := marshalJSON(req); reqj != "" {
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCRequestPayload, reqj),
)
}

resp, err = handler(ctx, req)

if resj := marshalJSON(resp); resj != "" {
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCResponsePayload, resj),
)
}

return resp, err
}
}

func TracePayloadStreamInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
span := trace.FromContext(ss.Context())
if span == nil {
return handler(srv, ss)
}

service, method := parseMethod(info.FullMethod)
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCKind, grpcKindStream),
trace.StringAttribute(traceAttrGRPCService, service),
trace.StringAttribute(traceAttrGRPCMethod, method),
)

tss := &tracingServerStream{
ServerStream: ss,
}

err := handler(srv, tss)

span.AddAttributes(
trace.StringAttribute(traceAttrGRPCRequestPayload, tss.request),
trace.StringAttribute(traceAttrGRPCResponsePayload, tss.response),
)

return err
}
}

type tracingServerStream struct {
grpc.ServerStream
request string
response string
}

func (tss *tracingServerStream) RecvMsg(m interface{}) error {
err := tss.ServerStream.RecvMsg(m)
if err == nil && tss.request == "" {
if reqj := marshalJSON(m); reqj != "" {
tss.request = reqj
}
}

return err
}

func (tss *tracingServerStream) SendMsg(m interface{}) error {
err := tss.ServerStream.SendMsg(m)
if err == nil && tss.response == "" {
if resj := marshalJSON(m); resj != "" {
tss.response = resj
}
}

return err
}

func parseMethod(fullMethod string) (service, method string) {
service = path.Dir(fullMethod)[1:]
method = path.Base(fullMethod)

return service, method
}

func marshalJSON(pbMsg interface{}) string {
b := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(b)
defer b.Reset()

err := json.Encode(b, pbMsg)
if err != nil {
return ""
}

return b.String()
}
Loading