From 8d3e1b33066029d23f1a791cc862853690c0b03b Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 17 Aug 2020 18:58:28 +0100 Subject: [PATCH] receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDBStore, fixed multitsdb benchmarks. Fixed https://github.com/thanos-io/thanos/issues/3013 Also: * Fixed other quite big issue with reusing chunk slice. * Fixed framing - previously it was wrongly sending single-chunk frames, taking huge amount of time. Fix: We deletage closer now to ensure multitsdb operate on valid data. Signed-off-by: Bartlomiej Plotka # Conflicts: # pkg/store/tsdb_test.go # pkg/testutil/testutil.go --- pkg/store/bucket_test.go | 21 +- pkg/store/multitsdb.go | 27 +- pkg/store/multitsdb_test.go | 23 +- pkg/store/proxy_test.go | 15 +- pkg/store/storepb/testutil/series.go | 67 ++--- pkg/store/tsdb.go | 57 ++-- pkg/store/tsdb_test.go | 422 ++++++++++++++++++++++++++- pkg/testutil/testutil.go | 28 +- 8 files changed, 566 insertions(+), 94 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 521f33adbc0..6d304e7f6a0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1175,7 +1175,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request var ( logger = log.NewNopLogger() blocks []*bucketBlock - series []storepb.Series + series []*storepb.Series random = rand.New(rand.NewSource(120)) ) @@ -1210,7 +1210,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request // This allows to pick time range that will correspond to number of series picked 1:1. for bi := 0; bi < numOfBlocks; bi++ { head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{ - Dir: tmpDir, + TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)), SamplesPerSeries: samplesPerSeriesPerBlock, Series: seriesPerBlock, PrependLabels: extLset, @@ -1533,17 +1533,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { } // Create TSDB blocks. - opts := storetestutil.HeadGenOptions{ - Dir: tmpDir, + head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ + TSDBDir: filepath.Join(tmpDir, "0"), SamplesPerSeries: 1, Series: 2, PrependLabels: extLset, Random: random, - } - head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts) + }) block1 := createBlockFromHead(t, bktDir, head) testutil.Ok(t, head.Close()) - head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts) + head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{ + TSDBDir: filepath.Join(tmpDir, "1"), + SamplesPerSeries: 1, + Series: 2, + PrependLabels: extLset, + Random: random, + }) block2 := createBlockFromHead(t, bktDir, head2) testutil.Ok(t, head2.Close()) @@ -1607,7 +1612,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...), + ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...), ExpectedHints: []hintspb.SeriesResponseHints{ { QueriedBlocks: []hintspb.Block{ diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 4c6f43ce3aa..26f3c593534 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -6,6 +6,7 @@ package store import ( "context" "fmt" + "io" "sync" "github.com/go-kit/kit/log" @@ -13,6 +14,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/runutil" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -96,6 +99,8 @@ type tenantSeriesSetServer struct { err error tenant string + + closers []io.Closer } // TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality. @@ -156,6 +161,18 @@ func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error { } } +func (s *tenantSeriesSetServer) Delegate(closer io.Closer) { + s.closers = append(s.closers, closer) +} + +func (s *tenantSeriesSetServer) Close() error { + var merr tsdb_errors.MultiError + for _, c := range s.closers { + merr.Add(c.Close()) + } + return merr.Err() +} + func (s *tenantSeriesSetServer) Next() (ok bool) { s.cur, ok = <-s.recv return ok @@ -188,6 +205,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri // Each might be quite large (multi chunk long series given by sidecar). respSender, respCh := newCancelableRespChannel(gctx, 10) + var closers []io.Closer g.Go(func() error { // This go routine is responsible for calling store's Series concurrently. Merged results // are passed to respCh and sent concurrently to client (if buffer of 10 have room). @@ -216,6 +234,8 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri defer wg.Done() ss.Series(store, r) }() + + closers = append(closers, ss) seriesSet = append(seriesSet, ss) } @@ -237,7 +257,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri } return nil }) - return g.Wait() + err := g.Wait() + for _, c := range closers { + runutil.CloseWithLogOnErr(s.logger, c, "close tenant series request") + } + return err + } // LabelNames returns all known label names. diff --git a/pkg/store/multitsdb_test.go b/pkg/store/multitsdb_test.go index 0c04490dc5d..876fb815fdd 100644 --- a/pkg/store/multitsdb_test.go +++ b/pkg/store/multitsdb_test.go @@ -91,29 +91,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB } }() for j := range dbs { + tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j)) + head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ - Dir: tmpDir, + TSDBDir: tsdbDir, SamplesPerSeries: samplesPerSeriesPerTSDB, Series: seriesPerTSDB, - WithWAL: true, + WithWAL: !flushToBlocks, Random: random, SkipChunks: t.IsBenchmark(), }) - testutil.Ok(t, head.Close()) - - tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j)) - for i := 0; i < len(created); i++ { - resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i])) + resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i])) } if flushToBlocks { - db, err := tsdb.OpenDBReadOnly(tsdbDir, logger) - testutil.Ok(t, err) - - testutil.Ok(t, db.FlushWAL(tmpDir)) - testutil.Ok(t, db.Close()) + _ = createBlockFromHead(t, tsdbDir, head) } + testutil.Ok(t, head.Close()) db, err := tsdb.OpenDBReadOnly(tsdbDir, logger) testutil.Ok(t, err) @@ -128,7 +123,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs }) - var expected []storepb.Series + var expected []*storepb.Series lastLabels := storepb.Series{} for _, resp := range resps { for _, r := range resp { @@ -140,7 +135,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB continue } lastLabels = x - expected = append(expected, *r.GetSeries()) + expected = append(expected, r.GetSeries()) } } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index ff53f79bfda..005bd7d4e55 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -11,6 +11,7 @@ import ( "math" "math/rand" "os" + "path/filepath" "sort" "testing" "time" @@ -1616,17 +1617,16 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { var resps []*storepb.SeriesResponse head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ - Dir: tmpDir, + TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)), SamplesPerSeries: samplesPerSeriesPerClient, Series: seriesPerClient, - MaxFrameBytes: storetestutil.RemoteReadFrameLimit, Random: random, SkipChunks: t.IsBenchmark(), }) testutil.Ok(t, head.Close()) for i := 0; i < len(created); i++ { - resps = append(resps, storepb.NewSeriesResponse(&created[i])) + resps = append(resps, storepb.NewSeriesResponse(created[i])) } clients[j] = &testClient{ @@ -1647,23 +1647,22 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { } var allResps []*storepb.SeriesResponse - var expected []storepb.Series + var expected []*storepb.Series lastLabels := storepb.Series{} for _, c := range clients { m := c.(*testClient).StoreClient.(*mockedStoreAPI) + // NOTE: Proxy will merge all series with same labels without any frame limit (https://github.com/thanos-io/thanos/issues/2332). for _, r := range m.RespSeries { allResps = append(allResps, r) - // Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332). - // Let's do this here as well. x := storepb.Series{Labels: r.GetSeries().Labels} if x.String() == lastLabels.String() { expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...) continue } lastLabels = x - expected = append(expected, *r.GetSeries()) + expected = append(expected, r.GetSeries()) } } @@ -1700,7 +1699,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { // In this we expect exactly the same response as input. expected = expected[:0] for _, r := range allResps { - expected = append(expected, *r.GetSeries()) + expected = append(expected, r.GetSeries()) } storetestutil.TestServerSeries(t, store, &storetestutil.SeriesCase{ diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index 90e11eee182..fc48c0ad9f5 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "math/rand" + "os" "path/filepath" "runtime" "sort" @@ -39,13 +40,12 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { const RemoteReadFrameLimit = 1048576 type HeadGenOptions struct { - Dir string + TSDBDir string SamplesPerSeries, Series int - MaxFrameBytes int // No limit by default. WithWAL bool PrependLabels labels.Labels - SkipChunks bool + SkipChunks bool // Skips chunks in returned slice (not in generated head!). Random *rand.Rand } @@ -54,22 +54,23 @@ type HeadGenOptions struct { // Returned series list has "ext1"="1" prepended. Each series looks as follows: // {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} where number indicate sample number from 0. // Returned series are frame in same way as remote read would frame them. -func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []storepb.Series) { +func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []*storepb.Series) { if opts.SamplesPerSeries < 1 || opts.Series < 1 { t.Fatal("samples and series has to be 1 or more") } - tsdbDir := filepath.Join(opts.Dir, fmt.Sprintf("%d", j)) - fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, tsdbDir) + fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, opts.TSDBDir) var w *wal.WAL var err error if opts.WithWAL { - w, err = wal.New(nil, nil, filepath.Join(tsdbDir, "wal"), true) + w, err = wal.New(nil, nil, filepath.Join(opts.TSDBDir, "wal"), true) testutil.Ok(t, err) + } else { + testutil.Ok(t, os.MkdirAll(filepath.Join(opts.TSDBDir, "wal"), os.ModePerm)) } - h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil) + h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, opts.TSDBDir, nil, tsdb.DefaultStripeSize, nil) testutil.Ok(t, err) app := h.Appender(context.Background()) @@ -96,31 +97,20 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, var ( lset labels.Labels chunkMetas []chunks.Meta - expected = make([]storepb.Series, 0, opts.Series) - sBytes int + expected = make([]*storepb.Series, 0, opts.Series) ) all := allPostings(t, ir) for all.Next() { testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas)) - i := 0 sLset := storepb.PromLabelsToLabels(lset) - expected = append(expected, storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)}) + expected = append(expected, &storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)}) if opts.SkipChunks { continue } - lBytes := 0 - for _, l := range sLset { - lBytes += l.Size() - } - sBytes = lBytes - - for { - c := chunkMetas[i] - i++ - + for _, c := range chunkMetas { chEnc, err := chks.Chunk(c.Ref) testutil.Ok(t, err) @@ -129,22 +119,11 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, c.MaxTime = c.MinTime + int64(chEnc.NumSamples()) - 1 } - sBytes += len(chEnc.Bytes()) - expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, storepb.AggrChunk{ MinTime: c.MinTime, MaxTime: c.MaxTime, Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes()}, }) - if i >= len(chunkMetas) { - break - } - - // Compose many frames as remote read (so sidecar StoreAPI) would do if requested by maxFrameBytes. - if opts.MaxFrameBytes > 0 && sBytes >= opts.MaxFrameBytes { - expected = append(expected, storepb.Series{Labels: sLset}) - sBytes = lBytes - } } } testutil.Ok(t, all.Err()) @@ -158,7 +137,7 @@ type SeriesServer struct { ctx context.Context - SeriesSet []storepb.Series + SeriesSet []*storepb.Series Warnings []string HintsSet []*types.Any @@ -178,7 +157,7 @@ func (s *SeriesServer) Send(r *storepb.SeriesResponse) error { } if r.GetSeries() != nil { - s.SeriesSet = append(s.SeriesSet, *r.GetSeries()) + s.SeriesSet = append(s.SeriesSet, r.GetSeries()) return nil } @@ -227,7 +206,7 @@ type SeriesCase struct { Req *storepb.SeriesRequest // Exact expectations are checked only for tests. For benchmarks only length is assured. - ExpectedSeries []storepb.Series + ExpectedSeries []*storepb.Series ExpectedWarnings []string ExpectedHints []hintspb.SeriesResponseHints } @@ -252,10 +231,18 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series }) } - testutil.Equals(t, c.ExpectedSeries[0].Chunks[0], srv.SeriesSet[0].Chunks[0]) - - // This might give unreadable output for millions of series on fail.. - testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet) + // Huge responses can produce unreadable diffs - make it more human readable. + if len(c.ExpectedSeries) > 4 { + for j := range c.ExpectedSeries { + testutil.Equals(t, c.ExpectedSeries[j].Labels, srv.SeriesSet[j].Labels, "%v series chunks mismatch", j) + if len(c.ExpectedSeries[j].Chunks) > 20 { + testutil.Equals(t, len(c.ExpectedSeries[j].Chunks), len(srv.SeriesSet[j].Chunks), "%v series chunks number mismatch", j) + } + testutil.Equals(t, c.ExpectedSeries[j].Chunks, srv.SeriesSet[j].Chunks, "%v series chunks mismatch", j) + } + } else { + testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet) + } var actualHints []hintspb.SeriesResponseHints for _, anyHints := range srv.HintsSet { diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index cbc5c7a5e14..3061f2ec3f2 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -5,6 +5,7 @@ package store import ( "context" + "io" "math" "sort" @@ -50,11 +51,13 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com logger = log.NewNopLogger() } return &TSDBStore{ - logger: logger, - db: db, - component: component, - externalLabels: externalLabels, - maxBytesPerFrame: 1024 * 1024, // 1MB as recommended by gRPC. + logger: logger, + db: db, + component: component, + externalLabels: externalLabels, + // TODO(bwplotka): Consider 1MB as recommended by gRPC. Initial benchmarks not involving + // networking and marshaling suggests no framing being a better option. + maxBytesPerFrame: math.MaxInt64, } } @@ -89,6 +92,12 @@ func (s *TSDBStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.In return res, nil } +// CloseDelegator allows to delegate close (releasing resources used by request to the server). +// This is useful when we invoke StoreAPI within another StoreAPI and results are ephemeral until copied. +type CloseDelegator interface { + Delegate(io.Closer) +} + // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { @@ -114,30 +123,33 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer if err != nil { return status.Error(codes.Internal, err.Error()) } - defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier series") - var ( - set = q.Select(false, nil, matchers...) - respSeries storepb.Series - ) + if cd, ok := srv.(CloseDelegator); ok { + cd.Delegate(q) + } else { + defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb chunk querier series") + } + + set := q.Select(false, nil, matchers...) // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() - respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) - respSeries.Chunks = respSeries.Chunks[:0] + seriesLabels := storepb.Series{Labels: s.translateAndExtendLabels(series.Labels(), s.externalLabels)} if r.SkipChunks { - if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + if err := srv.Send(storepb.NewSeriesResponse(&seriesLabels)); err != nil { return status.Error(codes.Aborted, err.Error()) } continue } - frameBytesLeft := s.maxBytesPerFrame - for _, lbl := range respSeries.Labels { - frameBytesLeft -= lbl.Size() + bytesLeftForChunks := s.maxBytesPerFrame + for _, lbl := range seriesLabels.Labels { + bytesLeftForChunks -= lbl.Size() } + frameBytesLeft := bytesLeftForChunks + seriesChunks := []storepb.AggrChunk{} chIter := series.Iterator() isNext := chIter.Next() for isNext { @@ -146,15 +158,16 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) } - respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{ + c := storepb.AggrChunk{ MinTime: chk.MinTime, MaxTime: chk.MaxTime, Raw: &storepb.Chunk{ Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one. Data: chk.Chunk.Bytes(), }, - }) - frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size() + } + frameBytesLeft -= c.Size() + seriesChunks = append(seriesChunks, c) // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. isNext = chIter.Next() @@ -162,10 +175,12 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer continue } - if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + if err := srv.Send(storepb.NewSeriesResponse(&storepb.Series{Labels: seriesLabels.Labels, Chunks: seriesChunks})); err != nil { return status.Error(codes.Aborted, err.Error()) } - respSeries.Chunks = respSeries.Chunks[:0] + frameBytesLeft = bytesLeftForChunks + // TODO(bwplotka): Consider preallocating & benchmark. + seriesChunks = []storepb.AggrChunk{} } if err := chIter.Err(); err != nil { return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error()) diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index cf4413492d5..83fcb2a9899 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -5,15 +5,23 @@ package store import ( "context" + "fmt" + "io" + "io/ioutil" "math" + "math/rand" + "os" + "sort" "testing" "time" + "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" - + "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -395,3 +403,415 @@ func TestTSDBStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T) { return tsdbStore }) } + +type delegatorServer struct { + *storetestutil.SeriesServer + + closers []io.Closer +} + +func (s *delegatorServer) Delegate(c io.Closer) { + s.closers = append(s.closers, c) +} + +// Regression test for: https://github.com/thanos-io/thanos/issues/3013 . +func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + var ( + random = rand.New(rand.NewSource(120)) + logger = log.NewNopLogger() + ) + + // Generate one series in two parts. Put first part in block, second in just WAL. + head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: 300, + Series: 2, + Random: random, + SkipChunks: true, + }) + _ = createBlockFromHead(t, tmpDir, head) + testutil.Ok(t, head.Close()) + + head, _ = storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: 300, + Series: 2, + WithWAL: true, + Random: random, + SkipChunks: true, + }) + testutil.Ok(t, head.Close()) + + db, err := tsdb.OpenDBReadOnly(tmpDir, logger) + testutil.Ok(t, err) + t.Cleanup(func() { + if db != nil { + testutil.Ok(t, db.Close()) + } + }) + + extLabels := labels.FromStrings("ext", "1") + store := NewTSDBStore(logger, nil, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + + srv := storetestutil.NewSeriesServer(context.Background()) + csrv := &delegatorServer{SeriesServer: srv} + t.Run("call series and access results", func(t *testing.T) { + testutil.Ok(t, store.Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, csrv)) + testutil.Equals(t, 0, len(srv.Warnings)) + testutil.Equals(t, 0, len(srv.HintsSet)) + testutil.Equals(t, 4, len(srv.SeriesSet)) + + // All chunks should be accessible for read, but not necessarily for write. + for _, s := range srv.SeriesSet { + testutil.Equals(t, 3, len(s.Chunks)) + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + } + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) + + flushDone := make(chan struct{}) + t.Run("flush WAL and access results", func(t *testing.T) { + go func() { + // This should block until all queries are closed. + testutil.Ok(t, db.FlushWAL(tmpDir)) + close(flushDone) + }() + // All chunks should be still accessible for read, but not necessarily for write. + for _, s := range srv.SeriesSet { + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + } + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) + select { + case _, ok := <-flushDone: + if !ok { + t.Fatal("expected flush to be blocked, but it seems it completed.") + } + default: + } + + closeDone := make(chan struct{}) + t.Run("close db with block readers and access results", func(t *testing.T) { + go func() { + // This should block until all queries are closed. + testutil.Ok(t, db.Close()) + db = nil + close(closeDone) + }() + // All chunks should be still accessible for read, but not necessarily for write. + for _, s := range srv.SeriesSet { + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + } + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) + select { + case _, ok := <-closeDone: + if !ok { + t.Fatal("expected db to be closed, but it seems it completed.") + } + default: + } + + t.Run("close querier and access results", func(t *testing.T) { + // Let's close pending querier! + testutil.Equals(t, 1, len(csrv.closers)) + testutil.Ok(t, csrv.closers[0].Close()) + + // Expect flush and close to be unblocked. + <-flushDone + <-closeDone + + // Expect segfault on read and write. + t.Run("non delegatable", func(t *testing.T) { + for _, s := range srv.SeriesSet { + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + _ = string(s.Chunks[0].Raw.Data) // Access bytes by converting them to different type. + _ = string(s.Chunks[1].Raw.Data) + _ = string(s.Chunks[2].Raw.Data) + })) + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) + }) +} + +func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + var ( + random = rand.New(rand.NewSource(120)) + logger = log.NewNopLogger() + ) + + // Generate one series in two parts. Put first part in block, second in just WAL. + head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: 300, + Series: 2, + Random: random, + SkipChunks: true, + }) + _ = createBlockFromHead(t, tmpDir, head) + testutil.Ok(t, head.Close()) + + head, _ = storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: 300, + Series: 2, + WithWAL: true, + Random: random, + SkipChunks: true, + }) + testutil.Ok(t, head.Close()) + + db, err := tsdb.OpenDBReadOnly(tmpDir, logger) + testutil.Ok(t, err) + t.Cleanup(func() { + if db != nil { + testutil.Ok(t, db.Close()) + } + }) + + extLabels := labels.FromStrings("ext", "1") + store := NewTSDBStore(logger, nil, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + + srv := storetestutil.NewSeriesServer(context.Background()) + t.Run("call series and access results", func(t *testing.T) { + testutil.Ok(t, store.Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, srv)) + testutil.Equals(t, 0, len(srv.Warnings)) + testutil.Equals(t, 0, len(srv.HintsSet)) + testutil.Equals(t, 4, len(srv.SeriesSet)) + + // All chunks should be accessible for read, but not necessarily for write. + for _, s := range srv.SeriesSet { + testutil.Equals(t, 3, len(s.Chunks)) + for _, c := range s.Chunks { + testutil.Ok(t, testutil.FaultOrPanicToErr(func() { + _ = string(c.Raw.Data) // Access bytes by converting them to different type. + })) + } + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) + + t.Run("flush WAL and access results", func(t *testing.T) { + // This should NOT block as close was not delegated. + testutil.Ok(t, db.FlushWAL(tmpDir)) + + // Expect segfault on read and write. + for _, s := range srv.SeriesSet { + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + _ = string(s.Chunks[0].Raw.Data) // Access bytes by converting them to different type. + _ = string(s.Chunks[1].Raw.Data) + _ = string(s.Chunks[2].Raw.Data) + })) + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) + t.Run("close db with block readers and access results", func(t *testing.T) { + // This should NOT block as close was not delegated. + testutil.Ok(t, db.Close()) + db = nil + + // Expect segfault on read and write. + for _, s := range srv.SeriesSet { + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + _ = string(s.Chunks[0].Raw.Data) // Access bytes by converting them to different type. + _ = string(s.Chunks[1].Raw.Data) + _ = string(s.Chunks[2].Raw.Data) + })) + testutil.NotOk(t, testutil.FaultOrPanicToErr(func() { + s.Chunks[0].Raw.Data[0] = 0 // Check if we can write to the byte range. + s.Chunks[1].Raw.Data[0] = 0 + s.Chunks[2].Raw.Data[0] = 0 + })) + } + }) +} + +func TestTSDBStoreSeries(t *testing.T) { + tb := testutil.NewTB(t) + // Make sure there are more samples, so we can check framing code. + storetestutil.RunSeriesInterestingCases(tb, 10e6, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchTSDBStoreSeries(t, samplesPerSeries, series) + }) +} + +func BenchmarkTSDBStoreSeries(b *testing.B) { + tb := testutil.NewTB(b) + storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { + benchTSDBStoreSeries(t, samplesPerSeries, series) + }) +} + +func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) { + tmpDir, err := ioutil.TempDir("", "testorbench-testtsdbseries") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + // This means 3 blocks and the head. + const numOfBlocks = 4 + + samplesPerSeriesPerBlock := totalSamples / numOfBlocks + if samplesPerSeriesPerBlock == 0 { + samplesPerSeriesPerBlock = 1 + } + seriesPerBlock := totalSeries / numOfBlocks + if seriesPerBlock == 0 { + seriesPerBlock = 1 + } + + var ( + resps = make([][]*storepb.SeriesResponse, 4) + random = rand.New(rand.NewSource(120)) + logger = log.NewNopLogger() + ) + + for j := 0; j < 3; j++ { + head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerBlock, + Series: seriesPerBlock, + Random: random, + SkipChunks: t.IsBenchmark(), + }) + for i := 0; i < len(created); i++ { + resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i])) + } + + _ = createBlockFromHead(t, tmpDir, head) + t.Cleanup(func() { + testutil.Ok(t, head.Close()) + }) + + } + + head2, created := storetestutil.CreateHeadWithSeries(t, 3, storetestutil.HeadGenOptions{ + TSDBDir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerBlock, + Series: seriesPerBlock, + WithWAL: true, + Random: random, + SkipChunks: t.IsBenchmark(), + }) + t.Cleanup(func() { + testutil.Ok(t, head2.Close()) + }) + + for i := 0; i < len(created); i++ { + resps[3] = append(resps[3], storepb.NewSeriesResponse(created[i])) + } + + db, err := tsdb.OpenDBReadOnly(tmpDir, logger) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, db.Close()) }() + + extLabels := labels.FromStrings("ext", "1") + store := NewTSDBStore(logger, nil, &mockedStartTimeDB{DBReadOnly: db, startTime: 0}, component.Receive, extLabels) + + var expected []*storepb.Series + for _, resp := range resps { + for _, r := range resp { + // Add external labels. + s := r.GetSeries() + + lbls := make([]storepb.Label, 0, len(s.Labels)+len(extLabels)) + for _, l := range s.Labels { + lbls = append(lbls, storepb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + for _, l := range extLabels { + lbls = append(lbls, storepb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Slice(lbls, func(i, j int) bool { + return lbls[i].Name < lbls[j].Name + }) + + s.Labels = lbls + expected = append(expected, s) + } + } + + storetestutil.TestServerSeries(t, store, + &storetestutil.SeriesCase{ + Name: fmt.Sprintf("%d blocks and one WAL with %d samples, %d series each", numOfBlocks-1, samplesPerSeriesPerBlock, seriesPerBlock), + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + ExpectedSeries: expected, + }, + ) +} diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index d98e02f60f1..e33aad00485 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -8,9 +8,11 @@ import ( "path/filepath" "reflect" "runtime" + "runtime/debug" "testing" "github.com/davecgh/go-spew/spew" + "github.com/pkg/errors" "github.com/pmezard/go-difflib/difflib" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -74,7 +76,15 @@ func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d:"+msg+"\n\n\texp: %#v\n\n\tgot: %#v%s\033[39m\n\n", filepath.Base(file), line, exp, act, diff(exp, act)) + tb.Fatal(sprintfWithLimit("\033[31m%s:%d:"+msg+"\n\n\texp: %#v\n\n\tgot: %#v%s\033[39m\n\n", filepath.Base(file), line, exp, act, diff(exp, act))) +} + +func sprintfWithLimit(act string, v ...interface{}) string { + s := fmt.Sprintf(act, v...) + if len(s) > 1000 { + return s[:1000] + "...(output trimmed)" + } + return s } func typeAndKind(v interface{}) (reflect.Type, reflect.Kind) { @@ -177,3 +187,19 @@ func TolerantVerifyLeak(t *testing.T) { goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"), ) } + +// FaultOrPanicToErr returns error if panic of fault was triggered during execution of function. +func FaultOrPanicToErr(f func()) (err error) { + // Set this go routine to panic on segfault to allow asserting on those. + debug.SetPanicOnFault(true) + defer func() { + if r := recover(); r != nil { + err = errors.Errorf("invoked function panicked or caused segmentation fault: %v", r) + } + debug.SetPanicOnFault(false) + }() + + f() + + return err +}