Skip to content

Commit

Permalink
Recover from panics in Series calls (thanos-io#6077)
Browse files Browse the repository at this point in the history
* Recover from panics in Series calls

This commit adds panic recovery for Series calls in all Store servers.

Signed-off-by: Filip Petkovski <[email protected]>

* Apply error suggestion

Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored and Nathaniel Graham committed Apr 17, 2023
1 parent 830da2b commit 6507a04
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func runQuery(
grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(store.RegisterStoreServer(proxy, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func runReceive(
)

srv := grpcserver.New(logger, receive.NewUnRegisterer(reg), tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(rw)),
grpcserver.WithServer(store.RegisterStoreServer(rw, logger)),
grpcserver.WithServer(store.RegisterWritableStoreServer(rw)),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func runRule(
return nil
}),
)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)))
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore, logger)))
}

options = append(options, grpcserver.WithServer(
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func runSidecar(
)

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(store.RegisterStoreServer(promStore, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func runStore(
}

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs)),
grpcserver.WithServer(store.RegisterStoreServer(bs, logger)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
Expand Down
12 changes: 6 additions & 6 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,16 @@ func (t *tenant) store() *store.TSDBStore {
return t.storeTSDB
}

func (t *tenant) client() store.Client {
func (t *tenant) client(logger log.Logger) store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

store := t.store()
if store == nil {
tsdbStore := t.store()
if tsdbStore == nil {
return nil
}
client := storepb.ServerAsClient(store, 0)
return newLocalClient(client, store.LabelSet, store.TimeRange)
client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0)
return newLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange)
}

func (t *tenant) exemplars() *exemplars.TSDB {
Expand Down Expand Up @@ -432,7 +432,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client()
client := tenant.client(t.logger)
if client != nil {
res = append(res, client)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
return &m
}

func RegisterStoreServer(storeSrv storepb.StoreServer) func(*grpc.Server) {
func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*grpc.Server) {
return func(s *grpc.Server) {
storepb.RegisterStoreServer(s, storeSrv)
storepb.RegisterStoreServer(s, NewRecoverableStoreServer(logger, storeSrv))
}
}

Expand Down
52 changes: 52 additions & 0 deletions pkg/store/recover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"fmt"
"runtime"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/storepb"
)

type recoverableStoreServer struct {
logger log.Logger
storepb.StoreServer
}

// NewRecoverableStoreServer wraps storeServer in a server whose Series method
// recovers from panics and reports them using logger.
func NewRecoverableStoreServer(logger log.Logger, storeServer storepb.StoreServer) *recoverableStoreServer {
	wrapped := new(recoverableStoreServer)
	wrapped.logger = logger
	wrapped.StoreServer = storeServer
	return wrapped
}

// Series delegates to the wrapped StoreServer and recovers from any panic
// raised while the request is being served. Because the result is unnamed,
// a recovered panic makes Series return a nil error; the failure is reported
// to the caller as a warning response sent on srv by r.recover.
func (r *recoverableStoreServer) Series(request *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
// r.recover must be the deferred call itself: the builtin recover() only
// stops a panic when called directly by the deferred function, so wrapping
// this call in a closure would make it a no-op.
defer r.recover(srv)
return r.StoreServer.Series(request, srv)
}

// recover stops a panic raised while serving a Series call, logs it together
// with a stack trace and forwards it to the client as a warning response on
// srv.
//
// It must be invoked directly through defer (defer r.recover(srv)): the
// builtin recover() only has an effect when called directly by the deferred
// function. When no panic is in flight it does nothing.
func (r *recoverableStoreServer) recover(srv storepb.Store_SeriesServer) {
	e := recover()
	if e == nil {
		return
	}

	// Capture the stack trace for every kind of panic, not only runtime
	// errors, so that panics raised with arbitrary values can be debugged
	// from the server logs as well. The deferred call runs on top of the
	// panicking stack, so the trace still contains the origin of the panic.
	buf := make([]byte, 64<<10)
	buf = buf[:runtime.Stack(buf, false)]

	var warning error
	switch err := e.(type) {
	case runtime.Error:
		warning = err
		level.Error(r.logger).Log("msg", "runtime panic in TSDB Series server", "err", err.Error(), "stacktrace", string(buf))
	default:
		// The panic value can be of any type; render it with %v so that
		// it can be transported as an error.
		warning = errors.New(fmt.Sprintf("unknown error while processing Series: %v", e))
		level.Error(r.logger).Log("msg", "unknown panic in TSDB Series server", "err", warning.Error(), "stacktrace", string(buf))
	}

	// Report the failure to the client as a warning instead of crashing the
	// process. A failed send can only be logged at this point.
	if err := srv.Send(storepb.NewWarnSeriesResponse(warning)); err != nil {
		level.Error(r.logger).Log("msg", "failed to send warning after recovering from panic in Series", "err", err)
	}
}
33 changes: 33 additions & 0 deletions pkg/store/recover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"context"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"

"github.com/thanos-io/thanos/pkg/store/storepb"
)

func TestRecoverableServer(t *testing.T) {
logger := log.NewNopLogger()
store := NewRecoverableStoreServer(logger, &panicStoreServer{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv := storepb.NewInProcessStream(ctx, 1)

testutil.Ok(t, store.Series(&storepb.SeriesRequest{}, srv))
}

// panicStoreServer is a test double whose Series method always panics. It
// embeds storepb.StoreServer to satisfy the interface; the embedded value is
// left nil by the test, so only Series may be called on it.
type panicStoreServer struct {
storepb.StoreServer
}

// Series ignores its arguments and always panics with a plain string value.
func (*panicStoreServer) Series(*storepb.SeriesRequest, storepb.Store_SeriesServer) error {
	panic("something went wrong.")
}
8 changes: 8 additions & 0 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type inProcessStream struct {
err chan error
}

// NewInProcessStream creates an inProcessStream bound to ctx. Its channel of
// responses is buffered with a capacity of bufferSize, while its error
// channel is unbuffered.
func NewInProcessStream(ctx context.Context, bufferSize int) *inProcessStream {
	stream := new(inProcessStream)
	stream.ctx = ctx
	stream.recv = make(chan *SeriesResponse, bufferSize)
	stream.err = make(chan error)
	return stream
}

// Context returns the context stored on the stream.
func (s *inProcessStream) Context() context.Context {
	return s.ctx
}

func (s *inProcessStream) Send(r *SeriesResponse) error {
Expand Down

0 comments on commit 6507a04

Please sign in to comment.