Skip to content

Commit

Permalink
Merge 4f4924a into fa1db2b
Browse files Browse the repository at this point in the history
  • Loading branch information
rinx authored Jan 21, 2021
2 parents fa1db2b + 4f4924a commit b1af006
Show file tree
Hide file tree
Showing 22 changed files with 328 additions and 97 deletions.
10 changes: 9 additions & 1 deletion charts/vald/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,15 @@ servers:
connection_timeout: {{ default .default.servers.grpc.server.grpc.connection_timeout .Values.servers.grpc.server.grpc.connection_timeout | quote }}
max_header_list_size: {{ default .default.servers.grpc.server.grpc.max_header_list_size .Values.servers.grpc.server.grpc.max_header_list_size }}
header_table_size: {{ default .default.servers.grpc.server.grpc.header_table_size .Values.servers.grpc.server.grpc.header_table_size }}
interceptors: {{ default .default.servers.grpc.server.grpc.interceptors .Values.servers.grpc.server.grpc.interceptors }}
{{- if .Values.servers.grpc.server.grpc.interceptors }}
interceptors:
{{- toYaml .Values.servers.grpc.server.grpc.interceptors | nindent 8 }}
{{- else if .default.servers.grpc.server.grpc.interceptors }}
interceptors:
{{- toYaml .default.servers.grpc.server.grpc.interceptors | nindent 8 }}
{{- else }}
interceptors: []
{{- end }}
{{- else }}
{{- toYaml .default.servers.grpc.server.grpc | nindent 6 }}
{{- end }}
Expand Down
2 changes: 1 addition & 1 deletion charts/vald/values.schema.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,10 @@ defaults:
# @schema {"name": "defaults.server_config.servers.grpc.server.grpc.header_table_size", "type": "integer"}
# defaults.server_config.servers.grpc.server.grpc.header_table_size -- gRPC server header table size
header_table_size: 0
# @schema {"name": "defaults.server_config.servers.grpc.server.grpc.interceptors", "type": "array"}
# @schema {"name": "defaults.server_config.servers.grpc.server.grpc.interceptors", "type": "array", "items": {"type": "string", "enum": ["RecoverInterceptor", "TracePayloadInterceptor"]}}
# defaults.server_config.servers.grpc.server.grpc.interceptors -- gRPC server interceptors
interceptors: []
interceptors:
- "RecoverInterceptor"
# @schema {"name": "defaults.server_config.servers.grpc.server.restart", "type": "boolean"}
# defaults.server_config.servers.grpc.server.restart -- gRPC server restart
restart: true
Expand Down
37 changes: 6 additions & 31 deletions internal/net/grpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package grpc

import (
"context"

"github.com/vdaas/vald/internal/safety"
"google.golang.org/grpc"
)

type (
UnaryServerInterceptor = grpc.UnaryServerInterceptor
StreamServerInterceptor = grpc.StreamServerInterceptor

UnaryServerInfo = grpc.UnaryServerInfo
UnaryHandler = grpc.UnaryHandler

StreamServerInfo = grpc.StreamServerInfo
StreamHandler = grpc.StreamHandler
)

var (
Expand All @@ -35,31 +38,3 @@ var (
StreamInterceptor = grpc.StreamInterceptor
ChainStreamInterceptor = grpc.ChainStreamInterceptor
)

func RecoverInterceptor() UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
err = safety.RecoverWithoutPanicFunc(func() (err error) {
resp, err = handler(ctx, req)
return err
})()
return resp, err
}
}

func RecoverStreamInterceptor() StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
return safety.RecoverWithoutPanicFunc(func() (err error) {
return handler(srv, ss)
})()
}
}
53 changes: 53 additions & 0 deletions internal/net/grpc/interceptor/server/recover/recover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// Copyright (C) 2019-2021 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package recover provides gRPC interceptors for recovery
package recover

import (
"context"

"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
)

func RecoverInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
err = safety.RecoverWithoutPanicFunc(func() (err error) {
resp, err = handler(ctx, req)
return err
})()
return resp, err
}
}

func RecoverStreamInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
return safety.RecoverWithoutPanicFunc(func() (err error) {
return handler(srv, ss)
})()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,31 @@
// limitations under the License.
//

// Package grpc provides generic functionality for grpc
package grpc
// Package recover provides gRPC interceptors for recovery
package recover

import (
"reflect"
"testing"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"go.uber.org/goleak"
)

func TestRecoverInterceptor(t *testing.T) {
t.Parallel()
type want struct {
want UnaryServerInterceptor
want grpc.UnaryServerInterceptor
}
type test struct {
name string
want want
checkFunc func(want, UnaryServerInterceptor) error
checkFunc func(want, grpc.UnaryServerInterceptor) error
beforeFunc func()
afterFunc func()
}
defaultCheckFunc := func(w want, got UnaryServerInterceptor) error {
defaultCheckFunc := func(w want, got grpc.UnaryServerInterceptor) error {
if !reflect.DeepEqual(got, w.want) {
return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
}
Expand Down Expand Up @@ -84,23 +85,24 @@ func TestRecoverInterceptor(t *testing.T) {
if err := test.checkFunc(test.want, got); err != nil {
tt.Errorf("error = %v", err)
}

})
}
}

func TestRecoverStreamInterceptor(t *testing.T) {
t.Parallel()
type want struct {
want StreamServerInterceptor
want grpc.StreamServerInterceptor
}
type test struct {
name string
want want
checkFunc func(want, StreamServerInterceptor) error
checkFunc func(want, grpc.StreamServerInterceptor) error
beforeFunc func()
afterFunc func()
}
defaultCheckFunc := func(w want, got StreamServerInterceptor) error {
defaultCheckFunc := func(w want, got grpc.StreamServerInterceptor) error {
if !reflect.DeepEqual(got, w.want) {
return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
}
Expand Down Expand Up @@ -147,6 +149,7 @@ func TestRecoverStreamInterceptor(t *testing.T) {
if err := test.checkFunc(test.want, got); err != nil {
tt.Errorf("error = %v", err)
}

})
}
}
168 changes: 168 additions & 0 deletions internal/net/grpc/interceptor/server/trace/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//
// Copyright (C) 2019-2021 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package trace provides gRPC interceptors for traces
package trace

import (
"bytes"
"context"
"path"
"sync"

"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
)

const (
grpcKindUnary = "unary"
grpcKindStream = "stream"

traceAttrGRPCKind = "grpc.kind"
traceAttrGRPCService = "grpc.service"
traceAttrGRPCMethod = "grpc.method"

traceAttrGRPCRequestPayload = "grpc.request.payload"
traceAttrGRPCResponsePayload = "grpc.response.payload"
)

var (
bufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
)

func TracePayloadInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
span := trace.FromContext(ctx)
if span == nil {
return handler(ctx, req)
}

service, method := parseMethod(info.FullMethod)
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCKind, grpcKindUnary),
trace.StringAttribute(traceAttrGRPCService, service),
trace.StringAttribute(traceAttrGRPCMethod, method),
)

if reqj := marshalJSON(req); reqj != "" {
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCRequestPayload, reqj),
)
}

resp, err = handler(ctx, req)

if resj := marshalJSON(resp); resj != "" {
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCResponsePayload, resj),
)
}

return resp, err
}
}

func TracePayloadStreamInterceptor() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
span := trace.FromContext(ss.Context())
if span == nil {
return handler(srv, ss)
}

service, method := parseMethod(info.FullMethod)
span.AddAttributes(
trace.StringAttribute(traceAttrGRPCKind, grpcKindStream),
trace.StringAttribute(traceAttrGRPCService, service),
trace.StringAttribute(traceAttrGRPCMethod, method),
)

tss := &tracingServerStream{
ServerStream: ss,
}

err := handler(srv, tss)

span.AddAttributes(
trace.StringAttribute(traceAttrGRPCRequestPayload, tss.request),
trace.StringAttribute(traceAttrGRPCResponsePayload, tss.response),
)

return err
}
}

type tracingServerStream struct {
grpc.ServerStream
request string
response string
}

func (tss *tracingServerStream) RecvMsg(m interface{}) error {
err := tss.ServerStream.RecvMsg(m)
if err == nil && tss.request == "" {
if reqj := marshalJSON(m); reqj != "" {
tss.request = reqj
}
}

return err
}

func (tss *tracingServerStream) SendMsg(m interface{}) error {
err := tss.ServerStream.SendMsg(m)
if err == nil && tss.response == "" {
if resj := marshalJSON(m); resj != "" {
tss.response = resj
}
}

return err
}

func parseMethod(fullMethod string) (service, method string) {
service = path.Dir(fullMethod)[1:]
method = path.Base(fullMethod)

return service, method
}

func marshalJSON(pbMsg interface{}) string {
b := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(b)
defer b.Reset()

err := json.Encode(b, pbMsg)
if err != nil {
return ""
}

return b.String()
}
Loading

0 comments on commit b1af006

Please sign in to comment.