diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index b8c7ec3127..4bda886c4d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -480,7 +480,9 @@ func runQuery( return errors.Wrap(err, "setup gRPC server") } - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, rulesProxy, + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(proxy)), + grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index dcee94f15e..91df8719bb 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -462,7 +462,9 @@ func runReceive( WriteableStoreServer: webHandler, } - s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, grpcProbe, rw, + s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(rw)), + grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6b5f415c16..e5aa2444f5 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -542,7 +542,7 @@ func runRule( // Start gRPC server. { - store := store.NewTSDBStore(logger, reg, db, component.Rule, lset) + tsdbStore := store.NewTSDBStore(logger, reg, db, component.Rule, lset) tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) if err != nil { @@ -550,7 +550,9 @@ func runRule( } // TODO: Add rules API implementation when ready. - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr, + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)), + grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)), grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index def78d07be..af16b77e50 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -215,7 +215,9 @@ func runSidecar( return errors.Wrap(err, "setup gRPC server") } - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c, m.Labels), + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(promStore)), + grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), grpcserver.WithListen(conf.grpc.bindAddress), grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), grpcserver.WithTLSConfig(tlsCfg), diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 39105fe316..908246e336 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -345,7 +345,8 @@ func runStore( return errors.Wrap(err, "setup gRPC server") } - s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs, nil, + s := grpcserver.New(logger, reg, tracer, component, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(bs)), grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), diff --git a/pkg/rules/proxy.go b/pkg/rules/proxy.go index a40c14bb79..7910f0ef68 100644 --- a/pkg/rules/proxy.go +++ b/pkg/rules/proxy.go @@ -13,6 +13,7 @@ import ( "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/store/storepb" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -23,6 +24,12 @@ type Proxy struct { rules func() []rulespb.RulesClient } +func RegisterRulesServer(rulesSrv rulespb.RulesServer) func(*grpc.Server) { + return func(s *grpc.Server) { + rulespb.RegisterRulesServer(s, rulesSrv) + } +} + // NewProxy returns new rules.Proxy. func NewProxy(logger log.Logger, rules func() []rulespb.RulesClient) *Proxy { return &Proxy{ diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index 2300c98968..1198370764 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -26,8 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/prober" - "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -44,7 +42,7 @@ type Server struct { // New creates a new gRPC Store API. // If rulesSrv is not nil, it also registers Rules API to the returned server. -func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, storeSrv storepb.StoreServer, rulesSrv rulespb.RulesServer, opts ...Option) *Server { +func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, opts ...Option) *Server { logger = log.With(logger, "service", "gRPC/server", "component", comp.String()) options := options{ network: "tcp", @@ -58,7 +56,7 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), ) panicsTotal := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_grpc_req_panics_recovered_total", + Name: "grpc_req_panics_recovered_total", Help: "Total number of gRPC requests recovered from internal panic.", }) @@ -87,13 +85,9 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer } s := grpc.NewServer(grpcOpts...) - if rulesSrv != nil { - rulespb.RegisterRulesServer(s, rulesSrv) - storepb.RegisterStoreServer(s, storeSrv) - level.Info(logger).Log("msg", "registering as gRPC StoreAPI and RulesAPI") - } else { - storepb.RegisterStoreServer(s, storeSrv) - level.Info(logger).Log("msg", "registering as gRPC StoreAPI") + // Register all configured servers. + for _, f := range options.registerServerFuncs { + f(s) } met.InitializeMetrics(s) @@ -152,16 +146,3 @@ func (s *Server) Shutdown(err error) { } level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err) } - -// ReadWriteStoreServer is a StoreServer and a WriteableStoreServer. -type ReadWriteStoreServer interface { - storepb.StoreServer - storepb.WriteableStoreServer -} - -// NewReadWrite creates a new server that can be written to. -func NewReadWrite(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, storeSrv ReadWriteStoreServer, opts ...Option) *Server { - s := New(logger, reg, tracer, comp, probe, storeSrv, nil, opts...) - storepb.RegisterWriteableStoreServer(s.srv, storeSrv) - return s -} diff --git a/pkg/server/grpc/option.go b/pkg/server/grpc/option.go index 82785f003e..5820d0a52b 100644 --- a/pkg/server/grpc/option.go +++ b/pkg/server/grpc/option.go @@ -6,11 +6,15 @@ package grpc import ( "crypto/tls" "time" + + "google.golang.org/grpc" ) const UnixSocket = "/tmp/test.sock" type options struct { + registerServerFuncs []registerServerFunc + gracePeriod time.Duration listen string network string @@ -29,6 +33,16 @@ func (f optionFunc) apply(o *options) { f(o) } +type registerServerFunc func(s *grpc.Server) + +// WithGRPCServer calls the passed gRPC registration functions on the created +// grpc.Server. +func WithServer(f registerServerFunc) Option { + return optionFunc(func(o *options) { + o.registerServerFuncs = append(o.registerServerFuncs, f) + }) +} + // WithGracePeriod sets shutdown grace period for gRPC server. // Server waits connections to drain for specified amount of time. func WithGracePeriod(t time.Duration) Option { diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 426550f845..9c9bfe2b74 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tracing" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -77,6 +78,12 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { return &m } +func RegisterStoreServer(storeSrv storepb.StoreServer) func(*grpc.Server) { + return func(s *grpc.Server) { + storepb.RegisterStoreServer(s, storeSrv) + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 70126fd675..87ab161999 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/store/labelpb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -39,6 +40,12 @@ type TSDBStore struct { maxBytesPerFrame int } +func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) { + return func(s *grpc.Server) { + storepb.RegisterWriteableStoreServer(s, storeSrv) + } +} + // ReadWriteTSDBStore is a TSDBStore that can also be written to. type ReadWriteTSDBStore struct { storepb.StoreServer