Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.*: Introduce graceful shutdown for gRPC Servers #1687

Merged
merged 11 commits into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#1687](https://github.com/thanos-io/thanos/pull/1687) Add a new `--grpc-grace-period` CLI option to components which serve gRPC to set how long to wait until gRPC Server shuts down.
- [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.
Expand Down
29 changes: 14 additions & 15 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import (
"text/template"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/olekukonko/tablewriter"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand All @@ -20,19 +29,9 @@ import (
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/ui"
"github.com/thanos-io/thanos/pkg/verifier"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/olekukonko/tablewriter"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
kingpin "gopkg.in/alecthomas/kingpin.v2"
Expand Down Expand Up @@ -311,7 +310,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {
cmd := root.Command("web", "Web interface for remote storage bucket")
bind := cmd.Flag("listen", "HTTP host:port to listen on").Default("0.0.0.0:8080").String()
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
_, httpGracePeriod := regHTTPFlags(cmd)
interval := cmd.Flag("refresh", "Refresh interval to download metadata from remote storage").Default("30m").Duration()
timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration()
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()
Expand All @@ -321,9 +320,9 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str

statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := server.NewHTTP(logger, reg, component.Bucket, statusProber,
server.WithListen(*bind),
server.WithGracePeriod(time.Duration(*httpGracePeriod)),
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
httpserver.WithListen(*bind),
httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)),
)

bucketUI := ui.NewBucketUI(logger, *label)
Expand Down
14 changes: 6 additions & 8 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import (
"strings"
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand All @@ -25,10 +22,12 @@ import (
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -79,8 +78,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
"Compaction index verification will ignore out of order label names.").
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
httpAddr, httpGracePeriod := regHTTPFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions.").
Default("./data").String()
Expand Down Expand Up @@ -179,9 +177,9 @@ func runCompact(

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := server.NewHTTP(logger, reg, component, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
srv := httpserver.New(logger, reg, component, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)
Expand Down
14 changes: 6 additions & 8 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"path/filepath"
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand All @@ -23,19 +20,20 @@ import (
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Downsample
cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

httpAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
httpAddr, httpGracePeriod := regHTTPFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()
Expand Down Expand Up @@ -126,9 +124,9 @@ func runDownsample(
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
srv := httpserver.New(logger, reg, comp, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
g.Add(srv.ListenAndServe, srv.Shutdown)

Expand Down
12 changes: 7 additions & 5 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,31 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration {

func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
grpcGracePeriod *model.Duration,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
) {
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components.").
Default("0.0.0.0:10901").String()
grpcGracePeriod = modelDuration(cmd.Flag("grpc-grace-period", "Time to wait after an interrupt received for GRPC Server.").Default("2m")) // by default it's the same as query.timeout.

grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()

return grpcBindAddr,
grpcGracePeriod,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA
}

func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string {
return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
}
func regHTTPFlags(cmd *kingpin.CmdClause) (httpBindAddr *string, httpGracePeriod *model.Duration) {
httpBindAddr = cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
httpGracePeriod = modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("2m")) // by default it's the same as query.timeout.

func regHTTPGracePeriodFlag(cmd *kingpin.CmdClause) *model.Duration {
return modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("5s"))
return httpBindAddr, httpGracePeriod
}

func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *extflag.PathOrContent {
Expand Down
156 changes: 0 additions & 156 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,25 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/debug"
"syscall"

gmetrics "github.com/armon/go-metrics"
gprom "github.com/armon/go-metrics/prometheus"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
"go.uber.org/automaxprocs/maxprocs"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -222,145 +208,3 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error {
return errors.New("canceled")
}
}

func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}
tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA)
if err != nil {
return opts, err
}
if tlsCfg != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
}
return opts, nil
}

func defaultTLSServerOpts(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) {
if key == "" && cert == "" {
if clientCA != "" {
return nil, errors.New("when a client CA is used a server key and certificate must also be provided")
}

level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable")
return nil, nil
}

level.Info(logger).Log("msg", "enabling server side TLS")

if key == "" || cert == "" {
return nil, errors.New("both server key and certificate must be provided")
}

tlsCfg := &tls.Config{
MinVersion: tls.VersionTLS12,
}

tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "server credentials")
}

tlsCfg.Certificates = []tls.Certificate{tlsCert}

if clientCA != "" {
caPEM, err := ioutil.ReadFile(clientCA)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
tlsCfg.ClientCAs = certPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert

level.Info(logger).Log("msg", "server TLS client verification enabled")
}

return tlsCfg, nil
}

func defaultTLSClientOpts(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) {
var certPool *x509.CertPool
if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if serverName != "" {
tlsCfg.ServerName = serverName
}

if (key != "") != (cert != "") {
return nil, errors.New("both client key and certificate must be provided")
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS client authentication enabled")
}
return tlsCfg, nil
}

func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)
panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
reg.MustRegister(met, panicsTotal)

grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
opts = append(opts,
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, srv)
met.InitializeMetrics(s)

return s
}
Loading