Skip to content

Commit

Permalink
Move approvals test into beater/jaeger
Browse files Browse the repository at this point in the history
And add approvals for Thrift-over-HTTP.
  • Loading branch information
axw committed Dec 20, 2019
1 parent 647b09d commit ad31536
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 91 deletions.
64 changes: 0 additions & 64 deletions beater/api/jaeger/grpc_approval_test.go

This file was deleted.

14 changes: 9 additions & 5 deletions beater/jaeger/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func testHTTPMux(t *testing.T, test httpMuxTest) {
}))
require.NoError(t, err)

body := encodeThriftBatch(test.spans...)
body := encodeThriftSpans(test.spans...)
req := httptest.NewRequest("POST", "/api/traces", body)
req.Header.Set("Content-Type", "application/x-thrift")

Expand Down Expand Up @@ -129,7 +129,7 @@ func TestHTTPMux_InvalidContentType(t *testing.T) {

func TestHTTPMux_ValidContentTypes(t *testing.T) {
for _, contentType := range []string{"application/x-thrift", "application/vnd.apache.thrift.binary"} {
body := encodeThriftBatch(&jaegerthrift.Span{})
body := encodeThriftSpans(&jaegerthrift.Span{})
c, recorder := newRequestContext("POST", "/api/traces", body)
c.Request.Header.Set("Content-Type", contentType)
newHTTPHandler(nopConsumer())(c)
Expand All @@ -149,7 +149,7 @@ func TestHTTPMux_ConsumerError(t *testing.T) {
var consumer traceConsumerFunc = func(ctx context.Context, td consumerdata.TraceData) error {
return errors.New("bauch tut weh")
}
c, recorder := newRequestContext("POST", "/api/traces", encodeThriftBatch(&jaegerthrift.Span{}))
c, recorder := newRequestContext("POST", "/api/traces", encodeThriftSpans(&jaegerthrift.Span{}))
newHTTPHandler(consumer)(c)
assert.Equal(t, http.StatusInternalServerError, recorder.Code)
assert.Regexp(t, `{"error":"internal error: bauch tut weh"}`+"\n", recorder.Body.String())
Expand All @@ -164,8 +164,12 @@ func newRequestContext(method, path string, body io.Reader) (*request.Context, *
return c, rr
}

func encodeThriftBatch(spans ...*jaegerthrift.Span) io.Reader {
batch := jaegerthrift.Batch{Process: &jaegerthrift.Process{ServiceName: "whatever"}, Spans: spans}
func encodeThriftSpans(spans ...*jaegerthrift.Span) io.Reader {
batch := &jaegerthrift.Batch{Process: &jaegerthrift.Process{ServiceName: "whatever"}, Spans: spans}
return encodeThriftBatch(batch)
}

func encodeThriftBatch(batch *jaegerthrift.Batch) io.Reader {
transport := thrift.NewTMemoryBuffer()
if err := batch.Write(thrift.NewTBinaryProtocolTransport(transport)); err != nil {
panic(err)
Expand Down
121 changes: 99 additions & 22 deletions beater/jaeger/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ package jaeger
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path/filepath"
"testing"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
jaegermodel "github.com/jaegertracing/jaeger/model"
jaegerthriftconv "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
jaegerthrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/stretchr/testify/assert"
Expand All @@ -39,9 +43,44 @@ import (

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/tests/approvals"
)

func TestApprovals(t *testing.T) {
cfg := config.DefaultConfig("8.0.0")
cfg.JaegerConfig.GRPC.Enabled = true
cfg.JaegerConfig.GRPC.Host = "localhost:0"
cfg.JaegerConfig.HTTP.Enabled = true
cfg.JaegerConfig.HTTP.Host = "localhost:0"

for _, name := range []string{
"batch_1", "batch_2",
} {
t.Run(name, func(t *testing.T) {
tc := testcase{cfg: cfg, grpcDialOpts: []grpc.DialOption{grpc.WithInsecure()}}
tc.setup(t)
defer tc.teardown(t)

f := filepath.Join("testdata", name)
data, err := ioutil.ReadFile(f + ".json")
require.NoError(t, err)
var request api_v2.PostSpansRequest
require.NoError(t, json.Unmarshal(data, &request))

require.NoError(t, tc.sendBatchGRPC(request.Batch))
require.NoError(t, approvals.ApproveEvents(tc.events, f, ""))

tc.events = nil
thriftBatch := &jaegerthrift.Batch{
Process: thriftProcessFromModel(request.Batch.Process),
Spans: jaegerthriftconv.FromDomain(request.Batch.Spans),
}
require.NoError(t, tc.sendBatchHTTP(thriftBatch))
require.NoError(t, approvals.ApproveEvents(tc.events, f, ""))
})
}
}

func TestServerIntegration(t *testing.T) {
for name, tc := range map[string]testcase{
"default config": {
Expand All @@ -51,6 +90,7 @@ func TestServerIntegration(t *testing.T) {
cfg: func() *config.Config {
cfg := config.DefaultConfig("8.0.0")
cfg.JaegerConfig.GRPC.Enabled = true
cfg.JaegerConfig.GRPC.Host = "localhost:0"
return cfg
}(),
grpcDialOpts: []grpc.DialOption{grpc.WithInsecure()},
Expand All @@ -59,14 +99,17 @@ func TestServerIntegration(t *testing.T) {
cfg: func() *config.Config {
cfg := config.DefaultConfig("8.0.0")
cfg.JaegerConfig.HTTP.Enabled = true
cfg.JaegerConfig.HTTP.Host = "localhost:0"
return cfg
}(),
},
"default config with Jaeger gRPC and HTTP enabled": {
cfg: func() *config.Config {
cfg := config.DefaultConfig("8.0.0")
cfg.JaegerConfig.GRPC.Enabled = true
cfg.JaegerConfig.GRPC.Host = "localhost:0"
cfg.JaegerConfig.HTTP.Enabled = true
cfg.JaegerConfig.HTTP.Host = "localhost:0"
return cfg
}(),
grpcDialOpts: []grpc.DialOption{grpc.WithInsecure()},
Expand Down Expand Up @@ -179,31 +222,31 @@ func TestServerIntegration(t *testing.T) {
return
}

var transformed int
var nevents int
if tc.grpcClient != nil {
err := tc.sendSpanGRPC()
if tc.grpcSendShouldFail {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, transformed+1, len(tc.transformed))
transformed++
assert.Equal(t, nevents+1, len(tc.events))
nevents++
tc.tracer.Flush(nil)
transactions := tc.tracer.Payloads().Transactions
require.Len(t, transactions, transformed)
assert.Equal(t, "/jaeger.api_v2.CollectorService/PostSpans", transactions[transformed-1].Name)
require.Len(t, transactions, nevents)
assert.Equal(t, "/jaeger.api_v2.CollectorService/PostSpans", transactions[nevents-1].Name)
}
}
if tc.httpURL != nil {
err := tc.sendSpanHTTP()
require.NoError(t, err)

assert.Equal(t, transformed+1, len(tc.transformed))
transformed++
assert.Equal(t, nevents+1, len(tc.events))
nevents++
tc.tracer.Flush(nil)
transactions := tc.tracer.Payloads().Transactions
require.Len(t, transactions, transformed)
assert.Equal(t, "POST /api/traces", transactions[transformed-1].Name)
require.Len(t, transactions, nevents)
assert.Equal(t, "POST /api/traces", transactions[nevents-1].Name)
}
})
}
Expand All @@ -215,17 +258,19 @@ type testcase struct {
grpcDialShouldFail bool
grpcSendShouldFail bool

transformed []transform.Transformable
server *Server
serverDone <-chan error
grpcClient *grpc.ClientConn
httpURL *url.URL
tracer *apmtest.RecordingTracer
events []beat.Event
server *Server
serverDone <-chan error
grpcClient *grpc.ClientConn
httpURL *url.URL
tracer *apmtest.RecordingTracer
}

func (tc *testcase) setup(t *testing.T) {
reporter := func(ctx context.Context, pub publish.PendingReq) error {
tc.transformed = append(tc.transformed, pub.Transformables...)
reporter := func(ctx context.Context, req publish.PendingReq) error {
for _, transformable := range req.Transformables {
tc.events = append(tc.events, transformable.Transform(req.Tcontext)...)
}
return nil
}

Expand Down Expand Up @@ -291,20 +336,29 @@ func (tc *testcase) teardown(t *testing.T) {
}

func (tc *testcase) sendSpanGRPC() error {
batch := jaegermodel.Batch{
return tc.sendBatchGRPC(jaegermodel.Batch{
Spans: []*jaegermodel.Span{{
TraceID: jaegermodel.NewTraceID(123, 456),
SpanID: jaegermodel.NewSpanID(789),
}},
}
client := api_v2.NewCollectorServiceClient(tc.grpcClient)
})
}

func (tc *testcase) sendBatchGRPC(batch jaegermodel.Batch) error {
client := api_v2.NewCollectorServiceClient(tc.grpcClient)
_, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{Batch: batch})
return err
}

func (tc *testcase) sendSpanHTTP() error {
body := encodeThriftBatch(&jaegerthrift.Span{TraceIdHigh: 123, TraceIdLow: 456, SpanId: 789})
return tc.sendBatchHTTP(&jaegerthrift.Batch{
Process: &jaegerthrift.Process{ServiceName: "whatever"},
Spans: []*jaegerthrift.Span{{TraceIdHigh: 123, TraceIdLow: 456, SpanId: 789}},
})
}

func (tc *testcase) sendBatchHTTP(batch *jaegerthrift.Batch) error {
body := encodeThriftBatch(batch)
resp, err := http.Post(tc.httpURL.String(), "application/x-thrift", body)
if err != nil {
return err
Expand All @@ -315,3 +369,26 @@ func (tc *testcase) sendSpanHTTP() error {
}
return nil
}

func thriftProcessFromModel(in *jaegermodel.Process) *jaegerthrift.Process {
out := &jaegerthrift.Process{ServiceName: in.ServiceName}
out.Tags = make([]*jaegerthrift.Tag, len(in.Tags))
for i, kv := range in.Tags {
kv := kv // copy for pointer refs
tag := &jaegerthrift.Tag{Key: kv.Key, VType: jaegerthrift.TagType(kv.VType)}
switch kv.VType {
case jaegermodel.ValueType_STRING:
tag.VStr = &kv.VStr
case jaegermodel.ValueType_BOOL:
tag.VBool = &kv.VBool
case jaegermodel.ValueType_INT64:
tag.VLong = &kv.VInt64
case jaegermodel.ValueType_FLOAT64:
tag.VDouble = &kv.VFloat64
case jaegermodel.ValueType_BINARY:
tag.VBinary = kv.VBinary
}
out.Tags[i] = tag
}
return out
}
File renamed without changes.
File renamed without changes.

0 comments on commit ad31536

Please sign in to comment.