From e1153afee27f8096a4a17d816a075ee807559f2e Mon Sep 17 00:00:00 2001 From: Brett Jones Date: Wed, 8 Jan 2020 17:06:06 -0600 Subject: [PATCH] receive: use grpc to forward remote write requests Signed-off-by: Brett Jones --- CHANGELOG.md | 1 + cmd/thanos/query.go | 53 ++----------------------- cmd/thanos/receive.go | 7 ++++ pkg/extgrpc/client.go | 59 ++++++++++++++++++++++++++++ pkg/receive/handler.go | 87 +++++++++++++++++++++++++----------------- 5 files changed, 123 insertions(+), 84 deletions(-) create mode 100644 pkg/extgrpc/client.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 565858fa6ff..76b15d4fab0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags - [#1967](https://github.com/thanos-io/thanos/issues/1967) Receive: Allow local TSDB compaction +- [#1970](https://github.com/thanos-io/thanos/issues/1970) Receive: Use gRPC for forwarding requests between peers ### Fixed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 2dd9c176633..349e9bf2eac 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -10,8 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -21,9 +19,12 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" + kingpin "gopkg.in/alecthomas/kingpin.v2" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/extgrpc" 
"github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/prober" @@ -34,11 +35,7 @@ import ( httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/tls" - "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) // registerQuery registers a query command. @@ -165,48 +162,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { } } -func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { - grpcMets := grpc_prometheus.NewClientMetrics() - grpcMets.EnableClientHandlingTimeHistogram( - grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), - ) - dialOpts := []grpc.DialOption{ - // We want to make sure that we can receive huge gRPC messages from storeAPI. - // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. - // Current limit is ~2GB. - // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. 
- grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), - grpc.WithUnaryInterceptor( - grpc_middleware.ChainUnaryClient( - grpcMets.UnaryClientInterceptor(), - tracing.UnaryClientInterceptor(tracer), - ), - ), - grpc.WithStreamInterceptor( - grpc_middleware.ChainStreamClient( - grpcMets.StreamClientInterceptor(), - tracing.StreamClientInterceptor(tracer), - ), - ), - } - - if reg != nil { - reg.MustRegister(grpcMets) - } - - if !secure { - return append(dialOpts, grpc.WithInsecure()), nil - } - - level.Info(logger).Log("msg", "enabling client to server TLS") - - tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName) - if err != nil { - return nil, err - } - return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil -} - // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured // store nodes, merging and duplicating the data to satisfy user query. func runQuery( @@ -251,7 +206,7 @@ func runQuery( }) reg.MustRegister(duplicatedStores) - dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName) + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName) if err != nil { return errors.Wrap(err, "building gRPC client") } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index fbe0eee834f..a572c836013 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/receive" @@ -190,6 +191,11 @@ func runReceive( if err != nil { return err } + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, rwServerCert != "", rwClientCert, rwClientKey, 
rwClientServerCA, rwClientServerName) + if err != nil { + return err + } + webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ ListenAddress: rwAddress, Registry: reg, @@ -200,6 +206,7 @@ func runReceive( Tracer: tracer, TLSConfig: rwTLSConfig, TLSClientConfig: rwTLSClientConfig, + DialOpts: dialOpts, }) statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) diff --git a/pkg/extgrpc/client.go b/pkg/extgrpc/client.go new file mode 100644 index 00000000000..8caebceb784 --- /dev/null +++ b/pkg/extgrpc/client.go @@ -0,0 +1,59 @@ +package extgrpc + +import ( + "math" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + opentracing "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/tls" + "github.com/thanos-io/thanos/pkg/tracing" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// StoreClientGRPCOpts creates grpc dial options for connecting to a store client. +func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { + grpcMets := grpc_prometheus.NewClientMetrics() + grpcMets.EnableClientHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ) + dialOpts := []grpc.DialOption{ + // We want to make sure that we can receive huge gRPC messages from storeAPI. + // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. + // Current limit is ~2GB. + // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. 
+ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + grpc.WithUnaryInterceptor( + grpc_middleware.ChainUnaryClient( + grpcMets.UnaryClientInterceptor(), + tracing.UnaryClientInterceptor(tracer), + ), + ), + grpc.WithStreamInterceptor( + grpc_middleware.ChainStreamClient( + grpcMets.StreamClientInterceptor(), + tracing.StreamClientInterceptor(tracer), + ), + ), + } + + if reg != nil { + reg.MustRegister(grpcMets) + } + + if !secure { + return append(dialOpts, grpc.WithInsecure()), nil + } + + level.Info(logger).Log("msg", "enabling client to server TLS") + + tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName) + if err != nil { + return nil, err + } + return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 6dfedab5d99..86b006d4ac8 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1,7 +1,6 @@ package receive import ( - "bytes" "context" "crypto/tls" "fmt" @@ -24,11 +23,13 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" terrors "github.com/prometheus/prometheus/tsdb/errors" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -47,18 +48,18 @@ type Options struct { Writer *Writer ListenAddress string Registry prometheus.Registerer - Endpoint string TenantHeader string ReplicaHeader string + Endpoint string ReplicationFactor uint64 Tracer opentracing.Tracer TLSConfig *tls.Config TLSClientConfig *tls.Config + DialOpts []grpc.DialOption } // Handler serves a Prometheus remote write receiving HTTP endpoint. 
type Handler struct { - client *http.Client logger log.Logger writer *Writer router *route.Router @@ -67,6 +68,7 @@ type Handler struct { mtx sync.RWMutex hashring Hashring + peers *peerGroup // Metrics. forwardRequestsTotal *prometheus.CounterVec @@ -77,19 +79,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler { logger = log.NewNopLogger() } - transport := http.DefaultTransport.(*http.Transport) - transport.TLSClientConfig = o.TLSClientConfig - client := &http.Client{Transport: transport} - if o.Tracer != nil { - client.Transport = tracing.HTTPTripperware(logger, client.Transport) - } - h := &Handler{ - client: client, logger: logger, writer: o.Writer, router: route.New(), options: o, + peers: newPeerGroup(o.DialOpts...), forwardRequestsTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -355,20 +350,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic } // Make a request to the specified endpoint. go func(endpoint string) { - buf, err := proto.Marshal(wreqs[endpoint]) - if err != nil { - level.Error(h.logger).Log("msg", "marshaling proto", "err", err, "endpoint", endpoint) - ec <- err - return - } - req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(snappy.Encode(nil, buf))) - if err != nil { - level.Error(h.logger).Log("msg", "creating request", "err", err, "endpoint", endpoint) - ec <- err - return - } - req.Header.Add(h.options.TenantHeader, tenant) - req.Header.Add(h.options.ReplicaHeader, strconv.FormatUint(replicas[endpoint].n, 10)) + var err error // Increment the counters as necessary now that // the requests will go out. 
@@ -380,25 +362,27 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic h.forwardRequestsTotal.WithLabelValues("success").Inc() }() + cl, err := h.peers.Get(ctx, endpoint) + if err != nil { + level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint) + ec <- err + return + } + // Create a span to track the request made to another receive node. span, ctx := tracing.StartSpan(ctx, "thanos_receive_forward") defer span.Finish() - // Actually make the request against the endpoint - // we determined should handle these time series. - var res *http.Response - res, err = h.client.Do(req.WithContext(ctx)) + _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ + Timeseries: wreqs[endpoint].Timeseries, + Tenant: tenant, + Replica: int64(replicas[endpoint].n), + }) if err != nil { level.Error(h.logger).Log("msg", "forwarding request", "err", err, "endpoint", endpoint) ec <- err return } - if res.StatusCode != http.StatusOK { - err = errors.New(strconv.Itoa(res.StatusCode)) - level.Error(h.logger).Log("msg", "forwarding returned non-200 status", "err", err, "endpoint", endpoint) - ec <- err - return - } ec <- nil }(endpoint) } @@ -489,3 +473,36 @@ func isConflict(err error) bool { err.Error() == strconv.Itoa(http.StatusConflict) || status.Code(err) == codes.FailedPrecondition } + +func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { + return &peerGroup{ + dialOpts: dialOpts, + cache: map[string]storepb.WriteableStoreClient{}, + m: sync.Mutex{}, + } +} + +type peerGroup struct { + dialOpts []grpc.DialOption + cache map[string]storepb.WriteableStoreClient + m sync.Mutex +} + +func (p *peerGroup) Get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { + p.m.Lock() + defer p.m.Unlock() + + c, ok := p.cache[addr] + if ok { + return c, nil + } + + conn, err := grpc.DialContext(ctx, addr, p.dialOpts...) 
+ if err != nil { + return nil, errors.Wrap(err, "failed to dial peer") + } + + client := storepb.NewWriteableStoreClient(conn) + p.cache[addr] = client + return client, nil +}