From b8760e2485aa7a3ad04f025b4fe7426e4a47ab0d Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 4 Mar 2019 15:05:03 +0100 Subject: [PATCH 1/2] Allow thrift reporter even if grpc hosts are not provided Signed-off-by: Pavol Loffay --- cmd/agent/app/builder.go | 32 +++++++++ cmd/agent/app/builder_test.go | 88 ++++++++++++++++++++++++ cmd/agent/app/reporter/metrics_test.go | 3 +- cmd/agent/app/reporter/tchannel/flags.go | 2 +- cmd/agent/main.go | 22 +----- cmd/all-in-one/main.go | 22 +----- 6 files changed, 125 insertions(+), 44 deletions(-) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 0eae51559c4..f463eff6a47 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -16,17 +16,22 @@ package app import ( "fmt" + "io/ioutil" "net/http" + "os" "github.com/apache/thrift/lib/go/thrift" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "google.golang.org/grpc/grpclog" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" zipkinThrift "github.com/jaegertracing/jaeger/thrift-gen/agent" @@ -209,3 +214,30 @@ func defaultInt(value int, defaultVal int) int { } return value } + +// CreateCollectorProxy creates collector proxy +func CreateCollectorProxy( + opts *reporter.Options, + tchanRep *tchannel.Builder, + grpcRepOpts *grpc.Options, + logger *zap.Logger, + mFactory metrics.Factory, +) (CollectorProxy, error) { + // GRPC type is set as default in viper, but we check for legacy flags + // to keep backward compatibility + if opts.ReporterType == reporter.GRPC && + len(tchanRep.CollectorHostPorts) > 0 && + len(grpcRepOpts.CollectorHostPort) == 0 { + logger.Warn("Using deprecated configuration", zap.String("option", "--collector-host.port")) + return tchannel.NewCollectorProxy(tchanRep, mFactory, logger) + } + switch opts.ReporterType { + case reporter.GRPC: + grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) + return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger) + case reporter.TCHANNEL: + return tchannel.NewCollectorProxy(tchanRep, mFactory, logger) + default: + return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(opts.ReporterType))) + } +} diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index e5d7b24377a..518126b7af7 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -16,16 +16,23 @@ package app import ( "errors" + "flag" "fmt" "strings" "testing" + "time" + "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" + "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" "gopkg.in/yaml.v2" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" "github.com/jaegertracing/jaeger/thrift-gen/baggage" @@ -179,3 +186,84 @@ func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.S func (fakeCollectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) { return nil, nil } + + +func TestCreateCollectorProxy(t *testing.T) { + tests := []struct{ + flags []string + err string + metric metricstest.ExpectedMetric + }{ + { + err: "could not create collector proxy, address is missing", + }, + { + flags: []string{"--collector.host-port=foo"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + }, + { + flags: []string{"--reporter.type=tchannel"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + }, + { + flags: []string{"--reporter.type=tchannel", "--collector.host-port=foo"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + }, + { + flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + }, + { + flags: []string{"--reporter.type=grpc", "--collector.host-port=foo"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "tchannel", "format": "jaeger"}, Value: 1}, + }, + { + flags: []string{"--reporter.type=grpc", "--reporter.grpc.host-port=foo", "--collector.host-port=foo"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "grpc", "format": "jaeger"}, Value: 1}, + }, + { + flags: []string{"--reporter.type=grpc", "--reporter.grpc.host-port=foo"}, + metric: metricstest.ExpectedMetric{Name: "reporter.batches.failures", Tags: map[string]string{"protocol": "grpc", "format": "jaeger"}, Value: 1}, + }, + } + + for _, test := range tests { + flags := &flag.FlagSet{} + tchannel.AddFlags(flags) + grpc.AddFlags(flags) + reporter.AddFlags(flags) + + command := cobra.Command{} + command.PersistentFlags().AddGoFlagSet(flags) + v := viper.New() + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags(test.flags) + require.NoError(t, err) + + rOpts := new(reporter.Options).InitFromViper(v) + tchan := tchannel.NewBuilder().InitFromViper(v, zap.NewNop()) + grpcOpts := new(grpc.Options).InitFromViper(v) + + metricsFactory := metricstest.NewFactory(time.Microsecond) + proxy, err := CreateCollectorProxy(rOpts, tchan, grpcOpts, zap.NewNop(), metricsFactory) + if test.err != "" { + assert.EqualError(t, err, test.err) + assert.Nil(t, proxy) + } else { + require.NoError(t, err) + proxy.GetReporter().EmitBatch(jaeger.NewBatch()) + metricsFactory.AssertCounterMetrics(t, test.metric) + } + } +} + +func TestCreateCollectorProxy_UnknownReporter(t *testing.T) { + rOpts := new(reporter.Options) + tchan := tchannel.NewBuilder() + grpcOpts := new(grpc.Options) + + proxy, err := CreateCollectorProxy(rOpts, tchan, grpcOpts, zap.NewNop(), metrics.NullFactory) + assert.Nil(t, proxy) + assert.EqualError(t, err, "unknown reporter type ") +} diff --git a/cmd/agent/app/reporter/metrics_test.go b/cmd/agent/app/reporter/metrics_test.go index 151488994a3..d8abac22e5c 100644 --- a/cmd/agent/app/reporter/metrics_test.go +++ b/cmd/agent/app/reporter/metrics_test.go @@ -104,7 +104,8 @@ func TestMetricsReporter(t *testing.T) { err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}}) require.Error(t, err) }, rep: &noopReporter{err: errors.New("foo")}}, - {expectedCounters: []metricstest.ExpectedMetric{ + {expectedCounters: + []metricstest.ExpectedMetric{ {Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1}, {Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2}, }, expectedGauges: []metricstest.ExpectedMetric{ diff --git a/cmd/agent/app/reporter/tchannel/flags.go b/cmd/agent/app/reporter/tchannel/flags.go index 654ebb6a688..f383a2590a8 100644 --- a/cmd/agent/app/reporter/tchannel/flags.go +++ b/cmd/agent/app/reporter/tchannel/flags.go @@ -51,7 +51,7 @@ func AddFlags(flags *flag.FlagSet) { tchannelPrefix+reportTimeout, time.Second, "sets the timeout used when reporting spans") - // TODO remove deprecated in 1.9 + // TODO remove deprecated in 2.0 flags.String( collectorHostPort, "", diff --git a/cmd/agent/main.go b/cmd/agent/main.go index fc7e93258a7..5feaf376d19 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -17,7 +17,6 @@ package main import ( "fmt" "io" - "io/ioutil" "net/http" "os" "os/signal" @@ -28,7 +27,6 @@ import ( "github.com/spf13/viper" jMetrics "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "google.golang.org/grpc/grpclog" "github.com/jaegertracing/jaeger/cmd/agent/app" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" @@ -73,7 +71,7 @@ func main() { rOpts := new(reporter.Options).InitFromViper(v) tChanOpts := new(tchannel.Builder).InitFromViper(v, logger) grpcOpts := new(grpc.Options).InitFromViper(v) - cp, err := createCollectorProxy(rOpts, tChanOpts, grpcOpts, logger, mFactory) + cp, err := app.CreateCollectorProxy(rOpts, tChanOpts, grpcOpts, logger, mFactory) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) } @@ -123,21 +121,3 @@ func main() { os.Exit(1) } } - -func createCollectorProxy( - opts *reporter.Options, - tchanRep *tchannel.Builder, - grpcRepOpts *grpc.Options, - logger *zap.Logger, - mFactory jMetrics.Factory, -) (app.CollectorProxy, error) { - switch opts.ReporterType { - case reporter.GRPC: - grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr)) - return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger) - case reporter.TCHANNEL: - return tchannel.NewCollectorProxy(tchanRep, mFactory, logger) - default: - return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(opts.ReporterType))) - } -} diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index a0c0e87ff95..70ec5aa8510 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -197,7 +197,7 @@ func startAgent( ) { metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) - cp, err := createCollectorProxy(cOpts, repOpts, tchanRep, grpcRepOpts, logger, metricsFactory) + cp, err := agentApp.CreateCollectorProxy(repOpts, tchanRep, grpcRepOpts, logger, metricsFactory) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) } @@ -213,26 +213,6 @@ func startAgent( } } -func createCollectorProxy( - cOpts *collector.CollectorOptions, - repOpts *agentRep.Options, - tchanRepOpts *agentTchanRep.Builder, - grpcRepOpts *agentGrpcRep.Options, - logger *zap.Logger, - mFactory metrics.Factory, -) (agentApp.CollectorProxy, error) { - switch repOpts.ReporterType { - case agentRep.GRPC: - grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)) - return agentGrpcRep.NewCollectorProxy(grpcRepOpts, mFactory, logger) - case agentRep.TCHANNEL: - tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) - return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger) - default: - return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(repOpts.ReporterType))) - } -} - func startCollector( cOpts *collector.CollectorOptions, spanWriter spanstore.Writer, From eab8541f953d93cc40736e34a7003963dd32a5d6 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 4 Mar 2019 15:41:41 +0100 Subject: [PATCH 2/2] Fix all-in-one Signed-off-by: Pavol Loffay --- cmd/all-in-one/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 70ec5aa8510..765e3e9bcae 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -197,6 +197,7 @@ func startAgent( ) { metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) + grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)) cp, err := agentApp.CreateCollectorProxy(repOpts, tchanRep, grpcRepOpts, logger, metricsFactory) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err))