Skip to content

Commit

Permalink
pkg/store: Merge SeriesSets of multiple TSDB stores
Browse files Browse the repository at this point in the history
This is required as the Series gRPC method of the StoreAPI requires the
Series returned to be sorted.

Signed-off-by: Frederic Branczyk <[email protected]>
  • Loading branch information
brancz committed Mar 31, 2020
1 parent 2713e07 commit 3954d5f
Showing 1 changed file with 129 additions and 4 deletions.
133 changes: 129 additions & 4 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ package store

import (
"context"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type MultiTSDBStore struct {
Expand Down Expand Up @@ -72,15 +79,133 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s
return resp, nil
}

type seriesServerSeriesSet struct {
grpc.ServerStream

ctx context.Context
logger log.Logger

warnCh warnSender
recv chan *storepb.SeriesResponse
cur *storepb.SeriesResponse

errMtx *sync.Mutex
err error
}

func newSeriesSetServer(
ctx context.Context,
logger log.Logger,
warnCh warnSender,
) *seriesServerSeriesSet {
return &seriesServerSeriesSet{
ctx: ctx,
logger: logger,
warnCh: warnCh,
recv: make(chan *storepb.SeriesResponse),
errMtx: &sync.Mutex{},
}
}

func (s *seriesServerSeriesSet) Run(store *TSDBStore, r *storepb.SeriesRequest) {
defer close(s.recv)
err := store.Series(r, s)
if err != nil {
if r.PartialResponseDisabled {
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()
return
}
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
}
}

func (s *seriesServerSeriesSet) Send(r *storepb.SeriesResponse) error {
s.recv <- r
return nil
}

func (s *seriesServerSeriesSet) Next() bool {
var ok bool
s.cur, ok = <-s.recv
if w := s.cur.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
}
return ok
}

func (s *seriesServerSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
if s.cur == nil {
return nil, nil
}
return s.cur.GetSeries().Labels, s.cur.GetSeries().Chunks
}

func (s *seriesServerSeriesSet) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
return s.err
}

// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
stores := s.tsdbStores()
for _, store := range stores {
err := store.Series(r, srv)
if err != nil {
return err
if len(stores) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)

g.Go(func() error {
var (
seriesSet []storepb.SeriesSet
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
}()

for _, store := range stores {
seriesCtx, closeSeries := context.WithCancel(gctx)
defer closeSeries()
ss := newSeriesSetServer(seriesCtx, s.logger, respSender)
wg.Add(1)
go func() {
defer wg.Done()
ss.Run(store, r)
}()

seriesSet = append(seriesSet, ss)
}

mergedSet := storepb.MergeSeriesSets(seriesSet...)
for mergedSet.Next() {
var series storepb.Series
series.Labels, series.Chunks = mergedSet.At()
respSender.send(storepb.NewSeriesResponse(&series))
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}

if err := g.Wait(); err != nil {
level.Error(s.logger).Log("err", err)
return err
}
return nil
}
Expand Down

0 comments on commit 3954d5f

Please sign in to comment.