diff --git a/cmd/agent/app/processors/thrift_processor.go b/cmd/agent/app/processors/thrift_processor.go index 8856aa5a6b0..f16aea5cd86 100644 --- a/cmd/agent/app/processors/thrift_processor.go +++ b/cmd/agent/app/processors/thrift_processor.go @@ -119,7 +119,9 @@ func (s *ThriftProcessor) processBuffer() { protocol.Transport().Write(payload) s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", len(payload))) - if ok, err := s.handler.Process(context.Background(), protocol, protocol); !ok { + // NB: oddly, thrift-gen/agent/agent.go:L156 does this: `return true, thrift.WrapTException(err2)` + // So we check for both OK and error. + if ok, err := s.handler.Process(context.Background(), protocol, protocol); !ok || err != nil { s.logger.Error("Processor failed", zap.Error(err)) s.metrics.HandlerProcessError.Inc(1) } diff --git a/cmd/agent/app/processors/thrift_processor_test.go b/cmd/agent/app/processors/thrift_processor_test.go index 37a030b74cc..99730c906b1 100644 --- a/cmd/agent/app/processors/thrift_processor_test.go +++ b/cmd/agent/app/processors/thrift_processor_test.go @@ -24,7 +24,7 @@ import ( "github.com/apache/thrift/lib/go/thrift" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" + "go.uber.org/zap/zaptest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -64,7 +64,7 @@ func createProcessor(t *testing.T, mFactory metrics.Factory, tFactory thrift.TPr require.NoError(t, err) numProcessors := 1 - processor, err := NewThriftProcessor(server, numProcessors, mFactory, tFactory, handler, zap.NewNop()) + processor, err := NewThriftProcessor(server, numProcessors, mFactory, tFactory, handler, zaptest.NewLogger(t)) require.NoError(t, err) go processor.Serve() @@ -83,14 +83,14 @@ func initCollectorAndReporter(t *testing.T) (*metricstest.Factory, *testutils.Gr grpcCollector := testutils.StartGRPCCollector(t) conn, err := grpc.Dial(grpcCollector.Listener().Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) - rep := grpcrep.NewReporter(conn, map[string]string{}, zap.NewNop()) + rep := grpcrep.NewReporter(conn, map[string]string{}, zaptest.NewLogger(t)) metricsFactory := metricstest.NewFactory(0) reporter := reporter.WrapWithMetrics(rep, metricsFactory) return metricsFactory, grpcCollector, reporter, conn } func TestNewThriftProcessor_ZeroCount(t *testing.T) { - _, err := NewThriftProcessor(nil, 0, nil, nil, nil, zap.NewNop()) + _, err := NewThriftProcessor(nil, 0, nil, nil, nil, zaptest.NewLogger(t)) assert.EqualError(t, err, "number of processors must be greater than 0, called with 0") } @@ -153,31 +153,36 @@ func TestProcessor_HandlerError(t *testing.T) { ) } +// TestJaegerProcessor instantiates a real UDP receiver and a real gRPC collector +// and executes end-to-end batch submission. func TestJaegerProcessor(t *testing.T) { tests := []struct { factory thrift.TProtocolFactory + name string }{ - {compactFactory}, - {binaryFactory}, + {compactFactory, "compact"}, + {binaryFactory, "binary"}, } for _, test := range tests { - metricsFactory, collector, reporter, conn := initCollectorAndReporter(t) + t.Run(test.name, func(t *testing.T) { + metricsFactory, collector, reporter, conn := initCollectorAndReporter(t) - hostPort, processor := createProcessor(t, metricsFactory, test.factory, agent.NewAgentProcessor(reporter)) + hostPort, processor := createProcessor(t, metricsFactory, test.factory, agent.NewAgentProcessor(reporter)) - client, clientCloser, err := testutils.NewJaegerThriftUDPClient(hostPort, test.factory) - require.NoError(t, err) + client, clientCloser, err := testutils.NewJaegerThriftUDPClient(hostPort, test.factory) + require.NoError(t, err) - err = client.EmitBatch(context.Background(), batch) - require.NoError(t, err) + err = client.EmitBatch(context.Background(), batch) + require.NoError(t, err) - assertJaegerProcessorCorrectness(t, collector, metricsFactory) + assertJaegerProcessorCorrectness(t, collector, metricsFactory) - processor.Stop() - clientCloser.Close() - conn.Close() - collector.Close() + processor.Stop() + clientCloser.Close() + conn.Close() + collector.Close() + }) } } @@ -188,7 +193,7 @@ func assertJaegerProcessorCorrectness(t *testing.T, collector *testutils.GrpcCol nameF := func() string { return collector.GetJaegerBatches()[0].Spans[0].OperationName } - assertProcessorCorrectness(t, metricsFactory, sizeF, nameF, "jaeger") + assertCollectorReceivedData(t, metricsFactory, sizeF, nameF, "jaeger") } func assertZipkinProcessorCorrectness(t *testing.T, collector *testutils.GrpcCollector, metricsFactory *metricstest.Factory) { @@ -198,37 +203,40 @@ func assertZipkinProcessorCorrectness(t *testing.T, collector *testutils.GrpcCol nameF := func() string { return collector.GetJaegerBatches()[0].Spans[0].OperationName } - assertProcessorCorrectness(t, metricsFactory, sizeF, nameF, "zipkin") + assertCollectorReceivedData(t, metricsFactory, sizeF, nameF, "zipkin") } -func assertProcessorCorrectness( +// assertCollectorReceivedData verifies that collector received the data +// and that agent reporter emitted corresponding metrics. +func assertCollectorReceivedData( t *testing.T, metricsFactory *metricstest.Factory, sizeF func() int, nameF func() string, format string, ) { - // wait for server to receive - for i := 0; i < 1000; i++ { - if sizeF() == 1 { - break - } - time.Sleep(1 * time.Millisecond) - } - - require.Equal(t, 1, sizeF()) + require.Eventually(t, + func() bool { return sizeF() == 1 }, + 5*time.Second, + time.Millisecond, + "server should have received spans") assert.Equal(t, testSpanName, nameF()) - // wait for reporter to emit metrics - for i := 0; i < 1000; i++ { + key := "reporter.spans.submitted|format=" + format + ok := assert.Eventuallyf(t, + func() bool { + c, _ := metricsFactory.Snapshot() + _, ok := c[key] + return ok + }, + 5*time.Second, + time.Millisecond, + "reporter should have emitted metric %s", key) + if !ok { c, _ := metricsFactory.Snapshot() - if _, ok := c["tc-reporter.spans.submitted"]; ok { - break - } - time.Sleep(1 * time.Millisecond) + t.Log("all metrics", c) } - // agentReporter must emit metrics metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ {Name: "reporter.batches.submitted", Tags: map[string]string{"format": format}, Value: 1}, {Name: "reporter.spans.submitted", Tags: map[string]string{"format": format}, Value: 1}, diff --git a/cmd/agent/app/reporter/grpc/reporter.go b/cmd/agent/app/reporter/grpc/reporter.go index b68a50cc9f8..9349b4667e7 100644 --- a/cmd/agent/app/reporter/grpc/reporter.go +++ b/cmd/agent/app/reporter/grpc/reporter.go @@ -17,6 +17,7 @@ package grpc import ( "context" + "fmt" "go.uber.org/zap" "google.golang.org/grpc" @@ -79,6 +80,7 @@ func (r *Reporter) send(ctx context.Context, spans []*model.Span, process *model } else { r.logger.Error("Could not send spans over gRPC", zap.Error(err)) } + err = fmt.Errorf("failed to export spans: %w", err) } return err } diff --git a/cmd/agent/app/reporter/grpc/reporter_test.go b/cmd/agent/app/reporter/grpc/reporter_test.go index 32b08f840c9..39d4c234b33 100644 --- a/cmd/agent/app/reporter/grpc/reporter_test.go +++ b/cmd/agent/app/reporter/grpc/reporter_test.go @@ -131,12 +131,12 @@ func TestReporter_EmitBatch(t *testing.T) { } func TestReporter_SendFailure(t *testing.T) { - conn, err := grpc.Dial("", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial("invalid-host-name-blah:12345", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) rep := NewReporter(conn, nil, zap.NewNop()) err = rep.send(context.Background(), nil, nil) require.Error(t, err) - assert.Contains(t, err.Error(), "transport: Error while dialing dial tcp: missing address") + assert.Contains(t, err.Error(), "failed to export spans:") } func TestReporter_AddProcessTags_EmptyTags(t *testing.T) { @@ -218,11 +218,11 @@ func TestReporter_MultitenantEmitBatch(t *testing.T) { }{ { in: &jThrift.Batch{Process: &jThrift.Process{ServiceName: "node"}, Spans: []*jThrift.Span{{OperationName: "foo", StartTime: int64(model.TimeAsEpochMicroseconds(tm))}}}, - err: "rpc error: code = PermissionDenied desc = missing tenant header", + err: "missing tenant header", }, } for _, test := range tests { err = rep.EmitBatch(context.Background(), test.in) - assert.EqualError(t, err, test.err) + assert.Contains(t, err.Error(), test.err) } } diff --git a/cmd/agent/app/testutils/mock_grpc_collector.go b/cmd/agent/app/testutils/mock_grpc_collector.go index b7bf0f70b37..6c0bec83f2d 100644 --- a/cmd/agent/app/testutils/mock_grpc_collector.go +++ b/cmd/agent/app/testutils/mock_grpc_collector.go @@ -41,7 +41,7 @@ func StartGRPCCollector(t *testing.T) *GrpcCollector { server := grpc.NewServer() lis, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) - handler := &mockSpanHandler{} + handler := &mockSpanHandler{t: t} api_v2.RegisterCollectorServiceServer(server, handler) go func() { require.NoError(t, server.Serve(lis)) @@ -65,6 +65,7 @@ func (c *GrpcCollector) Close() error { } type mockSpanHandler struct { + t *testing.T mux sync.Mutex requests []*api_v2.PostSpansRequest } @@ -81,6 +82,7 @@ func (h *mockSpanHandler) GetJaegerBatches() []model.Batch { } func (h *mockSpanHandler) PostSpans(_ context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) { + h.t.Logf("mockSpanHandler received %d spans", len(r.Batch.Spans)) h.mux.Lock() defer h.mux.Unlock() h.requests = append(h.requests, r) diff --git a/go.mod b/go.mod index b4f64368485..76818d4c0a4 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( go.uber.org/zap v1.24.0 golang.org/x/net v0.5.0 golang.org/x/sys v0.4.0 - google.golang.org/grpc v1.51.0 + google.golang.org/grpc v1.52.0 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -150,7 +150,7 @@ require ( go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect golang.org/x/text v0.6.0 // indirect - google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect + google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 81323a2042f..fdadcae841e 100644 --- a/go.sum +++ b/go.sum @@ -1111,8 +1111,8 @@ google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c h1:QgY/XxIAIeccR+Ca/rDdKubLIU9rcJ3xfy1DC/Wd2Oo= -google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+E5bKwmfYo09AXuVN4dD894kIKUFmVbP2/Fo= +google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c= +google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -1136,8 +1136,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U= -google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= +google.golang.org/grpc v1.52.0 h1:kd48UiU7EHsV4rnLyOJRuP/Il/UHE7gdDAQ+SZI7nZk= +google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=