diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 74550c558ba..a90b534f218 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -246,18 +246,30 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{})) } -func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) { +func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) { opts := []grpc.ServerOption{} + tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA) + if err != nil { + return opts, err + } + if tlsCfg != nil { + opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))) + } + return opts, nil +} +func defaultTLSServerOpts(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) { if key == "" && cert == "" { if clientCA != "" { return nil, errors.New("when a client CA is used a server key and certificate must also be provided") } level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable") - return opts, nil + return nil, nil } + level.Info(logger).Log("msg", "enabling server side TLS") + if key == "" || cert == "" { return nil, errors.New("both server key and certificate must be provided") } @@ -271,8 +283,6 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp return nil, errors.Wrap(err, "server credentials") } - level.Info(logger).Log("msg", "enabled gRPC server side TLS") - tlsCfg.Certificates = []tls.Certificate{tlsCert} if clientCA != "" { @@ -288,10 +298,55 @@ func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grp tlsCfg.ClientCAs = certPool tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert - level.Info(logger).Log("msg", "gRPC server TLS client verification enabled") + level.Info(logger).Log("msg", "server TLS client verification enabled") } - return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil + return tlsCfg, nil +} + +func defaultTLSClientOpts(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) { + var certPool *x509.CertPool + if caCert != "" { + caPEM, err := ioutil.ReadFile(caCert) + if err != nil { + return nil, errors.Wrap(err, "reading client CA") + } + + certPool = x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caPEM) { + return nil, errors.Wrap(err, "building client CA") + } + level.Info(logger).Log("msg", "TLS client using provided certificate pool") + } else { + var err error + certPool, err = x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "reading system certificate pool") + } + level.Info(logger).Log("msg", "TLS client using system certificate pool") + } + + tlsCfg := &tls.Config{ + RootCAs: certPool, + } + + if serverName != "" { + tlsCfg.ServerName = serverName + } + + if (key != "") != (cert != "") { + return nil, errors.New("both client key and certificate must be provided") + } + + if cert != "" { + cert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return nil, errors.Wrap(err, "client credentials") + } + tlsCfg.Certificates = []tls.Certificate{cert} + level.Info(logger).Log("msg", "TLS client authentication enabled") + } + return tlsCfg, nil } func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server { diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 6821e6fd364..e4c98de02b1 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -2,10 +2,7 @@ package main import ( "context" - "crypto/tls" - "crypto/x509" "fmt" - "io/ioutil" "math" "net" "net/http" @@ -164,7 +161,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { } } -func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert string, serverName string) ([]grpc.DialOption, error) { +func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { grpcMets := grpc_prometheus.NewClientMetrics() grpcMets.EnableClientHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{ @@ -199,50 +196,13 @@ func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope return append(dialOpts, grpc.WithInsecure()), nil } - level.Info(logger).Log("msg", "Enabling client to server TLS") + level.Info(logger).Log("msg", "enabling client to server TLS") - var certPool *x509.CertPool - - if caCert != "" { - caPEM, err := ioutil.ReadFile(caCert) - if err != nil { - return nil, errors.Wrap(err, "reading client CA") - } - - certPool = x509.NewCertPool() - if !certPool.AppendCertsFromPEM(caPEM) { - return nil, errors.Wrap(err, "building client CA") - } - level.Info(logger).Log("msg", "TLS Client using provided certificate pool") - } else { - var err error - certPool, err = x509.SystemCertPool() - if err != nil { - return nil, errors.Wrap(err, "reading system certificate pool") - } - level.Info(logger).Log("msg", "TLS Client using system certificate pool") - } - - tlsCfg := &tls.Config{ - RootCAs: certPool, - } - - if serverName != "" { - tlsCfg.ServerName = serverName - } - - if cert != "" { - cert, err := tls.LoadX509KeyPair(cert, key) - if err != nil { - return nil, errors.Wrap(err, "client credentials") - } - tlsCfg.Certificates = []tls.Certificate{cert} - level.Info(logger).Log("msg", "TLS Client authentication enabled") + tlsCfg, err := defaultTLSClientOpts(logger, cert, key, caCert, serverName) + if err != nil { + return nil, err } - - creds := credentials.NewTLS(tlsCfg) - - return append(dialOpts, grpc.WithTransportCredentials(creds)), nil + return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil } // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured @@ -428,7 +388,7 @@ func runQuery( } logger := log.With(logger, "component", component.Query.String()) - opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA) + opts, err := defaultGRPCTLSServerOpts(logger, srvCert, srvKey, srvClientCA) if err != nil { return errors.Wrap(err, "build gRPC server") } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 96391540378..72f97286e52 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -35,11 +35,18 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { comp := component.Receive cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)") - grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) httpBindAddr := regHTTPAddrFlag(cmd) + grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) - remoteWriteAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). + rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). Default("0.0.0.0:19291").String() + rwServerCert := cmd.Flag("remote-write.server-tls-cert", "TLS Certificate for HTTP server, leave blank to disable TLS").Default("").String() + rwServerKey := cmd.Flag("remote-write.server-tls-key", "TLS Key for the HTTP server, leave blank to disable TLS").Default("").String() + rwServerClientCA := cmd.Flag("remote-write.server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String() + rwClientCert := cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() + rwClientKey := cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate").Default("").String() + rwClientServerCA := cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers").Default("").String() + rwClientServerName := cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB."). Default("./data").String() @@ -87,7 +94,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { if hostname == "" || err != nil { return errors.New("--receive.local-endpoint is empty and host could not be determined.") } - parts := strings.Split(*remoteWriteAddress, ":") + parts := strings.Split(*rwAddress, ":") port := parts[len(parts)-1] *local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port) } @@ -98,11 +105,18 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { reg, tracer, *grpcBindAddr, - *cert, - *key, - *clientCA, + *grpcCert, + *grpcKey, + *grpcClientCA, *httpBindAddr, - *remoteWriteAddress, + *rwAddress, + *rwServerCert, + *rwServerKey, + *rwServerClientCA, + *rwClientCert, + *rwClientKey, + *rwClientServerCA, + *rwClientServerName, *dataDir, objStoreConfig, lset, @@ -124,11 +138,18 @@ func runReceive( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, - cert string, - key string, - clientCA string, + grpcCert string, + grpcKey string, + grpcClientCA string, httpBindAddr string, - remoteWriteAddress string, + rwAddress string, + rwServerCert string, + rwServerKey string, + rwServerClientCA string, + rwClientCert string, + rwClientKey string, + rwClientServerCA string, + rwClientServerName string, dataDir string, objStoreConfig *extflag.PathOrContent, lset labels.Labels, @@ -153,14 +174,24 @@ func runReceive( } localStorage := &tsdb.ReadyStorage{} + rwTLSConfig, err := defaultTLSServerOpts(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA) + if err != nil { + return err + } + rwTLSClientConfig, err := defaultTLSClientOpts(logger, rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName) + if err != nil { + return err + } webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - ListenAddress: remoteWriteAddress, + ListenAddress: rwAddress, Registry: reg, Endpoint: endpoint, TenantHeader: tenantHeader, ReplicaHeader: replicaHeader, ReplicationFactor: replicationFactor, Tracer: tracer, + TLSConfig: rwTLSConfig, + TLSClientConfig: rwTLSClientConfig, }) statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) @@ -317,7 +348,7 @@ func runReceive( startGRPC := make(chan struct{}) g.Add(func() error { defer close(startGRPC) - opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, grpcCert, grpcKey, grpcClientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 139a7c8a748..faa6a0c0a98 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -214,7 +214,7 @@ func runStore( return errors.Wrap(err, "listen API address") } - opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) + opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "grpc server options") } diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 23bb54f2803..16a988098e2 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -3,6 +3,7 @@ package receive import ( "bytes" "context" + "crypto/tls" "fmt" "io/ioutil" stdlog "log" @@ -49,6 +50,8 @@ type Options struct { ReplicaHeader string ReplicationFactor uint64 Tracer opentracing.Tracer + TLSConfig *tls.Config + TLSClientConfig *tls.Config } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -72,9 +75,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler { logger = log.NewNopLogger() } - client := &http.Client{} + transport := http.DefaultTransport.(*http.Transport) + transport.TLSClientConfig = o.TLSClientConfig + client := &http.Client{Transport: transport} if o.Tracer != nil { - client.Transport = tracing.HTTPTripperware(logger, http.DefaultTransport) + client.Transport = tracing.HTTPTripperware(logger, client.Transport) } h := &Handler{ @@ -186,8 +191,9 @@ func (h *Handler) Run() error { errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) httpSrv := &http.Server{ - Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName), - ErrorLog: errlog, + Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName), + ErrorLog: errlog, + TLSConfig: h.options.TLSConfig, } return httpSrv.Serve(h.listener)