Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Add support for TSDB per tenant #2012

Merged
merged 9 commits into from
May 4, 2020
53 changes: 31 additions & 22 deletions pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package store

import (
"context"
"fmt"
"sync"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -51,10 +52,10 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s
}

infos := make([]*storepb.InfoResponse, 0, len(stores))
for _, store := range stores {
for tenant, store := range stores {
info, err := store.Info(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "get info for tenant %s", tenant)
}
infos = append(infos, info)
}
Expand All @@ -81,7 +82,7 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s
return resp, nil
}

type seriesSetServer struct {
type tenantSeriesSetServer struct {
grpc.ServerStream

ctx context.Context
Expand All @@ -90,27 +91,31 @@ type seriesSetServer struct {
recv chan *storepb.Series
cur *storepb.Series

err error
err error
tenant string
}

func newSeriesSetServer(
func newTenantSeriesSetServer(
ctx context.Context,
tenant string,
warnCh warnSender,
) *seriesSetServer {
return &seriesSetServer{
) *tenantSeriesSetServer {
return &tenantSeriesSetServer{
ctx: ctx,
tenant: tenant,
warnCh: warnCh,
recv: make(chan *storepb.Series),
}
}

func (s *seriesSetServer) Context() context.Context {
func (s *tenantSeriesSetServer) Context() context.Context {
return s.ctx
}

func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
err := store.Series(r, s)
if err != nil {
err = errors.Wrapf(s.err, "get series for tenant %s", s.tenant)
if r.PartialResponseDisabled {
s.err = err
} else {
Expand All @@ -121,7 +126,7 @@ func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
close(s.recv)
}

func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error {
func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
series := r.GetSeries()
chunks := make([]storepb.AggrChunk, len(series.Chunks))
copy(chunks, series.Chunks)
Expand All @@ -132,19 +137,19 @@ func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error {
return nil
}

func (s *seriesSetServer) Next() (ok bool) {
func (s *tenantSeriesSetServer) Next() (ok bool) {
s.cur, ok = <-s.recv
return ok
}

func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
if s.cur == nil {
return nil, nil
}
return s.cur.Labels, s.cur.Chunks
}

func (s *seriesSetServer) Err() error {
func (s *tenantSeriesSetServer) Err() error {
return s.err
}

Expand Down Expand Up @@ -178,12 +183,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri

for tenant, store := range stores {
store := store
seriesCtx, closeSeries := context.WithCancel(gctx)
seriesCtx, cancelSeries := context.WithCancel(gctx)
seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{
"tenant": tenant,
})
defer closeSeries()
ss := newSeriesSetServer(seriesCtx, respSender)
defer cancelSeries()
ss := newTenantSeriesSetServer(seriesCtx, tenant, respSender)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about warnings on series? are those wrapped with prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it was already the case, but slightly inconsistent (just generically wrapped the error and used the same in warnings and error handling, but that's not how we prefix other warnings in other places, so will do that instead)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -217,18 +222,18 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames
warnings := map[string]struct{}{}

stores := s.tsdbStores()
for _, store := range stores {
for tenant, store := range stores {
r, err := store.LabelNames(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "get label names for tenant %s", tenant)
}

for _, l := range r.Names {
names[l] = struct{}{}
}

for _, l := range r.Warnings {
warnings[l] = struct{}{}
warnings[prefixTenantWarning(tenant, l)] = struct{}{}
}
}

Expand All @@ -238,6 +243,10 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames
}, nil
}

func prefixTenantWarning(tenant, s string) string {
return fmt.Sprintf("[%s] %s", tenant, s)
}

func keys(m map[string]struct{}) []string {
res := make([]string, 0, len(m))
for k := range m {
Expand All @@ -253,18 +262,18 @@ func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValu
warnings := map[string]struct{}{}

stores := s.tsdbStores()
for _, store := range stores {
for tenant, store := range stores {
r, err := store.LabelValues(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "get label values for tenant %s", tenant)
}

for _, l := range r.Values {
values[l] = struct{}{}
}

for _, l := range r.Warnings {
warnings[l] = struct{}{}
warnings[prefixTenantWarning(tenant, l)] = struct{}{}
}
}

Expand Down