Skip to content

Commit

Permalink
Fixed open metrics test.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Aug 6, 2021
1 parent cee3c63 commit 2f12196
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 241 deletions.
1 change: 0 additions & 1 deletion interceptors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterce
reporter.PostMsgSend(req, nil, time.Since(r.startTime))
err := invoker(newCtx, method, req, reply, cc, opts...)
reporter.PostMsgReceive(reply, err, time.Since(r.startTime))

reporter.PostCall(err, time.Since(r.startTime))
return err
}
Expand Down
6 changes: 3 additions & 3 deletions interceptors/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newCommonFields(kind string, c interceptors.CallMeta) Fields {
// Iter returns FieldsIterator.
func (f Fields) Iter() FieldsIterator {
// We start from -2 as we iterate over two items per iteration and first iteration will advance iterator to 0.
return iter{i: -2, f: f}
return &iter{i: -2, f: f}
}

// FieldsIterator is an interface allowing to iterate over fields.
Expand All @@ -72,7 +72,7 @@ type iter struct {
i int
}

func (i iter) Next() bool {
func (i *iter) Next() bool {
if i.i >= len(i.f) {
return false
}
Expand All @@ -81,7 +81,7 @@ func (i iter) Next() bool {
return i.i < len(i.f)
}

func (i iter) At() (k, v string) {
func (i *iter) At() (k, v string) {
if i.i < 0 || i.i >= len(i.f) {
return "", ""
}
Expand Down
124 changes: 38 additions & 86 deletions providers/openmetrics/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,120 +4,72 @@
package metrics

import (
"context"
"io"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2/testproto/v1"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/testing/testpb"
)

func TestClientInterceptorSuite(t *testing.T) {
suite.Run(t, &ClientInterceptorTestSuite{})
c := NewClientMetrics(WithClientHandlingTimeHistogram())
suite.Run(t, &ClientInterceptorTestSuite{
InterceptorTestSuite: &testpb.InterceptorTestSuite{
TestService: &testpb.TestPingService{T: t},
ClientOpts: []grpc.DialOption{
grpc.WithUnaryInterceptor(UnaryClientInterceptor(c)),
grpc.WithStreamInterceptor(StreamClientInterceptor(c)),
},
},
clientMetrics: c,
})
}

type ClientInterceptorTestSuite struct {
suite.Suite

serverListener net.Listener
server *grpc.Server
clientConn *grpc.ClientConn
testClient pb_testproto.TestServiceClient
ctx context.Context
cancel context.CancelFunc
clientMetrics *ClientMetrics
}

func (s *ClientInterceptorTestSuite) SetupSuite() {
var err error

// Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests.
s.ctx, s.cancel = context.WithTimeout(context.TODO(), 2*time.Second)

s.clientMetrics = NewClientMetrics(WithClientHandlingTimeHistogram())

s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")

// This is the point where we hook up the interceptor
s.server = grpc.NewServer()
pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})

go func() {
err = s.server.Serve(s.serverListener)
require.NoError(s.T(), err, "must not error on server listening")
}()

s.clientConn, err = grpc.DialContext(
s.ctx,
s.serverListener.Addr().String(),
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithUnaryInterceptor(UnaryClientInterceptor(s.clientMetrics)),
grpc.WithStreamInterceptor(StreamClientInterceptor(s.clientMetrics)),
)
require.NoError(s.T(), err, "must not error on client Dial")
s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
*testpb.InterceptorTestSuite
clientMetrics *ClientMetrics
}

func (s *ClientInterceptorTestSuite) SetupTest() {

// Make sure every test starts with same fresh, intialized metric state.
s.clientMetrics.clientStartedCounter.Reset()
s.clientMetrics.clientHandledCounter.Reset()
s.clientMetrics.clientHandledHistogram.Reset()
s.clientMetrics.clientStreamMsgReceived.Reset()
s.clientMetrics.clientStreamMsgSent.Reset()
}

func (s *ClientInterceptorTestSuite) TearDownSuite() {
if s.clientConn != nil {
s.clientConn.Close()
}
if s.serverListener != nil {
s.server.Stop()
s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String())
s.serverListener.Close()
}
}

func (s *ClientInterceptorTestSuite) TearDownTest() {
s.cancel()
}

func (s *ClientInterceptorTestSuite) TestUnaryIncrementsMetrics() {
_, err := s.testClient.PingEmpty(s.ctx, &pb_testproto.PingEmptyRequest{}) // should return with code=OK
_, err := s.Client.PingEmpty(s.SimpleCtx(), &testpb.PingEmptyRequest{})
require.NoError(s.T(), err)
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("unary", "providers.openmetrics.testproto.v1.TestService", "PingEmpty"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("unary", "providers.openmetrics.testproto.v1.TestService", "PingEmpty", "OK"))
requireValueHistCount(s.T(), 1, s.clientMetrics.clientHandledHistogram.WithLabelValues("unary", "providers.openmetrics.testproto.v1.TestService", "PingEmpty"))

_, err = s.testClient.PingError(s.ctx, &pb_testproto.PingErrorRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("unary", testpb.TestServiceFullName, "PingEmpty"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("unary", testpb.TestServiceFullName, "PingEmpty", "OK"))
requireValueHistCount(s.T(), 1, s.clientMetrics.clientHandledHistogram.WithLabelValues("unary", testpb.TestServiceFullName, "PingEmpty"))

_, err = s.Client.PingError(s.SimpleCtx(), &testpb.PingErrorRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
require.Error(s.T(), err)
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("unary", "providers.openmetrics.testproto.v1.TestService", "PingError"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("unary", "providers.openmetrics.testproto.v1.TestService", "PingError", "FailedPrecondition"))
requireValueHistCount(s.T(), 1, s.clientMetrics.clientHandledHistogram.WithLabelValues("unary", "providers.openmetrics.testproto.v1.TestService", "PingError"))
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("unary", testpb.TestServiceFullName, "PingError"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("unary", testpb.TestServiceFullName, "PingError", "FailedPrecondition"))
requireValueHistCount(s.T(), 1, s.clientMetrics.clientHandledHistogram.WithLabelValues("unary", testpb.TestServiceFullName, "PingError"))
}

func (s *ClientInterceptorTestSuite) TestStartedStreamingIncrementsStarted() {
_, err := s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{})
_, err := s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{})
require.NoError(s.T(), err)
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))

_, err = s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
_, err = s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
require.NoError(s.T(), err, "PingList must not fail immediately")
requireValue(s.T(), 2, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 2, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))
}

func (s *ClientInterceptorTestSuite) TestStreamingIncrementsMetrics() {
ss, err := s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{}) // should return with code=OK
ss, err := s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{})
require.NoError(s.T(), err)
// Do a read, just for kicks.
count := 0
Expand All @@ -129,23 +81,23 @@ func (s *ClientInterceptorTestSuite) TestStreamingIncrementsMetrics() {
require.NoError(s.T(), err, "reading pingList shouldn't fail")
count++
}
require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
require.EqualValues(s.T(), testpb.ListResponseCount, count, "Number of received msg on the wire must match")

requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList", "OK"))
requireValue(s.T(), countListResponses+1 /* + EOF */, s.clientMetrics.clientStreamMsgReceived.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientStreamMsgSent.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValueHistCount(s.T(), 1, s.clientMetrics.clientHandledHistogram.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList", "OK"))
requireValue(s.T(), testpb.ListResponseCount+1 /* + EOF */, s.clientMetrics.clientStreamMsgReceived.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientStreamMsgSent.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))
requireValueHistCount(s.T(), 1, s.clientMetrics.clientHandledHistogram.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))

ss, err = s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
ss, err = s.Client.PingList(s.SimpleCtx(), &testpb.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)})
require.NoError(s.T(), err, "PingList must not fail immediately")

// Do a read, just to progate errors.
_, err = ss.Recv()
st, _ := status.FromError(err)
require.Equal(s.T(), codes.FailedPrecondition, st.Code(), "Recv must return FailedPrecondition, otherwise the test is wrong")

requireValue(s.T(), 2, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList", "FailedPrecondition"))
requireValueHistCount(s.T(), 2, s.clientMetrics.clientHandledHistogram.WithLabelValues("server_stream", "providers.openmetrics.testproto.v1.TestService", "PingList"))
requireValue(s.T(), 2, s.clientMetrics.clientStartedCounter.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))
requireValue(s.T(), 1, s.clientMetrics.clientHandledCounter.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList", "FailedPrecondition"))
requireValueHistCount(s.T(), 2, s.clientMetrics.clientHandledHistogram.WithLabelValues("server_stream", testpb.TestServiceFullName, "PingList"))
}
8 changes: 4 additions & 4 deletions providers/openmetrics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ type reportable struct {
serverMetrics *ServerMetrics
}

func (rep *reportable) ServerReporter(ctx context.Context, _ interface{}, typ interceptors.GRPCType, service string, method string) (interceptors.Reporter, context.Context) {
return rep.reporter(ctx, rep.serverMetrics, nil, typ, service, method, KindServer)
func (rep *reportable) ServerReporter(ctx context.Context, meta interceptors.CallMeta) (interceptors.Reporter, context.Context) {
return rep.reporter(ctx, rep.serverMetrics, nil, meta.Typ, meta.Service, meta.Method, KindServer)
}

func (rep *reportable) ClientReporter(ctx context.Context, _ interface{}, typ interceptors.GRPCType, service string, method string) (interceptors.Reporter, context.Context) {
return rep.reporter(ctx, nil, rep.clientMetrics, typ, service, method, KindClient)
func (rep *reportable) ClientReporter(ctx context.Context, meta interceptors.CallMeta) (interceptors.Reporter, context.Context) {
return rep.reporter(ctx, nil, rep.clientMetrics, meta.Typ, meta.Service, meta.Method, KindClient)
}

func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *ClientMetrics, rpcType interceptors.GRPCType, service, method string, kind Kind) (interceptors.Reporter, context.Context) {
Expand Down
Loading

0 comments on commit 2f12196

Please sign in to comment.