Skip to content

Commit

Permalink
Added HTTP reporter with support for sending auth credentials
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Nov 10, 2017
1 parent a2ed9b8 commit 846ac77
Show file tree
Hide file tree
Showing 7 changed files with 713 additions and 15 deletions.
87 changes: 72 additions & 15 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"github.com/apache/thrift/lib/go/thrift"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/uber/jaeger/cmd/agent/app/httpserver"
"github.com/uber/jaeger/cmd/agent/app/processors"
"github.com/uber/jaeger/cmd/agent/app/reporter"
tchreporter "github.com/uber/jaeger/cmd/agent/app/reporter/tchannel"
httpReporter "github.com/uber/jaeger/cmd/agent/app/reporter/http"
tcReporter "github.com/uber/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/uber/jaeger/cmd/agent/app/servers"
"github.com/uber/jaeger/cmd/agent/app/servers/thriftudp"
jmetrics "github.com/uber/jaeger/pkg/metrics"
Expand Down Expand Up @@ -76,7 +76,13 @@ type Builder struct {
DiscoveryMinPeers int `yaml:"minPeers"`
CollectorServiceName string `yaml:"collectorServiceName"`

tchreporter.Builder
tcReporter.Builder

// These fields are copied from http.Builder because yaml does not parse embedded structs
Scheme string `yaml:"scheme"`
AuthToken string `yaml:"authToken"`
Username string `yaml:"username"`
Password string `yaml:"password"`

otherReporters []reporter.Reporter
metricsFactory metrics.Factory
Expand Down Expand Up @@ -114,17 +120,52 @@ func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder {
return b
}

func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (*tchreporter.Reporter, error) {
if len(b.Builder.CollectorHostPorts) == 0 {
func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (reporter.Reporter, error) {
if b.useTChannelReporter() {
logger.Info("Using TChannel to report spans to the Collector")
return b.createTChannelMainReporter(mFactory, logger)
}

logger.Info("Using HTTP to report spans to the Collector")
return b.createHTTPMainReporter(mFactory, logger)
}

func (b *Builder) createHTTPMainReporter(mFactory metrics.Factory, logger *zap.Logger) (reporter.Reporter, error) {
hrBuilder := httpReporter.NewBuilder()

if b.Scheme != "" {
hrBuilder.WithScheme(b.Scheme)
}

if len(b.CollectorHostPorts) > 0 {
hrBuilder.WithCollectorHostPorts(b.CollectorHostPorts)
} else {
return nil, fmt.Errorf(`no "CollectorHostPorts" specified`)
}

if b.AuthToken != "" {
hrBuilder.WithAuthToken(b.AuthToken)
}

if b.Username != "" && b.Password != "" {
hrBuilder.WithUsername(b.Username)
hrBuilder.WithPassword(b.Password)
}

return hrBuilder.CreateReporter(mFactory, logger)
}

func (b *Builder) createTChannelMainReporter(mFactory metrics.Factory, logger *zap.Logger) (reporter.Reporter, error) {
if len(b.CollectorHostPorts) == 0 {
b.Builder.CollectorHostPorts = b.CollectorHostPorts
}
if b.Builder.CollectorServiceName == "" {
if b.CollectorServiceName == "" {
b.Builder.CollectorServiceName = b.CollectorServiceName
}
if b.Builder.DiscoveryMinPeers == 0 {
if b.DiscoveryMinPeers == 0 {
b.Builder.DiscoveryMinPeers = b.DiscoveryMinPeers
}
return b.Builder.CreateReporter(mFactory, logger)
return b.CreateReporter(mFactory, logger)
}

func (b *Builder) getMetricsFactory() (metrics.Factory, error) {
Expand All @@ -144,7 +185,7 @@ func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) {
if err != nil {
return nil, errors.Wrap(err, "cannot create main Reporter")
}
var rep reporter.Reporter = mainReporter
var rep = mainReporter
if len(b.otherReporters) > 0 {
reps := append([]reporter.Reporter{mainReporter}, b.otherReporters...)
rep = reporter.NewMultiReporter(reps...)
Expand All @@ -153,7 +194,7 @@ func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) {
if err != nil {
return nil, err
}
httpServer := b.HTTPServer.GetHTTPServer(b.CollectorServiceName, mainReporter.Channel(), mFactory)
httpServer := b.GetHTTPServer(mainReporter, mFactory)
if b.metricsFactory == nil {
b.Metrics.RegisterHandler(httpServer.Handler.(*http.ServeMux))
}
Expand Down Expand Up @@ -190,13 +231,29 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory)
return retMe, nil
}

func (b *Builder) useTChannelReporter() bool {
// if we don't have credentials, we use the tchannel reporter
// if we have an auth token or a pair of username+password, we should use the http reporter
return b.AuthToken == "" && (b.Username == "" || b.Password == "")
}

// GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries.
func (c HTTPServerConfiguration) GetHTTPServer(svc string, channel *tchannel.Channel, mFactory metrics.Factory) *http.Server {
mgr := httpserver.NewCollectorProxy(svc, channel, mFactory)
if c.HostPort == "" {
c.HostPort = defaultHTTPServerHostPort
func (b *Builder) GetHTTPServer(r reporter.Reporter, mFactory metrics.Factory) *http.Server {
var mgr httpserver.ClientConfigManager
if b.useTChannelReporter() {
channel := r.(*tcReporter.Reporter).Channel()
mgr = httpserver.NewCollectorProxy(b.CollectorServiceName, channel, mFactory)
} else {
// TODO: this manager is used for the sampling and baggage restrictions, not sure we need for this here:
// is there a non-tchannel sampling/baggage restriction endpoint on the collector side?
mgr = nil
}
return httpserver.NewHTTPServer(c.HostPort, mgr, mFactory)

if b.HTTPServer.HostPort == "" {
b.HTTPServer.HostPort = defaultHTTPServerHostPort
}

return httpserver.NewHTTPServer(b.HTTPServer.HostPort, mgr, mFactory)
}

// GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration
Expand Down
97 changes: 97 additions & 0 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/uber/jaeger/cmd/agent/app/reporter/http"
"github.com/uber/jaeger/thrift-gen/jaeger"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)
Expand Down Expand Up @@ -177,6 +178,102 @@ func TestBuilderWithProcessorErrors(t *testing.T) {
}
}

func TestBuilderWithAuthToken(t *testing.T) {
y := `
collectorHostPorts:
- 127.0.0.1:14267
authToken: the-token
`
cfg := Builder{}
err := yaml.Unmarshal([]byte(y), &cfg)
assert.NoError(t, err)
a, err := cfg.createMainReporter(metrics.NullFactory, zap.NewNop())
assert.NoError(t, err)

_, ok := a.(*http.Reporter)
assert.True(t, ok)
}

func TestBuilderWithUsernameAndPassword(t *testing.T) {
y := `
collectorHostPorts:
- 127.0.0.1:14267
username: jdoe
password: password
`
cfg := Builder{}
err := yaml.Unmarshal([]byte(y), &cfg)
assert.NoError(t, err)
a, err := cfg.createMainReporter(metrics.NullFactory, zap.NewNop())
assert.NoError(t, err)

_, ok := a.(*http.Reporter)
assert.True(t, ok)
}

func TestBuilderWithHTTPS(t *testing.T) {
y := `
collectorHostPorts:
- 127.0.0.1:14267
authToken: the-token
scheme: https
`
cfg := Builder{}
err := yaml.Unmarshal([]byte(y), &cfg)
assert.NoError(t, err)
a, err := cfg.createMainReporter(metrics.NullFactory, zap.NewNop())
assert.NoError(t, err)

r, ok := a.(*http.Reporter)
assert.Equal(t, "https://127.0.0.1:14267", r.Endpoint())
assert.True(t, ok)
}

func TestBuilderWithMultipleCollectorHostPortsForHTTP(t *testing.T) {
y := `
collectorHostPorts:
- 127.0.0.1:14267
- 127.0.0.1:14268
- 127.0.0.1:14269
authToken: the-token
`
cfg := Builder{}
err := yaml.Unmarshal([]byte(y), &cfg)
assert.NoError(t, err)
a, err := cfg.createMainReporter(metrics.NullFactory, zap.NewNop())
assert.NoError(t, err)

r, ok := a.(*http.Reporter)
assert.Equal(t, "http://127.0.0.1:14267", r.Endpoint())
assert.True(t, ok)
}

func TestBuilderWithIncorrectURLFormatForCollectorHostPort(t *testing.T) {
y := `
collectorHostPorts:
- https://127.0.0.1:14267/some/path
authToken: the-token
`
cfg := Builder{}
err := yaml.Unmarshal([]byte(y), &cfg)
assert.NoError(t, err)
a, err := cfg.createMainReporter(metrics.NullFactory, zap.NewNop())
assert.NoError(t, err)

r, ok := a.(*http.Reporter)
assert.Equal(t, "https://127.0.0.1:14267", r.Endpoint())
assert.True(t, ok)
}

func TestBuilderWithoutCollectorHostsPorts(t *testing.T) {
y := "authToken: the-token"
cfg := &Builder{}
err := yaml.Unmarshal([]byte(y), &cfg)
agent, err := cfg.CreateAgent(zap.NewNop())
assert.EqualError(t, err, `cannot create main Reporter: no "CollectorHostPorts" specified`)
assert.Nil(t, agent)
}

type fakeReporter struct{}

func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
Expand Down
24 changes: 24 additions & 0 deletions cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
suffixServerMaxPacketSize = "server-max-packet-size"
suffixServerHostPort = "server-host-port"
collectorHostPort = "collector.host-port"
scheme = "collector.scheme"
authToken = "collector.auth-token"
username = "collector.username"
password = "collector.password"
httpServerHostPort = "http-server.host-port"
discoveryMinPeers = "discovery.min-peers"
)
Expand Down Expand Up @@ -55,6 +59,22 @@ func AddFlags(flags *flag.FlagSet) {
collectorHostPort,
"",
"comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)")
flags.String(
scheme,
"http",
"protocol scheme to use when talking to the collector")
flags.String(
authToken,
"",
"token to send in a Bearer auth to the collector ")
flags.String(
username,
"",
"username to send in a Basic auth to the collector ")
flags.String(
password,
"",
"password to send in a Basic auth to the collector")
flags.String(
httpServerHostPort,
defaultHTTPServerHostPort,
Expand Down Expand Up @@ -82,6 +102,10 @@ func (b *Builder) InitFromViper(v *viper.Viper) {
if len(v.GetString(collectorHostPort)) > 0 {
b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",")
}
b.Scheme = v.GetString(scheme)
b.AuthToken = v.GetString(authToken)
b.Username = v.GetString(username)
b.Password = v.GetString(password)
b.HTTPServer.HostPort = v.GetString(httpServerHostPort)
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
}
70 changes: 70 additions & 0 deletions cmd/agent/app/reporter/http/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2017 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
)

// Builder Struct to hold configurations
type Builder struct {
scheme string
collectorHostPorts []string `yaml:"collectorHostPorts"`

authToken string
username string
password string
}

// NewBuilder creates a new reporter builder.
func NewBuilder() *Builder {
return &Builder{}
}

// WithCollectorHostPorts sets the collectors hosts and ports to use
func (b *Builder) WithCollectorHostPorts(s []string) *Builder {
b.collectorHostPorts = s
return b
}

// WithScheme sets the protocol to use
func (b *Builder) WithScheme(s string) *Builder {
b.scheme = s
return b
}

// WithAuthToken sets the authentication token to use
func (b *Builder) WithAuthToken(s string) *Builder {
b.authToken = s
return b
}

// WithUsername sets the username to use
func (b *Builder) WithUsername(s string) *Builder {
b.username = s
return b
}

// WithPassword sets the password token to use
func (b *Builder) WithPassword(s string) *Builder {
b.password = s
return b
}

// CreateReporter creates the a reporter based on the configuration
func (b *Builder) CreateReporter(mFactory metrics.Factory, logger *zap.Logger) (*Reporter, error) {
return New(b.scheme, b.collectorHostPorts, b.authToken, b.username, b.password, mFactory, logger), nil
}
Loading

0 comments on commit 846ac77

Please sign in to comment.