Skip to content

Commit

Permalink
*: Refactor gRPC server registration (#3223)
Browse files Browse the repository at this point in the history
This decouples the grpc server package from specific gRPC servers.
Previously there was tight coupling and leaking of specific server
impleementations into a "generic" instantiation of an opinionated gRPC
server.

Signed-off-by: Frederic Branczyk <[email protected]>
  • Loading branch information
brancz authored Sep 25, 2020
1 parent 5a776d7 commit a62cbb9
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 30 deletions.
4 changes: 3 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,9 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, rulesProxy,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
4 changes: 3 additions & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,9 @@ func runReceive(
WriteableStoreServer: webHandler,
}

s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, grpcProbe, rw,
s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(rw)),
grpcserver.WithServer(store.RegisterWritableStoreServer(rw)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
6 changes: 4 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,15 +542,17 @@ func runRule(

// Start gRPC server.
{
store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
tsdbStore := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}

// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)),
grpcserver.WithServer(thanosrules.RegisterRulesServer(ruleMgr)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
4 changes: 3 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c, m.Labels),
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs, nil,
s := grpcserver.New(logger, reg, tracer, component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
7 changes: 7 additions & 0 deletions pkg/rules/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -23,6 +24,12 @@ type Proxy struct {
rules func() []rulespb.RulesClient
}

func RegisterRulesServer(rulesSrv rulespb.RulesServer) func(*grpc.Server) {
return func(s *grpc.Server) {
rulespb.RegisterRulesServer(s, rulesSrv)
}
}

// NewProxy returns new rules.Proxy.
func NewProxy(logger log.Logger, rules func() []rulespb.RulesClient) *Proxy {
return &Proxy{
Expand Down
29 changes: 5 additions & 24 deletions pkg/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -44,7 +42,7 @@ type Server struct {

// New creates a new gRPC Store API.
// If rulesSrv is not nil, it also registers Rules API to the returned server.
func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, storeSrv storepb.StoreServer, rulesSrv rulespb.RulesServer, opts ...Option) *Server {
func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, opts ...Option) *Server {
logger = log.With(logger, "service", "gRPC/server", "component", comp.String())
options := options{
network: "tcp",
Expand All @@ -58,7 +56,7 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
)
panicsTotal := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Name: "grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})

Expand Down Expand Up @@ -87,13 +85,9 @@ func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer
}
s := grpc.NewServer(grpcOpts...)

if rulesSrv != nil {
rulespb.RegisterRulesServer(s, rulesSrv)
storepb.RegisterStoreServer(s, storeSrv)
level.Info(logger).Log("msg", "registering as gRPC StoreAPI and RulesAPI")
} else {
storepb.RegisterStoreServer(s, storeSrv)
level.Info(logger).Log("msg", "registering as gRPC StoreAPI")
// Register all configured servers.
for _, f := range options.registerServerFuncs {
f(s)
}

met.InitializeMetrics(s)
Expand Down Expand Up @@ -152,16 +146,3 @@ func (s *Server) Shutdown(err error) {
}
level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err)
}

// ReadWriteStoreServer is a StoreServer and a WriteableStoreServer.
type ReadWriteStoreServer interface {
storepb.StoreServer
storepb.WriteableStoreServer
}

// NewReadWrite creates a new server that can be written to.
func NewReadWrite(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, probe *prober.GRPCProbe, storeSrv ReadWriteStoreServer, opts ...Option) *Server {
s := New(logger, reg, tracer, comp, probe, storeSrv, nil, opts...)
storepb.RegisterWriteableStoreServer(s.srv, storeSrv)
return s
}
14 changes: 14 additions & 0 deletions pkg/server/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ package grpc
import (
"crypto/tls"
"time"

"google.golang.org/grpc"
)

const UnixSocket = "/tmp/test.sock"

type options struct {
registerServerFuncs []registerServerFunc

gracePeriod time.Duration
listen string
network string
Expand All @@ -29,6 +33,16 @@ func (f optionFunc) apply(o *options) {
f(o)
}

type registerServerFunc func(s *grpc.Server)

// WithGRPCServer calls the passed gRPC registration functions on the created
// grpc.Server.
func WithServer(f registerServerFunc) Option {
return optionFunc(func(o *options) {
o.registerServerFuncs = append(o.registerServerFuncs, f)
})
}

// WithGracePeriod sets shutdown grace period for gRPC server.
// Server waits connections to drain for specified amount of time.
func WithGracePeriod(t time.Duration) Option {
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tracing"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -77,6 +78,12 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
return &m
}

func RegisterStoreServer(storeSrv storepb.StoreServer) func(*grpc.Server) {
return func(s *grpc.Server) {
storepb.RegisterStoreServer(s, storeSrv)
}
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/store/labelpb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -39,6 +40,12 @@ type TSDBStore struct {
maxBytesPerFrame int
}

func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) {
return func(s *grpc.Server) {
storepb.RegisterWriteableStoreServer(s, storeSrv)
}
}

// ReadWriteTSDBStore is a TSDBStore that can also be written to.
type ReadWriteTSDBStore struct {
storepb.StoreServer
Expand Down

0 comments on commit a62cbb9

Please sign in to comment.