Skip to content

Commit

Permalink
receive: use grpc to forward remote write requests
Browse files Browse the repository at this point in the history
Signed-off-by: Brett Jones <[email protected]>
  • Loading branch information
blockloop committed Jan 9, 2020
1 parent 1d71579 commit 5748a8a
Show file tree
Hide file tree
Showing 11 changed files with 848 additions and 225 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags
- [#1967](https://github.com/thanos-io/thanos/issues/1967) Receive: Allow local TSDB compaction
- [#1970](https://github.com/thanos-io/thanos/issues/1970) Receive: Use gRPC for forwarding requests between peers

### Fixed

Expand Down
53 changes: 4 additions & 49 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -21,9 +19,12 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -34,11 +35,7 @@ import (
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -165,48 +162,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}
}

func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
),
}

if reg != nil {
reg.MustRegister(grpcMets)
}

if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "enabling client to server TLS")

tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}

// runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
// store nodes, merging and duplicating the data to satisfy user query.
func runQuery(
Expand Down Expand Up @@ -251,7 +206,7 @@ func runQuery(
})
reg.MustRegister(duplicatedStores)

dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName)
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName)
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
Expand Down
16 changes: 14 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
Expand All @@ -27,7 +30,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
Expand Down Expand Up @@ -189,6 +191,11 @@ func runReceive(
if err != nil {
return err
}
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, rwServerCert != "", rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName)
if err != nil {
return err
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
ListenAddress: rwAddress,
Registry: reg,
Expand All @@ -199,6 +206,7 @@ func runReceive(
Tracer: tracer,
TLSConfig: rwTLSConfig,
TLSClientConfig: rwTLSClientConfig,
DialOpts: dialOpts,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
Expand Down Expand Up @@ -376,8 +384,12 @@ func runReceive(
s.Shutdown(errors.New("reload hashrings"))
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset)
rw := store.ReadWriteTSDBStore{
StoreServer: tsdbStore,
WriteableStoreServer: webHandler,
}

s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, tsdbStore,
s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
59 changes: 59 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package extgrpc

import (
"math"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
// Current limit is ~2GB.
// TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpcMets.UnaryClientInterceptor(),
tracing.UnaryClientInterceptor(tracer),
),
),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
),
),
}

if reg != nil {
reg.MustRegister(grpcMets)
}

if !secure {
return append(dialOpts, grpc.WithInsecure()), nil
}

level.Info(logger).Log("msg", "enabling client to server TLS")

tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName)
if err != nil {
return nil, err
}
return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil
}
Loading

0 comments on commit 5748a8a

Please sign in to comment.