diff --git a/README.md b/README.md index 5fbf33a28ff..019ac4c6d72 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,10 @@ receivers: endpoint: "localhost:14268" thrift-tchannel: endpoint: "localhost:14267" + thrift-compact: + endpoint: "localhost:6831" + thrift-binary: + endpoint: "localhost:6832" prometheus: config: diff --git a/go.mod b/go.mod index 206f5c18f8c..b6eeffb0afa 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/stretchr/testify v1.4.0 github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect - github.com/uber/jaeger-lib v2.0.0+incompatible + github.com/uber/jaeger-lib v2.2.0+incompatible github.com/uber/tchannel-go v1.10.0 go.opencensus.io v0.22.1 go.uber.org/zap v1.10.0 diff --git a/go.sum b/go.sum index f44ce026baf..434b8db559d 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:Fv9bK1Q+ly/ROk4aJsVMeuIwPel4bEnD8EPiI91nZMg= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= @@ -662,6 +663,8 @@ github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQG github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw= github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= +github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/uber/tchannel-go v1.10.0 h1:YOihLHuvkwT3nzvpgqFtexFW+pb5vD1Tz7h/bIWApgE= github.com/uber/tchannel-go v1.10.0/go.mod h1:Rrgz1eL8kMjW/nEzZos0t+Heq0O4LhnUJVA32OvWKHo= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index d5de3c92f4e..c5fcfd5c859 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -16,9 +16,11 @@ package testutils import ( "encoding/json" + "fmt" "net" "strconv" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -67,3 +69,23 @@ func GetAvailablePort(t *testing.T) uint16 { return uint16(portInt) } + +// WaitForPort repeatedly attempts to open a local port until it either succeeds or 5 seconds pass +// It is useful if you need to asynchronously start a service and wait for it to start +func WaitForPort(t *testing.T, port uint16) error { + t.Helper() + + totalDuration := 5 * time.Second + wait := 100 * time.Millisecond + address := fmt.Sprintf("localhost:%d", port) + for i := totalDuration; i > 0; i -= wait { + conn, err := net.Dial("tcp", address) + + if err == nil && conn != nil { + conn.Close() + return nil + } + time.Sleep(wait) + } + return fmt.Errorf("failed to wait for port %d", port) +} diff --git a/internal/testutils/testutils_test.go b/internal/testutils/testutils_test.go index 9b07b0d8a92..ea0fde7a6d8 100644 --- a/internal/testutils/testutils_test.go +++ b/internal/testutils/testutils_test.go @@ -15,6 +15,7 @@ package testutils import ( + "fmt" "net" "strconv" "testing" @@ -33,6 +34,22 @@ func TestGetAvailablePort(t *testing.T) { testEndpointAvailable(t, "localhost:"+portStr) } +func TestWaitForPort(t *testing.T) { + port := GetAvailablePort(t) + err := WaitForPort(t, port) + require.Error(t, err) + + port = GetAvailablePort(t) + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + require.NoError(t, err) + + err = WaitForPort(t, port) + require.NoError(t, err) + + err = l.Close() + require.NoError(t, err) +} + func testEndpointAvailable(t *testing.T, endpoint string) { // Endpoint should be free. ln0, err := net.Listen("tcp", endpoint) diff --git a/receiver/README.md b/receiver/README.md index 7ed685da0db..9dd9dfa3cc3 100644 --- a/receiver/README.md +++ b/receiver/README.md @@ -107,7 +107,7 @@ This receiver receives traces in the [Jaeger](https://www.jaegertracing.io) format. It translates them into the internal format and sends it to processors and exporters. -It supports multiple protocols: +It supports the Jaeger Collector protocols: - Thrift HTTP - Thrift TChannel - gRPC @@ -120,6 +120,12 @@ receivers: jaeger: ``` +It also supports the Jaeger Agent protocols: +- Thrift Compact +- Thrift Binary + +By default, these services are not started unless an endpoint is explicitly defined. + It is possible to configure the protocols on different ports, refer to [config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config examples. diff --git a/receiver/jaegerreceiver/config_test.go b/receiver/jaegerreceiver/config_test.go index d70825b73ef..eb606091875 100644 --- a/receiver/jaegerreceiver/config_test.go +++ b/receiver/jaegerreceiver/config_test.go @@ -65,6 +65,16 @@ func TestLoadConfig(t *testing.T) { Endpoint: "0.0.0.0:123", }, }, + "thrift-compact": { + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "0.0.0.0:456", + }, + }, + "thrift-binary": { + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "0.0.0.0:789", + }, + }, }, }) diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 29fb717c2d9..b4a17463269 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -41,6 +41,8 @@ const ( // TODO https://github.com/open-telemetry/opentelemetry-collector/issues/267 // Remove ThriftTChannel support. protoThriftTChannel = "thrift-tchannel" + protoThriftBinary = "thrift-binary" + protoThriftCompact = "thrift-compact" // Default endpoints to bind to. defaultGRPCBindEndpoint = "localhost:14250" @@ -103,6 +105,8 @@ func (f *Factory) CreateTraceReceiver( protoGRPC := rCfg.Protocols[protoGRPC] protoHTTP := rCfg.Protocols[protoThriftHTTP] protoTChannel := rCfg.Protocols[protoThriftTChannel] + protoThriftCompact := rCfg.Protocols[protoThriftCompact] + protoThriftBinary := rCfg.Protocols[protoThriftBinary] config := Configuration{} var grpcServerOptions []grpc.ServerOption @@ -141,19 +145,37 @@ func (f *Factory) CreateTraceReceiver( } } - if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil) || - (config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0) { - err := fmt.Errorf("either %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver", + if protoThriftBinary != nil && protoThriftBinary.IsEnabled() { + var err error + config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint) + if err != nil { + return nil, err + } + } + + if protoThriftCompact != nil && protoThriftCompact.IsEnabled() { + var err error + config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint) + if err != nil { + return nil, err + } + } + + if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil && protoThriftBinary == nil && protoThriftCompact == nil) || + (config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) { + err := fmt.Errorf("either %v, %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver", protoGRPC, protoThriftHTTP, protoThriftTChannel, + protoThriftCompact, + protoThriftBinary, typeStr, ) return nil, err } // Create the receiver. - return New(ctx, &config, nextConsumer) + return New(ctx, &config, nextConsumer, logger) } // CreateMetricsReceiver creates a metrics receiver based on provided config. diff --git a/receiver/jaegerreceiver/factory_test.go b/receiver/jaegerreceiver/factory_test.go index 11dfc3ae30f..fde495253a1 100644 --- a/receiver/jaegerreceiver/factory_test.go +++ b/receiver/jaegerreceiver/factory_test.go @@ -23,6 +23,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configcheck" "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/receiver" ) func TestCreateDefaultConfig(t *testing.T) { @@ -75,6 +77,34 @@ func TestCreateInvalidTChannelEndpoint(t *testing.T) { assert.Error(t, err, "receiver creation with invalid tchannel endpoint must fail") } +func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig() + rCfg := cfg.(*Config) + + rCfg.Protocols[protoThriftBinary] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "", + }, + } + _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + assert.Error(t, err, "receiver creation with no endpoints must fail") +} + +func TestCreateInvalidThriftCompactEndpoint(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig() + rCfg := cfg.(*Config) + + rCfg.Protocols[protoThriftCompact] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "", + }, + } + _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + assert.Error(t, err, "receiver creation with no endpoints must fail") +} + func TestCreateNoPort(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 821b36be9bf..c8fe181a4de 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -17,6 +17,8 @@ package jaegerreceiver import ( "context" "fmt" + "net" + "net/http" "testing" "time" @@ -24,11 +26,14 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" "go.opencensus.io/trace" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" "github.com/open-telemetry/opentelemetry-collector/internal" + "github.com/open-telemetry/opentelemetry-collector/internal/testutils" "github.com/open-telemetry/opentelemetry-collector/receiver/receivertest" ) @@ -40,6 +45,22 @@ func TestJaegerAgentUDP_ThriftCompact_6831(t *testing.T) { }) } +func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { + port := 999999 + + config := &Configuration{ + AgentCompactThriftPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + assert.NoError(t, err, "Failed to create new Jaeger Receiver") + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.Error(t, err, "should not have been able to startTraceReception") + + jr.StopTraceReception() +} + func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { t.Skipf("Unfortunately due to Jaeger internal versioning, OpenCensus-Go's Thrift seems to conflict with ours") @@ -50,19 +71,95 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { }) } +func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { + // This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above. + port := testutils.GetAvailablePort(t) + + config := &Configuration{ + AgentBinaryThriftPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + assert.NoError(t, err, "Failed to create new Jaeger Receiver") + + mh := receivertest.NewMockHost() + err = jr.(*jReceiver).startAgent(mh) + assert.NoError(t, err, "StartTraceReception failed") + defer jr.StopTraceReception() + + l, err := net.Listen("udp", fmt.Sprintf("localhost:%d", port)) + assert.Error(t, err, "should not have been able to listen to the port") + + if l != nil { + l.Close() + } +} + +func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { + port := 999999 + + config := &Configuration{ + AgentBinaryThriftPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + assert.NoError(t, err, "Failed to create new Jaeger Receiver") + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.Error(t, err, "should not have been able to startTraceReception") + + jr.StopTraceReception() +} + +func TestJaegerHTTP(t *testing.T) { + port := testutils.GetAvailablePort(t) + config := &Configuration{ + AgentHTTPPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + assert.NoError(t, err, "Failed to create new Jaeger Receiver") + defer jr.StopTraceReception() + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.NoError(t, err, "StartTraceReception failed") + + // allow http server to start + err = testutils.WaitForPort(t, port) + assert.NoError(t, err, "WaitForPort failed") + + // this functionality is just stubbed out at the moment. just confirm they 200. + testURL := fmt.Sprintf("http://localhost:%d/sampling?service=test", port) + resp, err := http.Get(testURL) + assert.NoError(t, err, "should not have failed to make request") + if resp != nil { + assert.Equal(t, 200, resp.StatusCode, "should have returned 200") + } + + testURL = fmt.Sprintf("http://localhost:%d/sampling?service=test", port) + resp, err = http.Get(testURL) + assert.NoError(t, err, "should not have failed to make request") + if resp != nil { + assert.Equal(t, 200, resp.StatusCode, "should have returned 200") + } + + testURL = fmt.Sprintf("http://localhost:%d/baggageRestrictions?service=test", port) + resp, err = http.Get(testURL) + assert.NoError(t, err, "should not have failed to make request") + if resp != nil { + assert.Equal(t, 200, resp.StatusCode, "should have returned 200") + } +} + func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) { // 1. Create the Jaeger receiver aka "server" sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), receiverConfig, sink) - if err != nil { - t.Fatalf("Failed to create new Jaeger Receiver: %v", err) - } + jr, err := New(context.Background(), receiverConfig, sink, zap.NewNop()) + assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.StopTraceReception() mh := receivertest.NewMockHost() - if err := jr.StartTraceReception(mh); err != nil { - t.Fatalf("StartTraceReception failed: %v", err) - } + err = jr.StartTraceReception(mh) + assert.NoError(t, err, "StartTraceReception failed") now := time.Unix(1542158650, 536343000).UTC() nowPlus10min := now.Add(10 * time.Minute) @@ -81,9 +178,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu }, }, }) - if err != nil { - t.Fatalf("Failed to create the Jaeger OpenCensus exporter for the live application: %v", err) - } + assert.NoError(t, err, "Failed to create the Jaeger OpenCensus exporter for the live application") // 3. Now finally send some spans spandata := []*trace.SpanData{ @@ -220,6 +315,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu }, }, }, + SourceFormat: "jaeger", }, } diff --git a/receiver/jaegerreceiver/testdata/config.yaml b/receiver/jaegerreceiver/testdata/config.yaml index fb34391528d..68c08de2e42 100644 --- a/receiver/jaegerreceiver/testdata/config.yaml +++ b/receiver/jaegerreceiver/testdata/config.yaml @@ -15,6 +15,10 @@ receivers: endpoint: ":3456" thrift-tchannel: endpoint: "0.0.0.0:123" + thrift-compact: + endpoint: "0.0.0.0:456" + thrift-binary: + endpoint: "0.0.0.0:789" # The following demonstrates disabling the receiver. # All of the protocols need to be disabled for the receiver to be disabled. @@ -30,6 +34,10 @@ receivers: disabled: true thrift-tchannel: disabled: true + thrift-compact: + disabled: true + thrift-binary: + disabled: true # The following demonstrates specifying different endpoints. # The Jaeger receiver connects to ports on all available network interfaces. diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 249995199a2..43c384cb128 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -23,14 +23,19 @@ import ( "net/http" "sync" + apacheThrift "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" - agentapp "github.com/jaegertracing/jaeger/cmd/agent/app" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/cmd/agent/app/servers" + "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/thrift-gen/baggage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + jaegerThrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/sampling" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "github.com/uber/jaeger-lib/metrics" @@ -54,9 +59,9 @@ type Configuration struct { CollectorGRPCPort int CollectorGRPCOptions []grpc.ServerOption - AgentPort int AgentCompactThriftPort int AgentBinaryThriftPort int + AgentHTTPPort int } // Receiver type is used to receive spans that were originally intended to be sent to Jaeger. @@ -72,13 +77,15 @@ type jReceiver struct { config *Configuration - agent *agentapp.Agent - grpc *grpc.Server tchanServer *jTchannelReceiver collectorServer *http.Server + agentProcessors []processors.Processor + agentServer *http.Server + defaultAgentCtx context.Context + logger *zap.Logger } type jTchannelReceiver struct { @@ -88,6 +95,10 @@ type jTchannelReceiver struct { } const ( + defaultAgentQueueSize = 1000 + defaultAgentMaxPacketSize = 65000 + defaultAgentServerWorkers = 10 + // As per https://www.jaegertracing.io/docs/1.13/deployment/ // By default, the port used by jaeger-agent to send spans in model.proto format defaultGRPCPort = 14250 @@ -96,18 +107,11 @@ const ( // By default, can accept spans directly from clients in jaeger.thrift format over binary thrift protocol defaultCollectorHTTPPort = 14268 - // As per https://www.jaegertracing.io/docs/1.7/deployment/#agent - // 5775 UDP accept zipkin.thrift over compact thrift protocol - // 6831 UDP accept jaeger.thrift over compact thrift protocol - // 6832 UDP accept jaeger.thrift over binary thrift protocol - defaultCompactThriftUDPPort = 6831 - defaultBinaryThriftUDPPort = 6832 - traceSource string = "Jaeger" ) // New creates a TraceReceiver that receives traffic as a collector with both Thrift and HTTP transports. -func New(ctx context.Context, config *Configuration, nextConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) { +func New(ctx context.Context, config *Configuration, nextConsumer consumer.TraceConsumer, logger *zap.Logger) (receiver.TraceReceiver, error) { return &jReceiver{ config: config, defaultAgentCtx: observability.ContextWithReceiverName(context.Background(), "jaeger-agent"), @@ -115,6 +119,7 @@ func New(ctx context.Context, config *Configuration, nextConsumer consumer.Trace tchanServer: &jTchannelReceiver{ nextConsumer: nextConsumer, }, + logger: logger, }, nil } @@ -131,19 +136,6 @@ func (jr *jReceiver) collectorAddr() string { return fmt.Sprintf(":%d", port) } -const defaultAgentPort = 5778 - -func (jr *jReceiver) agentAddress() string { - var port int - if jr.config != nil { - port = jr.config.AgentPort - } - if port <= 0 { - port = defaultAgentPort - } - return fmt.Sprintf(":%d", port) -} - // TODO https://github.com/open-telemetry/opentelemetry-collector/issues/267 // Remove ThriftTChannel support. func (jr *jReceiver) tchannelAddr() string { @@ -173,23 +165,37 @@ func (jr *jReceiver) agentCompactThriftAddr() string { if jr.config != nil { port = jr.config.AgentCompactThriftPort } - if port <= 0 { - port = defaultCompactThriftUDPPort - } return fmt.Sprintf(":%d", port) } +func (jr *jReceiver) agentCompactThriftEnabled() bool { + return jr.config != nil && jr.config.AgentCompactThriftPort > 0 +} + func (jr *jReceiver) agentBinaryThriftAddr() string { var port int if jr.config != nil { port = jr.config.AgentBinaryThriftPort } - if port <= 0 { - port = defaultBinaryThriftUDPPort + return fmt.Sprintf(":%d", port) +} + +func (jr *jReceiver) agentBinaryThriftEnabled() bool { + return jr.config != nil && jr.config.AgentBinaryThriftPort > 0 +} + +func (jr *jReceiver) agentHTTPPortAddr() string { + var port int + if jr.config != nil { + port = jr.config.AgentHTTPPort } return fmt.Sprintf(":%d", port) } +func (jr *jReceiver) agentHTTPEnabled() bool { + return jr.config != nil && jr.config.AgentHTTPPort > 0 +} + func (jr *jReceiver) TraceSource() string { return traceSource } @@ -227,9 +233,14 @@ func (jr *jReceiver) stopTraceReceptionLocked() error { jr.stopOnce.Do(func() { var errs []error - if jr.agent != nil { - jr.agent.Stop() - jr.agent = nil + if jr.agentServer != nil { + if aerr := jr.agentServer.Close(); aerr != nil { + errs = append(errs, aerr) + } + jr.agentServer = nil + } + for _, processor := range jr.agentProcessors { + processor.Stop() } if jr.collectorServer != nil { @@ -302,8 +313,8 @@ func (jtr *jTchannelReceiver) SubmitBatches(ctx thrift.Context, batches []*jaege } var _ reporter.Reporter = (*jReceiver)(nil) -var _ agentapp.CollectorProxy = (*jReceiver)(nil) var _ api_v2.CollectorServiceServer = (*jReceiver)(nil) +var _ configmanager.ClientConfigManager = (*jReceiver)(nil) // EmitZipkinBatch implements cmd/agent/reporter.Reporter and it forwards // Zipkin spans received by the Jaeger agent processor. @@ -315,6 +326,7 @@ func (jr *jReceiver) EmitZipkinBatch(spans []*zipkincore.Span) error { // Jaeger spans received by the Jaeger agent processor. func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error { td, err := jaegertranslator.ThriftBatchToOCProto(batch) + td.SourceFormat = "jaeger" if err != nil { observability.RecordMetricsForTraceReceiver(jr.defaultAgentCtx, len(batch.Spans), len(batch.Spans)) return err @@ -326,14 +338,6 @@ func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error { return err } -func (jr *jReceiver) GetReporter() reporter.Reporter { - return jr -} - -func (jr *jReceiver) GetManager() configmanager.ClientConfigManager { - return jr -} - func (jr *jReceiver) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { return &sampling.SamplingStrategyResponse{}, nil } @@ -362,47 +366,60 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } func (jr *jReceiver) startAgent(_ receiver.Host) error { - processorConfigs := []agentapp.ProcessorConfiguration{ - { - // Compact Thrift running by default on 6831. - Model: "jaeger", - Protocol: "compact", - Server: agentapp.ServerConfiguration{ - HostPort: jr.agentCompactThriftAddr(), - }, - }, - { - // Binary Thrift running by default on 6832. - Model: "jaeger", - Protocol: "binary", - Server: agentapp.ServerConfiguration{ - HostPort: jr.agentBinaryThriftAddr(), - }, - }, + if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() { + return nil } - builder := agentapp.Builder{ - Processors: processorConfigs, - HTTPServer: agentapp.HTTPServerConfiguration{ - HostPort: jr.agentAddress(), - }, + if jr.agentBinaryThriftEnabled() { + processor, err := jr.buildProcessor(jr.agentBinaryThriftAddr(), apacheThrift.NewTBinaryProtocolFactoryDefault()) + if err != nil { + return err + } + jr.agentProcessors = append(jr.agentProcessors, processor) } - agent, err := builder.CreateAgent(jr, zap.NewNop(), metrics.NullFactory) - if err != nil { - return err + if jr.agentCompactThriftEnabled() { + processor, err := jr.buildProcessor(jr.agentCompactThriftAddr(), apacheThrift.NewTCompactProtocolFactory()) + if err != nil { + return err + } + jr.agentProcessors = append(jr.agentProcessors, processor) } - if err := agent.Run(); err != nil { - return err + for _, processor := range jr.agentProcessors { + go processor.Serve() } - // Otherwise no error was encountered, - jr.agent = agent + if jr.agentHTTPEnabled() { + jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPPortAddr(), jr, metrics.NullFactory) + + go func() { + if err := jr.agentServer.ListenAndServe(); err != nil { + jr.logger.Error("http server failure", zap.Error(err)) + } + }() + } return nil } +func (jr *jReceiver) buildProcessor(address string, factory apacheThrift.TProtocolFactory) (processors.Processor, error) { + handler := jaegerThrift.NewAgentProcessor(jr) + transport, err := thriftudp.NewTUDPServerTransport(address) + if err != nil { + return nil, err + } + server, err := servers.NewTBufferedServer(transport, defaultAgentQueueSize, defaultAgentMaxPacketSize, metrics.NullFactory) + if err != nil { + return nil, err + } + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, factory, handler, jr.logger) + if err != nil { + return nil, err + } + return processor, nil +} + func (jr *jReceiver) startCollector(host receiver.Host) error { tch, terr := tchannel.NewChannel("jaeger-collector", new(tchannel.ChannelOptions)) if terr != nil { diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 910236b79b4..ab1dd49aa53 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/trace" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -42,6 +43,13 @@ import ( tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" ) +func TestTraceSource(t *testing.T) { + jr, err := New(context.Background(), &Configuration{}, nil, zap.NewNop()) + assert.NoError(t, err, "should not have failed to create the Jaeger receiver") + + assert.Equal(t, traceSource, jr.TraceSource()) +} + func TestReception(t *testing.T) { // 1. Create the Jaeger receiver aka "server" config := &Configuration{ @@ -49,7 +57,7 @@ func TestReception(t *testing.T) { } sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), config, sink) + jr, err := New(context.Background(), config, sink, zap.NewNop()) defer jr.StopTraceReception() assert.NoError(t, err, "should not have failed to create the Jaeger received") @@ -100,7 +108,7 @@ func TestGRPCReception(t *testing.T) { } sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), config, sink) + jr, err := New(context.Background(), config, sink, zap.NewNop()) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.StopTraceReception() @@ -158,7 +166,7 @@ func TestGRPCReceptionWithTLS(t *testing.T) { } sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), config, sink) + jr, err := New(context.Background(), config, sink, zap.NewNop()) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.StopTraceReception() diff --git a/testbed/go.mod b/testbed/go.mod index 5b6f0f80037..960827737fa 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -11,6 +11,7 @@ require ( github.com/spf13/viper v1.4.1-0.20190911140308-99520c81d86e github.com/stretchr/testify v1.4.0 go.opencensus.io v0.22.1 + go.uber.org/zap v1.10.0 ) replace github.com/open-telemetry/opentelemetry-collector => ../ diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index c2f004c6609..5214544b0b2 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -22,6 +22,8 @@ import ( "sync" "sync/atomic" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" @@ -109,7 +111,7 @@ func (mb *MockBackend) Start(backendType BackendType) error { jaegerCfg := jaegerreceiver.Configuration{ CollectorHTTPPort: 14268, } - mb.jaegerReceiver, err = jaegerreceiver.New(context.Background(), &jaegerCfg, mb.tc) + mb.jaegerReceiver, err = jaegerreceiver.New(context.Background(), &jaegerCfg, mb.tc, zap.NewNop()) if err != nil { return err }