diff --git a/CHANGELOG.md b/CHANGELOG.md index 712aee4ba6e..8078522d487 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers. - [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix` - [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase +- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive: Fixed a leak on receive Store API Series, which was leaking on errors. ### Changed diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 57ce6cb7b43..7708a6e9e46 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -23,6 +23,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/storepb" "golang.org/x/sync/errgroup" ) @@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error { return merr.Err() } -func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore { +func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer { t.mtx.RLock() defer t.mtx.RUnlock() - res := make(map[string]*store.TSDBStore, len(t.tenants)) + res := make(map[string]storepb.StoreServer, len(t.tenants)) for k, tenant := range t.tenants { s := tenant.store() if s != nil { diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index c8dd3c05a89..8cb84135f7b 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -24,14 +24,15 @@ import ( ) // MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances. +// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864 type MultiTSDBStore struct { logger log.Logger component component.SourceStoreAPI - tsdbStores func() map[string]*TSDBStore + tsdbStores func() map[string]storepb.StoreServer } // NewMultiTSDBStore creates a new MultiTSDBStore. -func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore { +func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore { if logger == nil { logger = log.NewNopLogger() } @@ -97,6 +98,8 @@ type tenantSeriesSetServer struct { tenant string } +// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality. +// Details https://github.com/thanos-io/thanos/issues/2864. func newTenantSeriesSetServer( ctx context.Context, tenant string, @@ -110,11 +113,9 @@ func newTenantSeriesSetServer( } } -func (s *tenantSeriesSetServer) Context() context.Context { - return s.ctx -} +func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx } -func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { +func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) { var err error tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) { err = store.Series(r, s) @@ -202,7 +203,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri defer wg.Done() ss.Series(store, r) }() - seriesSet = append(seriesSet, ss) } diff --git a/pkg/store/multitsdb_test.go b/pkg/store/multitsdb_test.go index e0da85e4b86..f93169e323d 100644 --- a/pkg/store/multitsdb_test.go +++ b/pkg/store/multitsdb_test.go @@ -4,6 +4,7 @@ package store import ( + "context" "fmt" "io/ioutil" "math" @@ -11,8 +12,11 @@ import ( "os" "path/filepath" "testing" + "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -21,6 +25,8 @@ import ( ) func TestMultiTSDBSeries(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + tb := testutil.NewTB(t) storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { if ok := t.Run("headOnly", func(t testutil.TB) { @@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)} } - tsdbs := map[string]*TSDBStore{} + tsdbs := map[string]storepb.StoreServer{} for i, db := range dbs { tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger} } - store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs }) + store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs }) var expected []storepb.Series lastLabels := storepb.Series{} @@ -154,3 +160,135 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB }, ) } + +type mockedStoreServer struct { + storepb.StoreServer + + responses []*storepb.SeriesResponse +} + +func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error { + for _, r := range m.responses { + if err := server.Send(r); err != nil { + return err + } + } + return nil +} + +// Regression test against https://github.com/thanos-io/thanos/issues/2823. +func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) { + t.Run("exhausted StoreSet", func(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + s := newTenantSeriesSetServer(context.Background(), "a", nil) + + resps := []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + } + + m := &mockedStoreServer{responses: resps} + + go func() { + s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}) + }() + + testutil.Ok(t, s.Err()) + i := 0 + for s.Next() { + l, c := s.At() + + testutil.Equals(t, resps[i].GetSeries().Labels, l) + testutil.Equals(t, resps[i].GetSeries().Chunks, c) + + i++ + } + testutil.Ok(t, s.Err()) + testutil.Equals(t, 3, i) + }) + + t.Run("cancelled, not exhausted StoreSet", func(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx, cancel := context.WithCancel(context.Background()) + s := newTenantSeriesSetServer(ctx, "a", nil) + + m := &mockedStoreServer{responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }} + go func() { + s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}) + }() + + testutil.Ok(t, s.Err()) + testutil.Equals(t, true, s.Next()) + cancel() + }) +} + +type mockedSeriesServer struct { + storepb.Store_SeriesServer + ctx context.Context + + send func(*storepb.SeriesResponse) error +} + +func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error { + return s.send(r) +} +func (s *mockedSeriesServer) Context() context.Context { return s.ctx } + +// Regression test against https://github.com/thanos-io/thanos/issues/2823. +// This is different leak than in TestTenantSeriesSetServert_NotLeakingIfNotExhausted +func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer { + return map[string]storepb.StoreServer{ + // Ensure more than 10 (internal respCh channel). + "a": &mockedStoreServer{responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }}, + "b": &mockedStoreServer{responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }}, + } + }) + + if ok := t.Run("failing send", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + // We mimic failing series server, but practically context cancel will do the same. + testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + cancel() + return ctx.Err() + }, + })) + testutil.NotOk(t, ctx.Err()) + }); !ok { + return + } +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index f3d85c27f43..074a6cc85d0 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1653,3 +1653,70 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { }, ) } + +func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + clients := []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + // Ensure more than 10 (internal respCh channel). + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + }, + } + + logger := log.NewNopLogger() + p := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + } + + if ok := t.Run("failling send", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + // We mimic failing series server, but practically context cancel will do the same. + testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + cancel() + return ctx.Err() + }, + })) + testutil.NotOk(t, ctx.Err()) + }); !ok { + return + } +}