Skip to content

Commit

Permalink
pkg/store: Merge SeriesSets of multiple TSDB stores
Browse files Browse the repository at this point in the history
This is required as the Series gRPC method of the StoreAPI requires the
Series returned to be sorted.

Signed-off-by: Frederic Branczyk <[email protected]>
  • Loading branch information
brancz committed Apr 6, 2020
1 parent e37610c commit d797dd0
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 10 deletions.
8 changes: 4 additions & 4 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ func (t *MultiTSDB) openTSDBs() error {
return g.Wait()
}

func (t *MultiTSDB) TSDBStores() []*store.TSDBStore {
func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
t.mtx.RLock()
res := make([]*store.TSDBStore, 0, len(t.stores))
for _, s := range t.stores {
res = append(res, s)
res := make(map[string]*store.TSDBStore, len(t.stores))
for k, v := range t.stores {
res[k] = v
}
defer t.mtx.RUnlock()
return res
Expand Down
147 changes: 141 additions & 6 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,30 @@ package store

import (
"context"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type MultiTSDBStore struct {
logger log.Logger
component component.SourceStoreAPI
tsdbStores func() []*TSDBStore
tsdbStores func() map[string]*TSDBStore
}

// NewMultiTSDBStore creates a new TSDBStore.
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() []*TSDBStore) *MultiTSDBStore {
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -72,16 +81,142 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s
return resp, nil
}

type seriesSetServer struct {
grpc.ServerStream

ctx context.Context

warnCh warnSender
recv chan *storepb.Series
cur *storepb.Series

errMtx *sync.Mutex
err error
}

func newSeriesSetServer(
ctx context.Context,
warnCh warnSender,
) *seriesSetServer {
return &seriesSetServer{
ctx: ctx,
warnCh: warnCh,
recv: make(chan *storepb.Series),
errMtx: &sync.Mutex{},
}
}

func (s *seriesSetServer) Context() context.Context {
return s.ctx
}

func (s *seriesSetServer) Run(store *TSDBStore, r *storepb.SeriesRequest) {
err := store.Series(r, s)
if err != nil {
if r.PartialResponseDisabled {
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()
} else {
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
}
}

close(s.recv)
}

func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error {
series := r.GetSeries()
chunks := make([]storepb.AggrChunk, len(series.Chunks))
copy(chunks, series.Chunks)
s.recv <- &storepb.Series{
Labels: series.Labels,
Chunks: chunks,
}
return nil
}

func (s *seriesSetServer) Next() (ok bool) {
s.cur, ok = <-s.recv
return ok
}

func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
if s.cur == nil {
return nil, nil
}
return s.cur.Labels, s.cur.Chunks
}

func (s *seriesSetServer) Err() error {
s.errMtx.Lock()
defer s.errMtx.Unlock()
return s.err
}

// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
stores := s.tsdbStores()
for _, store := range stores {
err := store.Series(r, srv)
if err != nil {
return err
if len(stores) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)

g.Go(func() error {
var (
seriesSet []storepb.SeriesSet
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
}()

for tenant, store := range stores {
store := store
seriesCtx, closeSeries := context.WithCancel(gctx)
seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{
"tenant": tenant,
})
defer closeSeries()
ss := newSeriesSetServer(seriesCtx, respSender)
wg.Add(1)
go func() {
defer wg.Done()
ss.Run(store, r)
}()

seriesSet = append(seriesSet, ss)
}

mergedSet := storepb.MergeSeriesSets(seriesSet...)
for mergedSet.Next() {
var series storepb.Series
series.Labels, series.Chunks = mergedSet.At()
respSender.send(storepb.NewSeriesResponse(&series))
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}

if err := g.Wait(); err != nil {
level.Error(s.logger).Log("err", err)
return err
}
return nil
}

Expand Down

0 comments on commit d797dd0

Please sign in to comment.