diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index 8aa6d28bf6b..3f76d73fd3b 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -761,7 +761,7 @@ func runQuery(
 		grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
 		s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
 			grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
-			grpcserver.WithServer(store.RegisterStoreServer(proxy)),
+			grpcserver.WithServer(store.RegisterStoreServer(proxy, logger)),
 			grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
 			grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
 			grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 3e89a2e1702..f005fae21f4 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -338,7 +338,7 @@ func runReceive(
 		)
 
 		srv := grpcserver.New(logger, receive.NewUnRegisterer(reg), tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
-			grpcserver.WithServer(store.RegisterStoreServer(rw)),
+			grpcserver.WithServer(store.RegisterStoreServer(rw, logger)),
 			grpcserver.WithServer(store.RegisterWritableStoreServer(rw)),
 			grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))),
 			grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go
index 0bd29b905d6..2d8d79cf1e9 100644
--- a/cmd/thanos/rule.go
+++ b/cmd/thanos/rule.go
@@ -634,7 +634,7 @@ func runRule(
 					return nil
 				}),
 			)
-			options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)))
+			options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore, logger)))
 		}
 
 		options = append(options, grpcserver.WithServer(
diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go
index 7d6da14d1ac..478ae29e13a 100644
--- a/cmd/thanos/sidecar.go
+++ b/cmd/thanos/sidecar.go
@@ -283,7 +283,7 @@ func runSidecar(
 		)
 
 		s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
-			grpcserver.WithServer(store.RegisterStoreServer(promStore)),
+			grpcserver.WithServer(store.RegisterStoreServer(promStore, logger)),
 			grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
 			grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
 			grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index abae34d9552..a9eb00f2127 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -445,7 +445,7 @@ func runStore(
 	}
 
 	s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
-		grpcserver.WithServer(store.RegisterStoreServer(bs)),
+		grpcserver.WithServer(store.RegisterStoreServer(bs, logger)),
 		grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
 		grpcserver.WithListen(conf.grpcConfig.bindAddress),
 		grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 878e894e5d9..2200822abb7 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -159,16 +159,16 @@ func (t *tenant) store() *store.TSDBStore {
 	return t.storeTSDB
 }
 
-func (t *tenant) client() store.Client {
+func (t *tenant) client(logger log.Logger) store.Client {
 	t.mtx.RLock()
 	defer t.mtx.RUnlock()
 
-	store := t.store()
-	if store == nil {
+	tsdbStore := t.store()
+	if tsdbStore == nil {
 		return nil
 	}
-	client := storepb.ServerAsClient(store, 0)
-	return newLocalClient(client, store.LabelSet, store.TimeRange)
+	client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0)
+	return newLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange)
 }
 
 func (t *tenant) exemplars() *exemplars.TSDB {
@@ -432,7 +432,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {
 
 	res := make([]store.Client, 0, len(t.tenants))
 	for _, tenant := range t.tenants {
-		client := tenant.client()
+		client := tenant.client(t.logger)
 		if client != nil {
 			res = append(res, client)
 		}
diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index d9245035f45..d969ddb64f6 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -91,9 +91,11 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
 	return &m
 }
 
-func RegisterStoreServer(storeSrv storepb.StoreServer) func(*grpc.Server) {
+// RegisterStoreServer returns a function that registers storeSrv on a gRPC server,
+// wrapped so that panics raised while serving Series are recovered and logged with logger.
+func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*grpc.Server) {
 	return func(s *grpc.Server) {
-		storepb.RegisterStoreServer(s, storeSrv)
+		storepb.RegisterStoreServer(s, NewRecoverableStoreServer(logger, storeSrv))
 	}
 }
 
diff --git a/pkg/store/recover.go b/pkg/store/recover.go
new file mode 100644
index 00000000000..c453c3bec3c
--- /dev/null
+++ b/pkg/store/recover.go
@@ -0,0 +1,66 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"runtime"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/pkg/errors"
+
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+)
+
+// recoverableStoreServer wraps a StoreServer so that a panic raised while serving
+// Series is reported to the client as a warning instead of propagating further.
+type recoverableStoreServer struct {
+	logger log.Logger
+	storepb.StoreServer
+}
+
+// NewRecoverableStoreServer returns a StoreServer that delegates every call to
+// storeServer and additionally recovers from panics raised by its Series method.
+// A nil logger is replaced with a no-op logger.
+func NewRecoverableStoreServer(logger log.Logger, storeServer storepb.StoreServer) *recoverableStoreServer {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+	return &recoverableStoreServer{logger: logger, StoreServer: storeServer}
+}
+
+// Series delegates to the wrapped server. If the wrapped call panics, the panic
+// is logged, a warning response is sent on srv and Series returns nil.
+func (r *recoverableStoreServer) Series(request *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+	// r.recover is deferred directly (not wrapped in a closure) because the
+	// built-in recover only stops a panic when called by the deferred function itself.
+	defer r.recover(srv)
+	return r.StoreServer.Series(request, srv)
+}
+
+// recover must be invoked via defer. It stops an in-flight panic, logs it with a
+// stack trace and forwards it to the client as a warning on srv.
+func (r *recoverableStoreServer) recover(srv storepb.Store_SeriesServer) {
+	e := recover()
+	if e == nil {
+		return
+	}
+
+	// Capture the stack for every panic value, not only runtime errors, so that
+	// explicit panic(...) calls are diagnosable from the logs as well.
+	buf := make([]byte, 64<<10)
+	buf = buf[:runtime.Stack(buf, false)]
+
+	var err error
+	if rerr, ok := e.(runtime.Error); ok {
+		err = rerr
+	} else {
+		err = errors.Errorf("unknown error while processing Series: %v", e)
+	}
+
+	level.Error(r.logger).Log("msg", "panic in Store Series server", "err", err.Error(), "stacktrace", string(buf))
+	if sendErr := srv.Send(storepb.NewWarnSeriesResponse(err)); sendErr != nil {
+		level.Error(r.logger).Log("msg", "failed to send panic warning to client", "err", sendErr)
+	}
+}
diff --git a/pkg/store/recover_test.go b/pkg/store/recover_test.go
new file mode 100644
index 00000000000..c33d337e853
--- /dev/null
+++ b/pkg/store/recover_test.go
@@ -0,0 +1,59 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"context"
+	"testing"
+
+	"github.com/efficientgo/core/testutil"
+	"github.com/go-kit/log"
+
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+)
+
+func TestRecoverableServer(t *testing.T) {
+	for _, tc := range []struct {
+		name    string
+		trigger func()
+	}{
+		{
+			name:    "panic with a non-error value",
+			trigger: func() { panic("something went wrong.") },
+		},
+		{
+			name: "panic with a runtime error",
+			trigger: func() {
+				// A failed type assertion panics with a runtime.Error.
+				var v interface{} = "not an int"
+				_ = v.(int)
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			logger := log.NewNopLogger()
+			store := NewRecoverableStoreServer(logger, &panicStoreServer{trigger: tc.trigger})
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			// A buffer of one leaves room for the single warning sent on recovery.
+			srv := storepb.NewInProcessStream(ctx, 1)
+
+			testutil.Ok(t, store.Series(&storepb.SeriesRequest{}, srv))
+		})
+	}
+}
+
+// panicStoreServer is a StoreServer whose Series call runs trigger, which is
+// expected to panic.
+type panicStoreServer struct {
+	storepb.StoreServer
+
+	trigger func()
+}
+
+func (m *panicStoreServer) Series(_ *storepb.SeriesRequest, _ storepb.Store_SeriesServer) error {
+	m.trigger()
+	return nil
+}
diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go
index aeb4a25aef2..7d2b8c7f619 100644
--- a/pkg/store/storepb/inprocess.go
+++ b/pkg/store/storepb/inprocess.go
@@ -55,6 +55,16 @@ type inProcessStream struct {
 	err    chan error
 }
 
+// NewInProcessStream returns an in-process Series stream bound to ctx whose
+// receive channel buffers up to bufferSize responses.
+func NewInProcessStream(ctx context.Context, bufferSize int) *inProcessStream {
+	return &inProcessStream{
+		ctx:  ctx,
+		recv: make(chan *SeriesResponse, bufferSize),
+		err:  make(chan error),
+	}
+}
+
 func (s *inProcessStream) Context() context.Context { return s.ctx }
 
 func (s *inProcessStream) Send(r *SeriesResponse) error {