Skip to content

Commit

Permalink
Add Jaeger Agent Configuration (#434)
Browse files Browse the repository at this point in the history
* Added agent protocol config wiring.  Removed agent http server

Signed-off-by: Joe Elliott <[email protected]>

* Conditionally start agent protocols based on config

Signed-off-by: Joe Elliott <[email protected]>

* Bail out of startAgent if nothing is configured

Signed-off-by: Joe Elliott <[email protected]>

* Updated readme

Signed-off-by: Joe Elliott <[email protected]>

* Added tests

Signed-off-by: Joe Elliott <[email protected]>

* Removed binary test because opencensus jaeger exporter doesn't support it

Signed-off-by: Joe Elliott <[email protected]>

* Corrected test to expect jaeger format and removed redundant test

Signed-off-by: Joe Elliott <[email protected]>

* Added independently configurable agent processors

Signed-off-by: Joe Elliott <[email protected]>

* Added config tests

Signed-off-by: Joe Elliott <[email protected]>

* Added support for http agent

Signed-off-by: Joe Elliott <[email protected]>

* Fixed testbed tests

Signed-off-by: Joe Elliott <[email protected]>

* Fixed imports

Signed-off-by: Joe Elliott <[email protected]>

* Improved coverage in factory.go

Signed-off-by: Joe Elliott <[email protected]>

* Added http proxy tests

Signed-off-by: Joe Elliott <[email protected]>

* Moved location of the testdata reference to show it refers to both agent and collector protocols

Signed-off-by: Joe Elliott <[email protected]>

* Replaced hardcoded port with dynamic

Signed-off-by: Joe Elliott <[email protected]>

* Synchronously stop processors

Signed-off-by: Joe Elliott <[email protected]>

* Added testutils method to wait for a port and used it to wait for the http server

Signed-off-by: Joe Elliott <[email protected]>

* Cleaned up gross wait function

Signed-off-by: Joe Elliott <[email protected]>

* Added WaitForPort Test

Signed-off-by: Joe Elliott <[email protected]>

* Fixed testutils error.  Added baggageRestrictions test

Signed-off-by: Joe Elliott <[email protected]>

* Added tests for port in use and trace source

Signed-off-by: Joe Elliott <[email protected]>

* Pass logger to processors

Signed-off-by: Joe Elliott <[email protected]>

* Removed flaky tests

Signed-off-by: Joe Elliott <[email protected]>

* Added test to confirm binary thrift opens the right port

Signed-off-by: Joe Elliott <[email protected]>

* Added tests to to confirm invalid ports would not start

Signed-off-by: Joe Elliott <[email protected]>

* Only call startAgent to avoid startCollector race condition

Signed-off-by: Joe Elliott <[email protected]>

* Migrated assert.NoError

Signed-off-by: Joe Elliott <[email protected]>

* Consolidated similar code into a one function

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored and Paulo Janotti committed Dec 4, 2019
1 parent fe3782c commit c897290
Show file tree
Hide file tree
Showing 15 changed files with 337 additions and 91 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ receivers:
endpoint: "localhost:14268"
thrift-tchannel:
endpoint: "localhost:14267"
thrift-compact:
endpoint: "localhost:6831"
thrift-binary:
endpoint: "localhost:6832"

prometheus:
config:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/stretchr/testify v1.4.0
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
github.com/uber/tchannel-go v1.10.0
go.opencensus.io v0.22.1
go.uber.org/zap v1.10.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:Fv9bK1Q+ly/ROk4aJsVMeuIwPel4bEnD8EPiI91nZMg=
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
Expand Down Expand Up @@ -662,6 +663,8 @@ github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQG
github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw=
github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/tchannel-go v1.10.0 h1:YOihLHuvkwT3nzvpgqFtexFW+pb5vD1Tz7h/bIWApgE=
github.com/uber/tchannel-go v1.10.0/go.mod h1:Rrgz1eL8kMjW/nEzZos0t+Heq0O4LhnUJVA32OvWKHo=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
Expand Down
22 changes: 22 additions & 0 deletions internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package testutils

import (
"encoding/json"
"fmt"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -67,3 +69,23 @@ func GetAvailablePort(t *testing.T) uint16 {

return uint16(portInt)
}

// WaitForPort repeatedly attempts to open a local port until it either succeeds or 5 seconds pass
// It is useful if you need to asynchronously start a service and wait for it to start
func WaitForPort(t *testing.T, port uint16) error {
t.Helper()

totalDuration := 5 * time.Second
wait := 100 * time.Millisecond
address := fmt.Sprintf("localhost:%d", port)
for i := totalDuration; i > 0; i -= wait {
conn, err := net.Dial("tcp", address)

if err == nil && conn != nil {
conn.Close()
return nil
}
time.Sleep(wait)
}
return fmt.Errorf("failed to wait for port %d", port)
}
17 changes: 17 additions & 0 deletions internal/testutils/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package testutils

import (
"fmt"
"net"
"strconv"
"testing"
Expand All @@ -33,6 +34,22 @@ func TestGetAvailablePort(t *testing.T) {
testEndpointAvailable(t, "localhost:"+portStr)
}

func TestWaitForPort(t *testing.T) {
port := GetAvailablePort(t)
err := WaitForPort(t, port)
require.Error(t, err)

port = GetAvailablePort(t)
l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
require.NoError(t, err)

err = WaitForPort(t, port)
require.NoError(t, err)

err = l.Close()
require.NoError(t, err)
}

func testEndpointAvailable(t *testing.T, endpoint string) {
// Endpoint should be free.
ln0, err := net.Listen("tcp", endpoint)
Expand Down
8 changes: 7 additions & 1 deletion receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ This receiver receives traces in the [Jaeger](https://www.jaegertracing.io)
format. It translates them into the internal format and sends
it to processors and exporters.

It supports multiple protocols:
It supports the Jaeger Collector protocols:
- Thrift HTTP
- Thrift TChannel
- gRPC
Expand All @@ -120,6 +120,12 @@ receivers:
jaeger:
```

It also supports the Jaeger Agent protocols:
- Thrift Compact
- Thrift Binary

By default, these services are not started unless an endpoint is explicitly defined.

It is possible to configure the protocols on different ports, refer to
[config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config
examples.
Expand Down
10 changes: 10 additions & 0 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "0.0.0.0:123",
},
},
"thrift-compact": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "0.0.0.0:456",
},
},
"thrift-binary": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "0.0.0.0:789",
},
},
},
})

Expand Down
30 changes: 26 additions & 4 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
// TODO https://github.com/open-telemetry/opentelemetry-collector/issues/267
// Remove ThriftTChannel support.
protoThriftTChannel = "thrift-tchannel"
protoThriftBinary = "thrift-binary"
protoThriftCompact = "thrift-compact"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "localhost:14250"
Expand Down Expand Up @@ -103,6 +105,8 @@ func (f *Factory) CreateTraceReceiver(
protoGRPC := rCfg.Protocols[protoGRPC]
protoHTTP := rCfg.Protocols[protoThriftHTTP]
protoTChannel := rCfg.Protocols[protoThriftTChannel]
protoThriftCompact := rCfg.Protocols[protoThriftCompact]
protoThriftBinary := rCfg.Protocols[protoThriftBinary]

config := Configuration{}
var grpcServerOptions []grpc.ServerOption
Expand Down Expand Up @@ -141,19 +145,37 @@ func (f *Factory) CreateTraceReceiver(
}
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0) {
err := fmt.Errorf("either %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
if protoThriftBinary != nil && protoThriftBinary.IsEnabled() {
var err error
config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint)
if err != nil {
return nil, err
}
}

if protoThriftCompact != nil && protoThriftCompact.IsEnabled() {
var err error
config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint)
if err != nil {
return nil, err
}
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil && protoThriftBinary == nil && protoThriftCompact == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) {
err := fmt.Errorf("either %v, %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
protoGRPC,
protoThriftHTTP,
protoThriftTChannel,
protoThriftCompact,
protoThriftBinary,
typeStr,
)
return nil, err
}

// Create the receiver.
return New(ctx, &config, nextConsumer)
return New(ctx, &config, nextConsumer, logger)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
Expand Down
30 changes: 30 additions & 0 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/receiver"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -75,6 +77,34 @@ func TestCreateInvalidTChannelEndpoint(t *testing.T) {
assert.Error(t, err, "receiver creation with invalid tchannel endpoint must fail")
}

func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols[protoThriftBinary] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "",
},
}
_, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Error(t, err, "receiver creation with no endpoints must fail")
}

func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols[protoThriftCompact] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "",
},
}
_, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Error(t, err, "receiver creation with no endpoints must fail")
}

func TestCreateNoPort(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
Expand Down
Loading

0 comments on commit c897290

Please sign in to comment.