Skip to content

Commit

Permalink
cherry pick store gateway fix to release 0.30 (#6089)
Browse files Browse the repository at this point in the history
* Fix: Failure to close BlockSeriesClient cause store-gateway deadlock (#6086)

* Fix: Failure to close BlockSeriesClient cause store-gateway deadlock

Signed-off-by: Alan Protasio <[email protected]>

* Adding tests

Signed-off-by: Alan Protasio <[email protected]>

* reverting the change on get series

Signed-off-by: Alan Protasio <[email protected]>

* fix lint

Signed-off-by: Alan Protasio <[email protected]>

---------

Signed-off-by: Alan Protasio <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Alan Protasio <[email protected]>
Signed-off-by: Ben Ye <[email protected]>
Co-authored-by: Alan Protasio <[email protected]>
  • Loading branch information
yeya24 and alanprot authored Jan 31, 2023
1 parent ee14f41 commit fe3f5d2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#6066](https://github.com/thanos-io/thanos/pull/6066) Tracing: fixed panic because of nil sampler
- [#6086](https://github.com/thanos-io/thanos/pull/6086) Store Gateway: Fix store-gateway deadlock due to not close BlockSeriesClient

## [v0.30.1](https://github.com/thanos-io/thanos/tree/release-0.30) - 4.01.2023

Expand Down
5 changes: 5 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}

shardMatcher := req.ShardInfo.Matcher(&s.buffers)

blockClient := newBlockSeriesClient(
srv.Context(),
s.logger,
Expand All @@ -1220,9 +1221,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.seriesBatchSize,
s.metrics.chunkFetchDuration,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
"block.id": blk.meta.ULID,
"block.mint": blk.meta.MinTime,
Expand Down Expand Up @@ -1464,6 +1467,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration)
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
Expand Down Expand Up @@ -1638,6 +1642,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
SkipChunks: true,
}
blockClient := newBlockSeriesClient(newCtx, s.logger, b, seriesReq, nil, bytesLimiter, nil, true, SeriesBatchSize, s.metrics.chunkFetchDuration)
defer blockClient.Close()

if err := blockClient.ExpandPostings(
reqSeriesMatchersNoExtLabels,
Expand Down
23 changes: 23 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -765,6 +766,10 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
vals, err := s.store.LabelNames(ctx, tc.req)
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

testutil.Ok(t, err)

testutil.Equals(t, tc.expected, vals.Names)
Expand Down Expand Up @@ -868,6 +873,10 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
vals, err := s.store.LabelValues(ctx, tc.req)
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

testutil.Ok(t, err)

testutil.Equals(t, tc.expected, emptyToNil(vals.Values))
Expand All @@ -882,3 +891,17 @@ func emptyToNil(values []string) []string {
}
return values
}

func waitTimeout(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return
case <-time.After(timeout):
t.Fatalf("timeout waiting wg for %v", timeout)
}
}

0 comments on commit fe3f5d2

Please sign in to comment.