diff --git a/Gopkg.lock b/Gopkg.lock index db54b701d8b..7687d36c5da 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -982,6 +982,7 @@ "google.golang.org/grpc", "google.golang.org/grpc/balancer/roundrobin", "google.golang.org/grpc/codes", + "google.golang.org/grpc/credentials", "google.golang.org/grpc/grpclog", "google.golang.org/grpc/resolver", "google.golang.org/grpc/resolver/manual", diff --git a/cmd/agent/app/flags_test.go b/cmd/agent/app/flags_test.go index 9a271f0287a..7ea6738ef5f 100644 --- a/cmd/agent/app/flags_test.go +++ b/cmd/agent/app/flags_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestBingFlags(t *testing.T) { +func TestBindFlags(t *testing.T) { v := viper.New() b := &Builder{} command := cobra.Command{} diff --git a/cmd/agent/app/reporter/flags_test.go b/cmd/agent/app/reporter/flags_test.go index e47b236581d..354a240327a 100644 --- a/cmd/agent/app/reporter/flags_test.go +++ b/cmd/agent/app/reporter/flags_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestBingFlags(t *testing.T) { +func TestBindFlags(t *testing.T) { v := viper.New() command := cobra.Command{} flags := &flag.FlagSet{} diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index f8b0eb97ccf..fabd2ef6c7b 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -15,6 +15,7 @@ package grpc import ( + "crypto/x509" "errors" "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -22,6 +23,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -37,11 +39,34 @@ type ProxyBuilder struct { conn *grpc.ClientConn } +var systemCertPool = x509.SystemCertPool // to allow overriding in unit test + // NewCollectorProxy creates ProxyBuilder func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { if len(o.CollectorHostPort) == 0 { return nil, errors.New("could not create collector proxy, address is missing") } + var dialOption grpc.DialOption + if o.TLS { // user requested a secure connection + var creds credentials.TransportCredentials + if len(o.TLSCA) == 0 { // no truststore given, use SystemCertPool + pool, err := systemCertPool() + if err != nil { + return nil, err + } + creds = credentials.NewClientTLSFromCert(pool, o.TLSServerName) + } else { // setup user specified truststore + var err error + creds, err = credentials.NewClientTLSFromFile(o.TLSCA, o.TLSServerName) + if err != nil { + return nil, err + } + } + dialOption = grpc.WithTransportCredentials(creds) + } else { // insecure connection + dialOption = grpc.WithInsecure() + } + var target string if len(o.CollectorHostPort) > 1 { r, _ := manual.GenerateAndRegisterManualResolver() @@ -56,7 +81,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) } // It does not return error if the collector is not running conn, _ := grpc.Dial(target, - grpc.WithInsecure(), + dialOption, grpc.WithBalancerName(roundrobin.Name), grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(o.MaxRetry)))) grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}) diff --git a/cmd/agent/app/reporter/grpc/collector_proxy_test.go b/cmd/agent/app/reporter/grpc/collector_proxy_test.go index a9e345fecc5..e944e56b0ba 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy_test.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy_test.go @@ -15,7 +15,11 @@ package grpc import ( + "crypto/x509" + "errors" + "io/ioutil" "net" + "os" "testing" "time" @@ -30,6 +34,21 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/jaeger" ) +const certPEM = ` +-----BEGIN CERTIFICATE----- +MIICBzCCAXCgAwIBAgIQNkTaUtOczDHvL2YT/kqScTANBgkqhkiG9w0BAQsFADAX +MRUwEwYDVQQKEwxqYWdlcnRyYWNpbmcwHhcNMTkwMjA4MDYyODAyWhcNMTkwMjA4 +MDcyODAyWjAXMRUwEwYDVQQKEwxqYWdlcnRyYWNpbmcwgZ8wDQYJKoZIhvcNAQEB +BQADgY0AMIGJAoGBAMcOLYflHGbqC1f7+tbnsdfcpd0rEuX65+ab0WzelAgvo988 +yD+j7LDLPIE8IPk/tfqaETZ8h0LRUUTn8F2rW/wgrl/G8Onz0utog38N0elfTifG +Mu7GJCr/+aYM5xbQMDj4Brb4vhnkJF8UBe49fWILhIltUcm1SeKqVX3d1FvpAgMB +AAGjVDBSMA4GA1UdDwEB/wQEAwICpDATBgNVHSUEDDAKBggrBgEFBQcDATAPBgNV +HRMBAf8EBTADAQH/MBoGA1UdEQQTMBGCCWxvY2FsaG9zdIcEfwAAATANBgkqhkiG +9w0BAQsFAAOBgQCreFjwpAn1HqJT812JOwoWKrt1NjOKGcz7pvIs1k3DfQVLH2aZ +iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25 +17x1o3Q64AnPCWOLpN2wjkfZqX7gZ84nsxpqb9Sbw1+2+kqX7dSZ3mfVxQ== +-----END CERTIFICATE-----` + func TestProxyBuilderMissingAddress(t *testing.T) { proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop()) require.Nil(t, proxy) @@ -37,13 +56,77 @@ func TestProxyBuilderMissingAddress(t *testing.T) { } func TestProxyBuilder(t *testing.T) { - proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{"localhost:0000"}}, metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - require.NotNil(t, proxy) - assert.NotNil(t, proxy.GetReporter()) - assert.NotNil(t, proxy.GetManager()) - assert.Nil(t, proxy.Close()) - assert.EqualError(t, proxy.Close(), "rpc error: code = Canceled desc = grpc: the client connection is closing") + tmpfile, err := ioutil.TempFile("", "cert*.pem") + if err != nil { + t.Fatalf("failed to create tempfile: %s", err) + } + + defer func() { + tmpfile.Close() + os.Remove(tmpfile.Name()) + }() + + if _, err := tmpfile.Write([]byte(certPEM)); err != nil { + t.Fatalf("failed to write cert to tempfile: %s", err) + } + + tests := []struct { + name string + proxyOptions *Options + expectError bool + }{ + { + name: "with insecure grpc connection", + proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}}, + expectError: false, + }, + { + name: "with secure grpc connection", + proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}, TLS: true}, + expectError: false, + }, + { + name: "with secure grpc connection and own CA", + proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}, TLS: true, TLSCA: tmpfile.Name()}, + expectError: false, + }, + { + name: "with secure grpc connection and a CA file which does not exist", + proxyOptions: &Options{CollectorHostPort: []string{"localhost:0000"}, TLS: true, TLSCA: "/not/valid"}, + expectError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop()) + if test.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.NotNil(t, proxy) + + assert.NotNil(t, proxy.GetReporter()) + assert.NotNil(t, proxy.GetManager()) + + assert.Nil(t, proxy.Close()) + assert.EqualError(t, proxy.Close(), "rpc error: code = Canceled desc = grpc: the client connection is closing") + } + }) + } +} + +// This test is only for coverage. +func TestSystemCertPoolError(t *testing.T) { + fakeErr := errors.New("fake error") + systemCertPool = func() (*x509.CertPool, error) { + return nil, fakeErr + } + _, err := NewCollectorProxy(&Options{ + CollectorHostPort: []string{"foo", "bar"}, + TLS: true, + }, nil, nil) + assert.Equal(t, fakeErr, err) } func TestMultipleCollectors(t *testing.T) { diff --git a/cmd/agent/app/reporter/grpc/flags.go b/cmd/agent/app/reporter/grpc/flags.go index 487c2c587b1..cdcef6d5d34 100644 --- a/cmd/agent/app/reporter/grpc/flags.go +++ b/cmd/agent/app/reporter/grpc/flags.go @@ -22,10 +22,13 @@ import ( ) const ( - gRPCPrefix = "reporter.grpc." - collectorHostPort = gRPCPrefix + "host-port" - retry = gRPCPrefix + "retry.max" - defaultMaxRetry = 3 + gRPCPrefix = "reporter.grpc." + collectorHostPort = gRPCPrefix + "host-port" + retry = gRPCPrefix + "retry.max" + defaultMaxRetry = 3 + collectorTLS = gRPCPrefix + "tls" + collectorTLSCA = gRPCPrefix + "tls.ca" + collectorTLSServerName = gRPCPrefix + "tls.server-name" ) // Options Struct to hold configurations @@ -33,12 +36,18 @@ type Options struct { // CollectorHostPort is list of host:port Jaeger Collectors. CollectorHostPort []string MaxRetry uint + TLS bool + TLSCA string + TLSServerName string } // AddFlags adds flags for Options. func AddFlags(flags *flag.FlagSet) { flags.String(collectorHostPort, "", "Comma-separated string representing host:port of a static list of collectors to connect to directly.") flags.Uint(retry, defaultMaxRetry, "Sets the maximum number of retries for a call.") + flags.Bool(collectorTLS, false, "Enable TLS.") + flags.String(collectorTLSCA, "", "Path to a TLS CA file. (default use the systems truststore)") + flags.String(collectorTLSServerName, "", "Override the TLS server name.") } // InitFromViper initializes Options with properties retrieved from Viper. @@ -48,5 +57,8 @@ func (o *Options) InitFromViper(v *viper.Viper) *Options { o.CollectorHostPort = strings.Split(hostPorts, ",") } o.MaxRetry = uint(v.GetInt(retry)) + o.TLS = v.GetBool(collectorTLS) + o.TLSCA = v.GetString(collectorTLSCA) + o.TLSServerName = v.GetString(collectorTLSServerName) return o } diff --git a/cmd/agent/app/reporter/grpc/flags_test.go b/cmd/agent/app/reporter/grpc/flags_test.go index 1cba9c3ee27..7c6927cb135 100644 --- a/cmd/agent/app/reporter/grpc/flags_test.go +++ b/cmd/agent/app/reporter/grpc/flags_test.go @@ -24,15 +24,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestBingFlags(t *testing.T) { +func TestBindFlags(t *testing.T) { tests := []struct { cOpts []string expected *Options }{ {cOpts: []string{"--reporter.grpc.host-port=localhost:1111", "--reporter.grpc.retry.max=15"}, - expected: &Options{CollectorHostPort: []string{"localhost:1111"}, MaxRetry:15}}, + expected: &Options{CollectorHostPort: []string{"localhost:1111"}, MaxRetry: 15}}, {cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222"}, - expected: &Options{CollectorHostPort: []string{"localhost:1111", "localhost:2222"}, MaxRetry:defaultMaxRetry}}, + expected: &Options{CollectorHostPort: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry}}, } for _, test := range tests { v := viper.New() diff --git a/cmd/agent/app/reporter/tchannel/flags_test.go b/cmd/agent/app/reporter/tchannel/flags_test.go index 2cafa1255b7..e64523b749c 100644 --- a/cmd/agent/app/reporter/tchannel/flags_test.go +++ b/cmd/agent/app/reporter/tchannel/flags_test.go @@ -26,7 +26,7 @@ import ( "go.uber.org/zap" ) -func TestBingFlags(t *testing.T) { +func TestBindFlags(t *testing.T) { tests := []struct { flags []string builder Builder diff --git a/cmd/collector/app/builder/builder_flags.go b/cmd/collector/app/builder/builder_flags.go index 1adf49df701..19703677668 100644 --- a/cmd/collector/app/builder/builder_flags.go +++ b/cmd/collector/app/builder/builder_flags.go @@ -28,6 +28,9 @@ const ( collectorPort = "collector.port" collectorHTTPPort = "collector.http-port" collectorGRPCPort = "collector.grpc-port" + collectorGRPCTLS = "collector.grpc.tls" + collectorGRPCCert = "collector.grpc.tls.cert" + collectorGRPCKey = "collector.grpc.tls.key" collectorZipkinHTTPort = "collector.zipkin.http-port" defaultTChannelPort = 14267 @@ -49,6 +52,12 @@ type CollectorOptions struct { CollectorHTTPPort int // CollectorGRPCPort is the port that the collector service listens in on for gRPC requests CollectorGRPCPort int + // CollectorGRPCTLS defines if the server is setup with TLS + CollectorGRPCTLS bool + // CollectorGRPCCert is the path to a TLS certificate file for the server + CollectorGRPCCert string + // CollectorGRPCKey is the path to a TLS key file for the server + CollectorGRPCKey string // CollectorZipkinHTTPPort is the port that the Zipkin collector service listens in on for http requests CollectorZipkinHTTPPort int } @@ -61,6 +70,9 @@ func AddFlags(flags *flag.FlagSet) { flags.Int(collectorHTTPPort, defaultHTTPPort, "The HTTP port for the collector service") flags.Int(collectorGRPCPort, defaultGRPCPort, "The gRPC port for the collector service") flags.Int(collectorZipkinHTTPort, 0, "The HTTP port for the Zipkin collector service e.g. 9411") + flags.Bool(collectorGRPCTLS, false, "Enable TLS") + flags.String(collectorGRPCCert, "", "Path to TLS certificate file") + flags.String(collectorGRPCKey, "", "Path to TLS key file") } // InitFromViper initializes CollectorOptions with properties from viper @@ -70,6 +82,9 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions { cOpts.CollectorPort = v.GetInt(collectorPort) cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort) cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort) + cOpts.CollectorGRPCTLS = v.GetBool(collectorGRPCTLS) + cOpts.CollectorGRPCCert = v.GetString(collectorGRPCCert) + cOpts.CollectorGRPCKey = v.GetString(collectorGRPCKey) cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort) return cOpts } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index d1797af1564..e250dff6c91 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -33,6 +33,7 @@ import ( "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" basicB "github.com/jaegertracing/jaeger/cmd/builder" "github.com/jaegertracing/jaeger/cmd/collector/app" @@ -140,7 +141,7 @@ func main() { ch.Serve(listener) } - server, err := startGRPCServer(builderOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger) + server, err := startGRPCServer(builderOpts, grpcHandler, strategyStore, logger) if err != nil { logger.Fatal("Could not start gRPC collector", zap.Error(err)) } @@ -207,13 +208,29 @@ func main() { } func startGRPCServer( - port int, + opts *builder.CollectorOptions, handler *app.GRPCHandler, samplingStore strategystore.StrategyStore, logger *zap.Logger, ) (*grpc.Server, error) { - server := grpc.NewServer() - _, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) { + var server *grpc.Server + + if opts.CollectorGRPCTLS { // user requested a server with TLS, setup creds + if opts.CollectorGRPCCert == "" || opts.CollectorGRPCKey == "" { + return nil, fmt.Errorf("you requested TLS but configuration does not include a path to cert and/or key") + } + creds, err := credentials.NewServerTLSFromFile( + opts.CollectorGRPCCert, + opts.CollectorGRPCKey, + ) + if err != nil { + return nil, fmt.Errorf("failed to load TLS keys: %s", err) + } + server = grpc.NewServer(grpc.Creds(creds)) + } else { // server without TLS + server = grpc.NewServer() + } + _, err := grpcserver.StartGRPCCollector(opts.CollectorGRPCPort, server, handler, samplingStore, logger, func(err error) { logger.Fatal("gRPC collector failed", zap.Error(err)) }) if err != nil {