Skip to content

Commit

Permalink
receive: Fixed leak on receive and querier proxying Store API Series,…
Browse files Browse the repository at this point in the history
… which was leaking on errors.

Fixes: #2823

TestTenantSeriesSetServert_NotLeakingIfNotExhausted was showing leaks:

```
    TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:132: leaktest: timed out checking goroutines
    TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]:
        github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Send(0xc000708360, 0xc0003104c0, 0x0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:141 +0x13e
        github.com/thanos-io/thanos/pkg/store.(*mockedStoreServer).Series(0xc0004e6330, 0xc0007083c0, 0x20ac2c0, 0xc000708360, 0x5116a0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:173 +0x76
        github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series.func1(0x2097760, 0xc00003c940)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:121 +0x56
        github.com/thanos-io/thanos/pkg/tracing.DoInSpan(0x2097760, 0xc00003c940, 0x1c8bace, 0x17, 0xc000173760, 0x0, 0x0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/tracing/tracing.go:72 +0xcc
        github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series(0xc000708360, 0x20983e0, 0xc0004e6330, 0xc0007083c0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:120 +0xfa
        github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2.1(0xc000708360, 0xc0004e6330)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:225 +0x62
        created by github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:224 +0x618
    --- FAIL: TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet (10.03s)
FAIL

Process finished with exit code 1

```

TestMultiTSDBStore_NotLeakingOnPrematureFinish was showing:

```
TestMultiTSDBStore_NotLeakingOnSendError: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]:
        github.com/thanos-io/thanos/pkg/store.ctxRespSender.send(...)
        	/home/bwplotka/Repos/thanos/pkg/store/proxy.go:198
        github.com/thanos-io/thanos/pkg/store.(*MultiTSDBStore).Series.func1(0x0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:214 +0x5cf
        golang.org/x/sync/errgroup.(*Group).Go.func1(0xc0002708d0, 0xc000416380)
        	/home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
        created by golang.org/x/sync/errgroup.(*Group).Go
        	/home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66
--- FAIL: TestMultiTSDBStore_NotLeakingOnSendError (10.02s)
```

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jul 9, 2020
1 parent 9d0bd29 commit b3f37a1
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive: Fixed a leak on receive Store API Series, which was leaking on errors.

### Changed

Expand Down
5 changes: 3 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
return merr.Err()
}

func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]*store.TSDBStore, len(t.tenants))
res := make(map[string]storepb.StoreServer, len(t.tenants))
for k, tenant := range t.tenants {
s := tenant.store()
if s != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
)

// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
type MultiTSDBStore struct {
logger log.Logger
component component.SourceStoreAPI
tsdbStores func() map[string]*TSDBStore
tsdbStores func() map[string]storepb.StoreServer
}

// NewMultiTSDBStore creates a new MultiTSDBStore.
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -97,6 +98,8 @@ type tenantSeriesSetServer struct {
tenant string
}

// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
// Details https://github.com/thanos-io/thanos/issues/2864.
func newTenantSeriesSetServer(
ctx context.Context,
tenant string,
Expand All @@ -110,11 +113,9 @@ func newTenantSeriesSetServer(
}
}

func (s *tenantSeriesSetServer) Context() context.Context {
return s.ctx
}
func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }

func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
var err error
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
err = store.Series(r, s)
Expand Down Expand Up @@ -202,7 +203,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
defer wg.Done()
ss.Series(store, r)
}()

seriesSet = append(seriesSet, ss)
}

Expand Down
142 changes: 140 additions & 2 deletions pkg/store/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
package store

import (
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand All @@ -21,6 +25,8 @@ import (
)

func TestMultiTSDBSeries(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
if ok := t.Run("headOnly", func(t testutil.TB) {
Expand Down Expand Up @@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
}

tsdbs := map[string]*TSDBStore{}
tsdbs := map[string]storepb.StoreServer{}
for i, db := range dbs {
tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger}
}

store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

var expected []storepb.Series
lastLabels := storepb.Series{}
Expand Down Expand Up @@ -154,3 +160,135 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
},
)
}

type mockedStoreServer struct {
storepb.StoreServer

responses []*storepb.SeriesResponse
}

func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
for _, r := range m.responses {
if err := server.Send(r); err != nil {
return err
}
}
return nil
}

// Regression test against https://github.com/thanos-io/thanos/issues/2823.
func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) {
t.Run("exhausted StoreSet", func(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

s := newTenantSeriesSetServer(context.Background(), "a", nil)

resps := []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
}

m := &mockedStoreServer{responses: resps}

go func() {
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
}()

testutil.Ok(t, s.Err())
i := 0
for s.Next() {
l, c := s.At()

testutil.Equals(t, resps[i].GetSeries().Labels, l)
testutil.Equals(t, resps[i].GetSeries().Chunks, c)

i++
}
testutil.Ok(t, s.Err())
testutil.Equals(t, 3, i)
})

t.Run("cancelled, not exhausted StoreSet", func(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx, cancel := context.WithCancel(context.Background())
s := newTenantSeriesSetServer(ctx, "a", nil)

m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
}}
go func() {
s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
}()

testutil.Ok(t, s.Err())
testutil.Equals(t, true, s.Next())
cancel()
})
}

type mockedSeriesServer struct {
storepb.Store_SeriesServer
ctx context.Context

send func(*storepb.SeriesResponse) error
}

func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error {
return s.send(r)
}
func (s *mockedSeriesServer) Context() context.Context { return s.ctx }

// Regression test against https://github.com/thanos-io/thanos/issues/2823.
// This is different leak than in TestTenantSeriesSetServert_NotLeakingIfNotExhausted
func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
return map[string]storepb.StoreServer{
// Ensure more than 10 (internal respCh channel).
"a": &mockedStoreServer{responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
}},
"b": &mockedStoreServer{responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
}},
}
})

if ok := t.Run("failing send", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// We mimic failing series server, but practically context cancel will do the same.
testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
cancel()
return ctx.Err()
},
}))
testutil.NotOk(t, ctx.Err())
}); !ok {
return
}
}
67 changes: 67 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,3 +1653,70 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
},
)
}

func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

clients := []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
// Ensure more than 10 (internal respCh channel).
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
minTime: math.MinInt64,
maxTime: math.MaxInt64,
},
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
minTime: math.MinInt64,
maxTime: math.MaxInt64,
},
}

logger := log.NewNopLogger()
p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 0,
}

if ok := t.Run("failling send", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// We mimic failing series server, but practically context cancel will do the same.
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
cancel()
return ctx.Err()
},
}))
testutil.NotOk(t, ctx.Err())
}); !ok {
return
}
}

0 comments on commit b3f37a1

Please sign in to comment.