Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Jaeger Agent Configuration #434

Merged
merged 29 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6640ea1
Added agent protocol config wiring. Removed agent http server
joe-elliott Nov 22, 2019
46dbf35
Conditionally start agent protocols based on config
joe-elliott Nov 22, 2019
c705302
Bail out of startAgent if nothing is configured
joe-elliott Nov 22, 2019
fd4799f
Updated readme
joe-elliott Nov 22, 2019
b131c9b
Added tests
joe-elliott Nov 23, 2019
88a60d0
Removed binary test because opencensus jaeger exporter doesn't suppor…
joe-elliott Nov 23, 2019
c030827
Corrected test to expect jaeger format and removed redundant test
joe-elliott Nov 23, 2019
5b284b7
Added independently configurable agent processors
joe-elliott Nov 25, 2019
8c024bc
Added config tests
joe-elliott Nov 25, 2019
c945fce
Added support for http agent
joe-elliott Nov 25, 2019
a734960
Fixed testbed tests
joe-elliott Nov 25, 2019
073410c
Fixed imports
joe-elliott Nov 25, 2019
96a6fd9
Improved coverage in factory.go
joe-elliott Nov 25, 2019
12937ff
Added http proxy tests
joe-elliott Nov 25, 2019
dd99d1e
Moved location of the testdata reference to show it refers to both ag…
joe-elliott Nov 26, 2019
1e2623e
Replaced hardcoded port with dynamic
joe-elliott Nov 26, 2019
78dcc94
Synchronously stop processors
joe-elliott Nov 26, 2019
5a3ce04
Added testutils method to wait for a port and used it to wait for the…
joe-elliott Nov 26, 2019
8657535
Cleaned up gross wait function
joe-elliott Nov 27, 2019
3a10dfb
Added WaitForPort Test
joe-elliott Nov 27, 2019
651ea96
Fixed testutils error. Added baggageRestrictions test
joe-elliott Nov 27, 2019
787d867
Added tests for port in use and trace source
joe-elliott Nov 27, 2019
899a22c
Pass logger to processors
joe-elliott Nov 27, 2019
c9a1e78
Removed flaky tests
joe-elliott Nov 27, 2019
18b41d6
Added test to confirm binary thrift opens the right port
joe-elliott Nov 27, 2019
234fdb5
Added tests to to confirm invalid ports would not start
joe-elliott Nov 27, 2019
511ead5
Only call startAgent to avoid startCollector race condition
joe-elliott Nov 27, 2019
ca2d038
Migrated assert.NoError
joe-elliott Dec 4, 2019
37dae1d
Consolidated similar code into a one function
joe-elliott Dec 4, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ receivers:
endpoint: "localhost:14268"
thrift-tchannel:
endpoint: "localhost:14267"
thrift-compact:
endpoint: "localhost:6831"
thrift-binary:
endpoint: "localhost:6832"

prometheus:
config:
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:Fv9bK1Q+ly/ROk4aJsVMeuIwPel4bEnD8EPiI91nZMg=
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
Expand Down
22 changes: 22 additions & 0 deletions internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package testutils

import (
"encoding/json"
"fmt"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -67,3 +69,23 @@ func GetAvailablePort(t *testing.T) uint16 {

return uint16(portInt)
}

// WaitForPort repeatedly attempts to open a local port until it either succeeds or 5 seconds pass
// It is useful if you need to asynchronously start a service and wait for it to start
func WaitForPort(t *testing.T, port uint16) error {
t.Helper()

totalDuration := 5 * time.Second
wait := 100 * time.Millisecond
address := fmt.Sprintf("localhost:%d", port)
for i := totalDuration; i > 0; i -= wait {
conn, err := net.Dial("tcp", address)

if err == nil && conn != nil {
conn.Close()
return nil
}
time.Sleep(wait)
}
return fmt.Errorf("failed to wait for port %d", port)
}
17 changes: 17 additions & 0 deletions internal/testutils/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package testutils

import (
"fmt"
"net"
"strconv"
"testing"
Expand All @@ -33,6 +34,22 @@ func TestGetAvailablePort(t *testing.T) {
testEndpointAvailable(t, "localhost:"+portStr)
}

func TestWaitForPort(t *testing.T) {
port := GetAvailablePort(t)
err := WaitForPort(t, port)
require.Error(t, err)

port = GetAvailablePort(t)
l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
require.NoError(t, err)

err = WaitForPort(t, port)
require.NoError(t, err)

err = l.Close()
require.NoError(t, err)
}

func testEndpointAvailable(t *testing.T, endpoint string) {
// Endpoint should be free.
ln0, err := net.Listen("tcp", endpoint)
Expand Down
8 changes: 7 additions & 1 deletion receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ This receiver receives traces in the [Jaeger](https://www.jaegertracing.io)
format. It translates them into the internal format and sends
it to processors and exporters.

It supports multiple protocols:
It supports the Jaeger Collector protocols:
- Thrift HTTP
- Thrift TChannel
- gRPC
Expand All @@ -120,6 +120,12 @@ receivers:
jaeger:
```

It also supports the Jaeger Agent protocols:
- Thrift Compact
- Thrift Binary

By default, these services are not started unless an endpoint is explicitly defined.

It is possible to configure the protocols on different ports, refer to
[config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config
examples.
Expand Down
10 changes: 10 additions & 0 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "0.0.0.0:123",
},
},
"thrift-compact": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "0.0.0.0:456",
},
},
"thrift-binary": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "0.0.0.0:789",
},
},
},
})

Expand Down
30 changes: 26 additions & 4 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
// TODO https://github.com/open-telemetry/opentelemetry-collector/issues/267
// Remove ThriftTChannel support.
protoThriftTChannel = "thrift-tchannel"
protoThriftBinary = "thrift-binary"
protoThriftCompact = "thrift-compact"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "localhost:14250"
Expand Down Expand Up @@ -103,6 +105,8 @@ func (f *Factory) CreateTraceReceiver(
protoGRPC := rCfg.Protocols[protoGRPC]
protoHTTP := rCfg.Protocols[protoThriftHTTP]
protoTChannel := rCfg.Protocols[protoThriftTChannel]
protoThriftCompact := rCfg.Protocols[protoThriftCompact]
protoThriftBinary := rCfg.Protocols[protoThriftBinary]

config := Configuration{}
var grpcServerOptions []grpc.ServerOption
Expand Down Expand Up @@ -141,19 +145,37 @@ func (f *Factory) CreateTraceReceiver(
}
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0) {
err := fmt.Errorf("either %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
if protoThriftBinary != nil && protoThriftBinary.IsEnabled() {
var err error
config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint)
if err != nil {
return nil, err
}
}

if protoThriftCompact != nil && protoThriftCompact.IsEnabled() {
var err error
config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint)
if err != nil {
return nil, err
}
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil && protoThriftBinary == nil && protoThriftCompact == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) {
err := fmt.Errorf("either %v, %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
protoGRPC,
protoThriftHTTP,
protoThriftTChannel,
protoThriftCompact,
protoThriftBinary,
typeStr,
)
return nil, err
}

// Create the receiver.
return New(ctx, &config, nextConsumer)
return New(ctx, &config, nextConsumer, logger)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
Expand Down
30 changes: 30 additions & 0 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/receiver"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -75,6 +77,34 @@ func TestCreateInvalidTChannelEndpoint(t *testing.T) {
assert.Error(t, err, "receiver creation with invalid tchannel endpoint must fail")
}

func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols[protoThriftBinary] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "",
},
}
_, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Error(t, err, "receiver creation with no endpoints must fail")
}

func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols[protoThriftCompact] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "",
},
}
_, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Error(t, err, "receiver creation with no endpoints must fail")
}

func TestCreateNoPort(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
Expand Down
114 changes: 113 additions & 1 deletion receiver/jaegerreceiver/jaeger_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ package jaegerreceiver
import (
"context"
"fmt"
"net"
"net/http"
"testing"
"time"

"contrib.go.opencensus.io/exporter/jaeger"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"go.opencensus.io/trace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/internal"
"github.com/open-telemetry/opentelemetry-collector/internal/testutils"
"github.com/open-telemetry/opentelemetry-collector/receiver/receivertest"
)

Expand All @@ -40,6 +45,24 @@ func TestJaegerAgentUDP_ThriftCompact_6831(t *testing.T) {
})
}

func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) {
port := 999999

config := &Configuration{
AgentCompactThriftPort: int(port),
}
jr, err := New(context.Background(), config, nil, zap.NewNop())
if err != nil {
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("Failed to create new Jaeger Receiver: %v", err)
}

mh := receivertest.NewMockHost()
err = jr.StartTraceReception(mh)
assert.Error(t, err, "should not have been able to startTraceReception")

jr.StopTraceReception()
}

func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) {
t.Skipf("Unfortunately due to Jaeger internal versioning, OpenCensus-Go's Thrift seems to conflict with ours")

Expand All @@ -50,10 +73,98 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) {
})
}

func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) {
// This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above.
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
port := testutils.GetAvailablePort(t)

config := &Configuration{
AgentBinaryThriftPort: int(port),
}
jr, err := New(context.Background(), config, nil, zap.NewNop())
if err != nil {
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("Failed to create new Jaeger Receiver: %v", err)
}

mh := receivertest.NewMockHost()
if err := jr.(*jReceiver).startAgent(mh); err != nil {
t.Fatalf("StartTraceReception failed: %v", err)
}
defer jr.StopTraceReception()

l, err := net.Listen("udp", fmt.Sprintf("localhost:%d", port))
assert.Error(t, err, "should not have been able to listen to the port")

if l != nil {
l.Close()
}
}

func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
port := 999999

config := &Configuration{
AgentBinaryThriftPort: int(port),
}
jr, err := New(context.Background(), config, nil, zap.NewNop())
if err != nil {
t.Fatalf("Failed to create new Jaeger Receiver: %v", err)
}

mh := receivertest.NewMockHost()
err = jr.StartTraceReception(mh)
assert.Error(t, err, "should not have been able to startTraceReception")

jr.StopTraceReception()
}

func TestJaegerHTTP(t *testing.T) {
port := testutils.GetAvailablePort(t)
config := &Configuration{
AgentHTTPPort: int(port),
}
jr, err := New(context.Background(), config, nil, zap.NewNop())
if err != nil {
t.Fatalf("Failed to create new Jaeger Receiver: %v", err)
}
defer jr.StopTraceReception()

mh := receivertest.NewMockHost()
if err := jr.StartTraceReception(mh); err != nil {
t.Fatalf("StartTraceReception failed: %v", err)
}

// allow http server to start
if err := testutils.WaitForPort(t, port); err != nil {
t.Fatalf("WaitForPort failed: %v", err)
}

// this functionality is just stubbed out at the moment. just confirm they 200.
testURL := fmt.Sprintf("http://localhost:%d/sampling?service=test", port)
resp, err := http.Get(testURL)
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}

testURL = fmt.Sprintf("http://localhost:%d/sampling?service=test", port)
resp, err = http.Get(testURL)
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}

testURL = fmt.Sprintf("http://localhost:%d/baggageRestrictions?service=test", port)
resp, err = http.Get(testURL)
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}
}

func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) {
// 1. Create the Jaeger receiver aka "server"
sink := new(exportertest.SinkTraceExporter)
jr, err := New(context.Background(), receiverConfig, sink)
jr, err := New(context.Background(), receiverConfig, sink, zap.NewNop())
if err != nil {
t.Fatalf("Failed to create new Jaeger Receiver: %v", err)
}
Expand Down Expand Up @@ -220,6 +331,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu
},
},
},
SourceFormat: "jaeger",
},
}

Expand Down
Loading