From e37610c86d58283ced639242c1b1560ee1c6f099 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 30 Aug 2019 18:26:54 +0200 Subject: [PATCH 1/9] receive: Add support for TSDB per tenant Signed-off-by: Frederic Branczyk --- Makefile | 2 +- cmd/thanos/receive.go | 42 ++++--- go.sum | 3 + pkg/receive/handler.go | 65 ++++++----- pkg/receive/handler_test.go | 16 +-- pkg/receive/multitsdb.go | 214 ++++++++++++++++++++++++++++++++++++ pkg/receive/writer.go | 42 +++++-- pkg/server/grpc/grpc.go | 2 +- pkg/store/multitsdb.go | 149 +++++++++++++++++++++++++ scripts/quickstart.sh | 54 ++++++--- test/e2e/query_test.go | 2 + test/e2e/receive_test.go | 10 +- 12 files changed, 518 insertions(+), 83 deletions(-) create mode 100644 pkg/receive/multitsdb.go create mode 100644 pkg/store/multitsdb.go diff --git a/Makefile b/Makefile index 0376f310bc..df832d9be6 100644 --- a/Makefile +++ b/Makefile @@ -252,7 +252,7 @@ test-e2e: docker @echo ">> running /test/e2e tests." # NOTE(bwplotka): # * If you see errors on CI (timeouts), but not locally, try to add -parallel 1 to limit to single CPU to reproduce small 1CPU machine. - @go test -failfast -timeout 10m -v ./test/e2e/... + @go test -failfast -timeout 10m $(RUN_ARGS) -v ./test/e2e/... .PHONY: install-deps install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests. 
diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 2296e4649e..e0e3780e8c 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -72,6 +72,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String() + defaultTenantID := cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(receive.DefaultTenant).String() + + tenantLabelName := cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).String() + replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).String() replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() @@ -144,6 +148,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { cw, *local, *tenantHeader, + *defaultTenantID, + *tenantLabelName, *replicaHeader, *replicationFactor, comp, @@ -179,6 +185,8 @@ func runReceive( cw *receive.ConfigWatcher, endpoint string, tenantHeader string, + defaultTenantID string, + tenantLabelName string, replicaHeader string, replicationFactor uint64, comp component.SourceStoreAPI, @@ -186,7 +194,6 @@ func runReceive( logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") - localStorage := &tsdb.ReadyStorage{} rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA) if err != nil { return err @@ -196,11 +203,15 @@ func runReceive( return err } + dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, 
tenantLabelName) + writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ + Writer: writer, ListenAddress: rwAddress, Registry: reg, Endpoint: endpoint, TenantHeader: tenantHeader, + DefaultTenantID: defaultTenantID, ReplicaHeader: replicaHeader, ReplicationFactor: replicationFactor, Tracer: tracer, @@ -250,24 +261,13 @@ func runReceive( { // TSDB. cancel := make(chan struct{}) - startTimeMargin := int64(2 * time.Duration(tsdbOpts.MinBlockDuration).Seconds() * 1000) g.Add(func() error { defer close(dbReady) defer close(uploadC) - // Before actually starting, we need to make sure the - // WAL is flushed. The WAL is flushed after the - // hashring is loaded. - db := receive.NewFlushableStorage( - dataDir, - log.With(logger, "component", "tsdb"), - reg, - tsdbOpts, - ) - // Before quitting, ensure the WAL is flushed and the DB is closed. defer func() { - if err := db.Flush(); err != nil { + if err := dbs.Flush(); err != nil { level.Warn(logger).Log("err", err, "msg", "failed to flush storage") } }() @@ -283,10 +283,10 @@ func runReceive( level.Info(logger).Log("msg", "updating DB") - if err := db.Flush(); err != nil { + if err := dbs.Flush(); err != nil { return errors.Wrap(err, "flushing storage") } - if err := db.Open(); err != nil { + if err := dbs.Open(); err != nil { return errors.Wrap(err, "opening storage") } if upload { @@ -294,8 +294,6 @@ func runReceive( <-uploadDone } level.Info(logger).Log("msg", "tsdb started") - localStorage.Set(db.Get(), startTimeMargin) - webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage)) statusProber.Ready() level.Info(logger).Log("msg", "server is ready to receive web requests") dbReady <- struct{}{} @@ -303,8 +301,7 @@ func runReceive( } }, func(err error) { close(cancel) - }, - ) + }) } level.Debug(logger).Log("msg", "setting up hashring") @@ -349,7 
+346,6 @@ func runReceive( if !ok { return nil } - webHandler.SetWriter(nil) webHandler.Hashring(h) msg := "hashring has changed; server is not ready to receive web requests." statusProber.NotReady(errors.New(msg)) @@ -397,9 +393,10 @@ func runReceive( if s != nil { s.Shutdown(errors.New("reload hashrings")) } - tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset) + + multiStore := store.NewMultiTSDBStore(logger, reg, comp, dbs.TSDBStores) rw := store.ReadWriteTSDBStore{ - StoreServer: tsdbStore, + StoreServer: multiStore, WriteableStoreServer: webHandler, } @@ -419,6 +416,7 @@ func runReceive( // whenever the DB changes, thus it needs its own run group. g.Add(func() error { for range startGRPC { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", grpcBindAddr) if err := s.ListenAndServe(); err != nil { return errors.Wrap(err, "serve gRPC") } diff --git a/go.sum b/go.sum index 80a19ed222..b76c14ebdc 100644 --- a/go.sum +++ b/go.sum @@ -649,8 +649,10 @@ github.com/sercand/kuberesolver v2.1.0+incompatible h1:iJ1oCzPQ/aacsbCWLfJW1hPKk github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= +github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0= github.com/shurcooL/vfsgen 
v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -663,6 +665,7 @@ github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUr github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index fa31ae58d7..45de62ffa0 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "google.golang.org/grpc" @@ -40,6 +41,10 @@ import ( const ( // DefaultTenantHeader is the default header used to designate the tenant making a write request. DefaultTenantHeader = "THANOS-TENANT" + // DefaultTenant is the default value used for when no tenant is passed via the tenant header. + DefaultTenant = "default-tenant" + // DefaultTenantLabel is the default label-name used for when no tenant is passed via the tenant header. 
+ DefaultTenantLabel = "tenant_id" // DefaultReplicaHeader is the default header used to designate the replica count of a write request. DefaultReplicaHeader = "THANOS-REPLICA" ) @@ -55,6 +60,7 @@ type Options struct { ListenAddress string Registry prometheus.Registerer TenantHeader string + DefaultTenantID string ReplicaHeader string Endpoint string ReplicationFactor uint64 @@ -116,16 +122,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler { return h } -// SetWriter sets the writer. -// The writer must be set to a non-nil value in order for the -// handler to be ready and usable. -// If the writer is nil, then the handler is marked as not ready. -func (h *Handler) SetWriter(w *Writer) { - h.mtx.Lock() - defer h.mtx.Unlock() - h.writer = w -} - // Hashring sets the hashring for the handler and marks the hashring as ready. // The hashring must be set to a non-nil value in order for the // handler to be ready and usable. @@ -266,11 +262,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } tenant := r.Header.Get(h.options.TenantHeader) + if len(tenant) == 0 { + tenant = h.options.DefaultTenantID + } err = h.handleRequest(r.Context(), rep, tenant, &wreq) switch err { case nil: return + case tsdb.ErrNotReady: + http.Error(w, err.Error(), http.StatusServiceUnavailable) case conflictErr: http.Error(w, err.Error(), http.StatusConflict) case errBadReplica: @@ -358,26 +359,22 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic if endpoint == h.options.Endpoint { go func(endpoint string) { var err error - h.mtx.RLock() - if h.writer == nil { - err = errors.New("storage is not ready") - } else { - // Create a span to track writing the request into TSDB. - tracing.DoInSpan(ctx, "receive_tsdb_write", func(ctx context.Context) { - - err = h.writer.Write(wreqs[endpoint]) - }) - // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. 
- // To avoid breaking the counting logic, we need to flatten the error. - if errs, ok := err.(terrors.MultiError); ok { - if countCause(errs, isConflict) > 0 { - err = errors.Wrap(conflictErr, errs.Error()) - } else { - err = errors.New(errs.Error()) - } + + tracing.DoInSpan(ctx, "receive_tsdb_write", func(ctx context.Context) { + err = h.writer.Write(tenant, wreqs[endpoint]) + }) + + // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. + // To avoid breaking the counting logic, we need to flatten the error. + if errs, ok := err.(terrors.MultiError); ok { + if countCause(errs, isConflict) > 0 { + err = errors.Wrap(conflictErr, errs.Error()) + } else if countCause(errs, isNotReady) > 0 { + err = tsdb.ErrNotReady + } else { + err = errors.New(errs.Error()) } } - h.mtx.RUnlock() if err != nil { level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint) } @@ -469,6 +466,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri err := h.parallelizeRequests(ctx, tenant, replicas, wreqs) if errs, ok := err.(terrors.MultiError); ok { + if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 { + return tsdb.ErrNotReady + } if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 { return errors.Wrap(conflictErr, "did not meet replication threshold") } @@ -486,6 +486,8 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st switch err { case nil: return &storepb.WriteResponse{}, nil + case tsdb.ErrNotReady: + return nil, status.Error(codes.Unavailable, err.Error()) case conflictErr: return nil, status.Error(codes.AlreadyExists, err.Error()) case errBadReplica: @@ -526,6 +528,13 @@ func isConflict(err error) bool { status.Code(err) == codes.AlreadyExists } +// isNotReady returns whether or not the given error represents a not ready error. 
+func isNotReady(err error) bool { + return err == tsdb.ErrNotReady || + err.Error() == strconv.Itoa(http.StatusServiceUnavailable) || + status.Code(err) == codes.Unavailable +} + func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { return &peerGroup{ dialOpts: dialOpts, diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index f4f39d611b..89d35c572b 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -163,7 +163,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - Writer: NewWriter(log.NewNopLogger(), appendables[i]), + Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) h.peers = peers @@ -479,12 +479,12 @@ func TestReceive(t *testing.T) { // on which node is erroring and which node is receiving. for i, handler := range handlers { // Test that the correct status is returned. - status, err := makeRequest(handler, tenant, tc.wreq) + rec, err := makeRequest(handler, tenant, tc.wreq) if err != nil { t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) } - if status != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d", i, tc.status, status) + if rec.Code != tc.status { + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) } } // Test that each time series is stored @@ -547,14 +547,14 @@ func cycleErrors(errs []error) func() error { } // makeRequest is a helper to make a correct request against a remote write endpoint given a request.
-func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, error) { +func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptest.ResponseRecorder, error) { buf, err := proto.Marshal(wreq) if err != nil { - return 0, errors.Wrap(err, "marshal request") + return nil, errors.Wrap(err, "marshal request") } req, err := http.NewRequest("POST", h.options.Endpoint, bytes.NewBuffer(snappy.Encode(nil, buf))) if err != nil { - return 0, errors.Wrap(err, "create request") + return nil, errors.Wrap(err, "create request") } req.Header.Add(h.options.TenantHeader, tenant) @@ -562,7 +562,7 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, err h.receiveHTTP(rec, req) rec.Flush() - return rec.Code, nil + return rec, nil } func randomAddr() string { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go new file mode 100644 index 0000000000..aaa5817022 --- /dev/null +++ b/pkg/receive/multitsdb.go @@ -0,0 +1,214 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package receive + +import ( + "io/ioutil" + "os" + "path" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage/tsdb" + terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store" + "golang.org/x/sync/errgroup" +) + +type MultiTSDB struct { + dataDir string + logger log.Logger + reg prometheus.Registerer + tsdbCfg *tsdb.Options + tenantLabelName string + labels labels.Labels + + mtx *sync.RWMutex + dbs map[string]*FlushableStorage + appendables map[string]*tsdb.ReadyStorage + stores map[string]*store.TSDBStore +} + +func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string) *MultiTSDB { + if l == nil { + l = log.NewNopLogger() + } + + return &MultiTSDB{ + dataDir: dataDir, + logger: l, + reg: reg, + tsdbCfg: tsdbCfg, + mtx: &sync.RWMutex{}, + dbs: map[string]*FlushableStorage{}, + stores: map[string]*store.TSDBStore{}, + appendables: map[string]*tsdb.ReadyStorage{}, + labels: labels, + tenantLabelName: tenantLabelName, + } +} + +func (t *MultiTSDB) Open() error { + if err := os.MkdirAll(t.dataDir, 0777); err != nil { + return err + } + + return t.openTSDBs() +} + +func (t *MultiTSDB) Close() error { + t.mtx.Lock() + defer t.mtx.Unlock() + + errmtx := &sync.Mutex{} + merr := terrors.MultiError{} + wg := &sync.WaitGroup{} + for _, tsdb := range t.dbs { + tsdb := tsdb + wg.Add(1) + go func() { + if err := tsdb.Close(); err != nil { + errmtx.Lock() + merr.Add(err) + errmtx.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + return merr.Err() +} + +func (t *MultiTSDB) Flush() error { + t.mtx.Lock() + defer t.mtx.Unlock() + + errmtx := &sync.Mutex{} + merr := terrors.MultiError{} + wg := &sync.WaitGroup{} + for _, tsdb := range 
t.dbs { + tsdb := tsdb + wg.Add(1) + go func() { + if err := tsdb.Flush(); err != nil { + errmtx.Lock() + merr.Add(err) + errmtx.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + return merr.Err() +} + +func (t *MultiTSDB) openTSDBs() error { + files, err := ioutil.ReadDir(t.dataDir) + if err != nil { + return err + } + + var g errgroup.Group + for _, f := range files { + // See: https://golang.org/doc/faq#closures_and_goroutines. + f := f + if !f.IsDir() { + continue + } + + g.Go(func() error { + tenantId := f.Name() + _, err := t.getOrLoadTenant(tenantId) + return err + }) + } + + return g.Wait() +} + +func (t *MultiTSDB) TSDBStores() []*store.TSDBStore { + t.mtx.RLock() + res := make([]*store.TSDBStore, 0, len(t.stores)) + for _, s := range t.stores { + res = append(res, s) + } + defer t.mtx.RUnlock() + return res +} + +func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error) { + // Fast path, as creating tenants is a very rare operation. + t.mtx.RLock() + db, exist := t.appendables[tenantID] + t.mtx.RUnlock() + if exist { + return db, nil + } + + // Slow path needs to lock fully and attempt to read again to prevent race + // conditions, where since the fast path was tried, there may have actually + // been the same tenant inserted in the map. 
+ t.mtx.Lock() + db, exist = t.appendables[tenantID] + if exist { + t.mtx.Unlock() + return db, nil + } + + rs := &tsdb.ReadyStorage{} + t.appendables[tenantID] = rs + t.mtx.Unlock() + + go func() { + s := NewFlushableStorage( + path.Join(t.dataDir, tenantID), + log.With(t.logger, "tenant", tenantID), + prometheus.WrapRegistererWith(prometheus.Labels{ + "tenant": tenantID, + }, t.reg), + t.tsdbCfg, + ) + + if err := s.Open(); err != nil { + level.Error(t.logger).Log("msg", "failed to open tsdb", "err", err) + t.mtx.Lock() + delete(t.appendables, tenantID) + delete(t.stores, tenantID) + t.mtx.Unlock() + if err := s.Close(); err != nil { + level.Error(t.logger).Log("msg", "failed to close tsdb", "err", err) + } + return + } + + tstore := store.NewTSDBStore( + log.With(t.logger, "component", "thanos-tsdb-store", "tenant", tenantID), + prometheus.WrapRegistererWith(prometheus.Labels{ + "tenant": tenantID, + }, t.reg), + s.Get(), + component.Receive, + append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}), + ) + + t.mtx.Lock() + rs.Set(s.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) + t.stores[tenantID] = tstore + t.dbs[tenantID] = s + t.mtx.Unlock() + }() + + return rs, nil +} + +func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) { + return t.getOrLoadTenant(tenantID) +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 920e8c1c06..c2781336d8 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -21,29 +22,44 @@ type Appendable interface { Appender() (storage.Appender, error) } +type TenantStorage interface { + TenantAppendable(string) (Appendable, error) +} + type Writer struct 
{ - logger log.Logger - append Appendable + logger log.Logger + multitsdb TenantStorage } -func NewWriter(logger log.Logger, app Appendable) *Writer { +func NewWriter(logger log.Logger, multitsdb TenantStorage) *Writer { return &Writer{ - logger: logger, - append: app, + logger: logger, + multitsdb: multitsdb, } } -func (r *Writer) Write(wreq *prompb.WriteRequest) error { +func (r *Writer) Write(tenantID string, wreq *prompb.WriteRequest) error { var ( numOutOfOrder = 0 numDuplicates = 0 numOutOfBounds = 0 ) - app, err := r.append.Appender() + s, err := r.multitsdb.TenantAppendable(tenantID) + if err != nil { + return errors.Wrap(err, "get tenant appendable") + } + + app, err := s.Appender() + if err == tsdb.ErrNotReady { + return err + } if err != nil { return errors.Wrap(err, "get appender") } + if app == nil { + return errors.New("tsdb not ready yet to be appended to") + } var errs terrors.MultiError for _, t := range wreq.Timeseries { @@ -94,6 +110,18 @@ func (r *Writer) Write(wreq *prompb.WriteRequest) error { return errs.Err() } +type fakeTenantAppendable struct { + f *fakeAppendable +} + +func newFakeTenantAppendable(f *fakeAppendable) *fakeTenantAppendable { + return &fakeTenantAppendable{f: f} +} + +func (t *fakeTenantAppendable) TenantAppendable(tenantID string) (Appendable, error) { + return t.f, nil +} + type fakeAppendable struct { appender storage.Appender appenderErr func() error diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index c8ba582728..ee07d932d4 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -103,7 +103,7 @@ func (s *Server) ListenAndServe() error { } s.listener = l - level.Info(s.logger).Log("msg", "listening for StoreAPI gRPC", "address", s.opts.listen) + level.Info(s.logger).Log("msg", "listening for serving gRPC", "address", s.opts.listen) return errors.Wrap(s.srv.Serve(s.listener), "serve gRPC") } diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go new file mode 100644 index 
0000000000..1f7db5065b --- /dev/null +++ b/pkg/store/multitsdb.go @@ -0,0 +1,149 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type MultiTSDBStore struct { + logger log.Logger + component component.SourceStoreAPI + tsdbStores func() []*TSDBStore +} + +// NewMultiTSDBStore creates a new TSDBStore. +func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() []*TSDBStore) *MultiTSDBStore { + if logger == nil { + logger = log.NewNopLogger() + } + return &MultiTSDBStore{ + logger: logger, + component: component, + tsdbStores: tsdbStores, + } +} + +// Info returns store information about the Prometheus instance. +func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storepb.InfoResponse, error) { + stores := s.tsdbStores() + + resp := &storepb.InfoResponse{ + StoreType: s.component.ToProto(), + } + if len(stores) == 0 { + return resp, nil + } + + infos := make([]*storepb.InfoResponse, 0, len(stores)) + for _, store := range stores { + info, err := store.Info(ctx, req) + if err != nil { + return nil, err + } + infos = append(infos, info) + } + + resp.MinTime = infos[0].MinTime + resp.MaxTime = infos[0].MaxTime + + for i := 1; i < len(infos); i++ { + if resp.MinTime > infos[i].MinTime { + resp.MinTime = infos[i].MinTime + } + if resp.MaxTime < infos[i].MaxTime { + resp.MaxTime = infos[i].MaxTime + } + } + + // We can rely on every underlying TSDB to only have one labelset, so this + // will always allocate the correct length immediately. + resp.LabelSets = make([]storepb.LabelSet, 0, len(infos)) + for _, info := range infos { + resp.LabelSets = append(resp.LabelSets, info.LabelSets...) 
+ } + + return resp, nil +} + +// Series returns all series for a requested time range and label matcher. The returned data may +// exceed the requested time bounds. +func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + stores := s.tsdbStores() + for _, store := range stores { + err := store.Series(r, srv) + if err != nil { + return err + } + } + return nil +} + +// LabelNames returns all known label names. +func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + names := map[string]struct{}{} + warnings := map[string]struct{}{} + + stores := s.tsdbStores() + for _, store := range stores { + r, err := store.LabelNames(ctx, req) + if err != nil { + return nil, err + } + + for _, l := range r.Names { + names[l] = struct{}{} + } + + for _, l := range r.Warnings { + warnings[l] = struct{}{} + } + } + + return &storepb.LabelNamesResponse{ + Names: keys(names), + Warnings: keys(warnings), + }, nil +} + +func keys(m map[string]struct{}) []string { + res := make([]string, 0, len(m)) + for k := range m { + res = append(res, k) + } + + return res +} + +// LabelValues returns all known label values for a given label name. 
+func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + values := map[string]struct{}{} + warnings := map[string]struct{}{} + + stores := s.tsdbStores() + for _, store := range stores { + r, err := store.LabelValues(ctx, req) + if err != nil { + return nil, err + } + + for _, l := range r.Values { + values[l] = struct{}{} + } + + for _, l := range r.Warnings { + warnings[l] = struct{}{} + } + } + + return &storepb.LabelValuesResponse{ + Values: keys(values), + Warnings: keys(warnings), + }, nil +} diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index 898455bc2c..4c78b123b9 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -84,6 +84,9 @@ scrape_configs: static_configs: - targets: - "localhost:909${i}" + - "localhost:5909${i}" + - "localhost:5909${i}" + - "localhost:5909${i}" - job_name: thanos-sidecar scrape_interval: 5s static_configs: @@ -99,6 +102,8 @@ scrape_configs: static_configs: - targets: - "localhost:10909" + - "localhost:11909" + - "localhost:12909" - job_name: thanos-query scrape_interval: 5s static_configs: @@ -163,37 +168,55 @@ fi sleep 0.5 if [ -n "${REMOTE_WRITE_ENABLED}" ]; then + +cat <<-EOF > ./data/hashring.json +[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}] +EOF + +for i in $(seq 0 1 2); do ${THANOS_EXECUTABLE} receive \ - --debug.name receive \ + --debug.name receive${i} \ --log.level debug \ - --tsdb.path "./data/remote-write-receive-data" \ - --grpc-address 0.0.0.0:10907 \ + --tsdb.path "./data/remote-write-receive-${i}-data" \ + --grpc-address 0.0.0.0:1${i}907 \ --grpc-grace-period 1s \ - --http-address 0.0.0.0:10909 \ + --http-address 0.0.0.0:1${i}909 \ --http-grace-period 1s \ - --label "receive=\"true\"" \ + --receive.replication-factor 3 \ + --label "receive_replica=\"${i}\"" \ + --receive.local-endpoint 127.0.0.1:1${i}907 \ + --receive.hashrings-file ./data/hashring.json \ ${OBJSTORECFG} \ - 
--remote-write.address 0.0.0.0:10908 & + --remote-write.address 0.0.0.0:1${i}908 & + + STORES="${STORES} --store 127.0.0.1:1${i}907" +done - mkdir -p "data/local-prometheus-data/" - cat <<EOF >data/local-prometheus-data/prometheus.yml +for i in $(seq 0 1 2); do + mkdir -p "data/local-prometheus-${i}-data/" + cat <<EOF >data/local-prometheus-${i}-data/prometheus.yml +global: + external_labels: + prometheus: prom${i} + replica: 1 # When the Thanos remote-write-receive component is started, # this is an example configuration of a Prometheus server that # would scrape a local node-exporter and replicate its data to # the remote write endpoint. scrape_configs: - - job_name: node + - job_name: test scrape_interval: 1s static_configs: - - targets: ['localhost:9100'] + - targets: + - fake remote_write: -- url: http://localhost:10908/api/v1/receive +- url: http://localhost:1${i}908/api/v1/receive EOF ${PROMETHEUS_EXECUTABLE} \ - --config.file data/local-prometheus-data/prometheus.yml \ - --storage.tsdb.path "data/local-prometheus-data/" & - - STORES="${STORES} --store 127.0.0.1:10907" + --web.listen-address ":5909${i}" \ + --config.file data/local-prometheus-${i}-data/prometheus.yml \ + --storage.tsdb.path "data/local-prometheus-${i}-data/" & +done fi sleep 0.5 @@ -218,6 +241,7 @@ for i in $(seq 0 1); do --http-grace-period 1s \ --query.replica-label prometheus \ --tracing.config="${QUERIER_JAEGER_CONFIG}" \ + --query.replica-label receive_replica \ ${STORES} & done diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 553ae4d602..b61228e0c9 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -112,6 +112,7 @@ func TestQuery(t *testing.T) { "prometheus": "prom-both-remote-write-and-sidecar", "receive": "1", "replica": "1234", + "tenant_id": "default-tenant", }, { "job": "myself", @@ -142,6 +143,7 @@ func TestQuery(t *testing.T) { "job": "myself", "prometheus": "prom-both-remote-write-and-sidecar", "receive": "1", + "tenant_id": "default-tenant", }, { "job":
"myself", diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 9c741c9115..b7296efaf8 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -81,18 +81,21 @@ func TestReceive(t *testing.T) { "prometheus": "prom1", "receive": "2", "replica": "0", + "tenant_id": "default-tenant", }, { "job": "myself", "prometheus": "prom2", "receive": "1", "replica": "0", + "tenant_id": "default-tenant", }, { "job": "myself", "prometheus": "prom3", - "receive": "3", + "receive": "2", "replica": "0", + "tenant_id": "default-tenant", }, }) }) @@ -153,18 +156,21 @@ func TestReceive(t *testing.T) { "prometheus": "prom1", "receive": "1", "replica": "0", + "tenant_id": "default-tenant", }, { "job": "myself", "prometheus": "prom1", "receive": "2", "replica": "0", + "tenant_id": "default-tenant", }, { "job": "myself", "prometheus": "prom1", "receive": "3", "replica": "0", + "tenant_id": "default-tenant", }, }) }) @@ -222,12 +228,14 @@ func TestReceive(t *testing.T) { "prometheus": "prom1", "receive": "1", "replica": "0", + "tenant_id": "default-tenant", }, { "job": "myself", "prometheus": "prom1", "receive": "2", "replica": "0", + "tenant_id": "default-tenant", }, }) }) From d797dd07396351fc5154d2b1c5557ee01d7b5556 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 30 Mar 2020 09:46:47 +0200 Subject: [PATCH 2/9] pkg/store: Merge SeriesSets of multiple TSDB stores This is required as the Series gRPC method of the StoreAPI requires the Series returned to be sorted. 
Signed-off-by: Frederic Branczyk --- pkg/receive/multitsdb.go | 8 +-- pkg/store/multitsdb.go | 147 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 145 insertions(+), 10 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index aaa5817022..13f7bfe1e6 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -134,11 +134,11 @@ func (t *MultiTSDB) openTSDBs() error { return g.Wait() } -func (t *MultiTSDB) TSDBStores() []*store.TSDBStore { +func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore { t.mtx.RLock() - res := make([]*store.TSDBStore, 0, len(t.stores)) - for _, s := range t.stores { - res = append(res, s) + res := make(map[string]*store.TSDBStore, len(t.stores)) + for k, v := range t.stores { + res[k] = v } defer t.mtx.RUnlock() return res diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 1f7db5065b..0f6b1001c6 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -5,21 +5,30 @@ package store import ( "context" + "sync" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type MultiTSDBStore struct { logger log.Logger component component.SourceStoreAPI - tsdbStores func() []*TSDBStore + tsdbStores func() map[string]*TSDBStore } // NewMultiTSDBStore creates a new TSDBStore. 
-func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() []*TSDBStore) *MultiTSDBStore { +func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore { if logger == nil { logger = log.NewNopLogger() } @@ -72,16 +81,142 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s return resp, nil } +type seriesSetServer struct { + grpc.ServerStream + + ctx context.Context + + warnCh warnSender + recv chan *storepb.Series + cur *storepb.Series + + errMtx *sync.Mutex + err error +} + +func newSeriesSetServer( + ctx context.Context, + warnCh warnSender, +) *seriesSetServer { + return &seriesSetServer{ + ctx: ctx, + warnCh: warnCh, + recv: make(chan *storepb.Series), + errMtx: &sync.Mutex{}, + } +} + +func (s *seriesSetServer) Context() context.Context { + return s.ctx +} + +func (s *seriesSetServer) Run(store *TSDBStore, r *storepb.SeriesRequest) { + err := store.Series(r, s) + if err != nil { + if r.PartialResponseDisabled { + s.errMtx.Lock() + s.err = err + s.errMtx.Unlock() + } else { + s.warnCh.send(storepb.NewWarnSeriesResponse(err)) + } + } + + close(s.recv) +} + +func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error { + series := r.GetSeries() + chunks := make([]storepb.AggrChunk, len(series.Chunks)) + copy(chunks, series.Chunks) + s.recv <- &storepb.Series{ + Labels: series.Labels, + Chunks: chunks, + } + return nil +} + +func (s *seriesSetServer) Next() (ok bool) { + s.cur, ok = <-s.recv + return ok +} + +func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) { + if s.cur == nil { + return nil, nil + } + return s.cur.Labels, s.cur.Chunks +} + +func (s *seriesSetServer) Err() error { + s.errMtx.Lock() + defer s.errMtx.Unlock() + return s.err +} + // Series returns all series for a requested time range and label matcher. 
The returned data may // exceed the requested time bounds. func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { stores := s.tsdbStores() - for _, store := range stores { - err := store.Series(r, srv) - if err != nil { - return err + if len(stores) == 0 { + return nil + } + + var ( + g, gctx = errgroup.WithContext(srv.Context()) + + // Allow to buffer max 10 series response. + // Each might be quite large (multi chunk long series given by sidecar). + respSender, respRecv, closeFn = newRespCh(gctx, 10) + ) + + g.Go(func() error { + var ( + seriesSet []storepb.SeriesSet + wg = &sync.WaitGroup{} + ) + + defer func() { + wg.Wait() + closeFn() + }() + + for tenant, store := range stores { + store := store + seriesCtx, closeSeries := context.WithCancel(gctx) + seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{ + "tenant": tenant, + }) + defer closeSeries() + ss := newSeriesSetServer(seriesCtx, respSender) + wg.Add(1) + go func() { + defer wg.Done() + ss.Run(store, r) + }() + + seriesSet = append(seriesSet, ss) + } + + mergedSet := storepb.MergeSeriesSets(seriesSet...) 
+ for mergedSet.Next() { + var series storepb.Series + series.Labels, series.Chunks = mergedSet.At() + respSender.send(storepb.NewSeriesResponse(&series)) + } + return mergedSet.Err() + }) + + for resp := range respRecv { + if err := srv.Send(resp); err != nil { + return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) } } + + if err := g.Wait(); err != nil { + level.Error(s.logger).Log("err", err) + return err + } return nil } From 0ca745a237602dc39ae4f9581be10c14555a9550 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 3 Apr 2020 17:24:22 +0200 Subject: [PATCH 3/9] pkg/receive: Add multitsdb shipper support Signed-off-by: Frederic Branczyk --- cmd/thanos/receive.go | 86 ++++++++++++++++++++++------------------ pkg/receive/multitsdb.go | 83 +++++++++++++++++++++++++++++++------- scripts/quickstart.sh | 4 +- 3 files changed, 119 insertions(+), 54 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e0e3780e8c..7f273dbdce 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -20,18 +20,17 @@ import ( "github.com/prometheus/prometheus/storage/tsdb" kingpin "gopkg.in/alecthomas/kingpin.v2" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" - "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/tls" ) @@ -203,7 +202,43 @@ func runReceive( return err } - dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, tenantLabelName) + 
confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + upload := true + if len(confContentYaml) == 0 { + level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled") + upload = false + } + + if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { + if !ignoreBlockSize { + return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ + "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) + } + level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") + } + + var bkt objstore.Bucket + if upload { + // The background shipper continuously scans the data directory and uploads + // new blocks to Google Cloud Storage or an S3-compatible storage service. + bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) + if err != nil { + return err + } + } + + dbs := receive.NewMultiTSDB( + dataDir, + logger, + reg, + tsdbOpts, + lset, + tenantLabelName, + bkt, + ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ Writer: writer, @@ -227,24 +262,6 @@ func runReceive( prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) - confContentYaml, err := objStoreConfig.Content() - if err != nil { - return err - } - upload := true - if len(confContentYaml) == 0 { - level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled") - upload = false - } - - if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { - if !ignoreBlockSize { - return errors.Errorf("found that TSDB Max time is %s and Min time is %s. 
"+ - "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) - } - level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") - } - // Start all components while we wait for TSDB to open but only load // initial config and mark ourselves as ready after it completed. @@ -438,18 +455,9 @@ func runReceive( } if upload { - // The background shipper continuously scans the data directory and uploads - // new blocks to Google Cloud Storage or an S3-compatible storage service. - bkt, err := client.NewBucket(logger, confContentYaml, reg, comp.String()) - if err != nil { - return err - } - - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource) - - // Before starting, ensure any old blocks are uploaded. - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) + level.Debug(logger).Log("msg", "upload enabled") + if err := dbs.Upload(context.Background()); err != nil { + level.Warn(logger).Log("err", err) } { @@ -457,8 +465,8 @@ func runReceive( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if uploaded, err := s.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err, "uploaded", uploaded) + if err := dbs.Upload(ctx); err != nil { + level.Warn(logger).Log("err", err) } return nil @@ -479,8 +487,8 @@ func runReceive( // Before quitting, ensure all blocks are uploaded. 
defer func() { <-uploadC - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) + if err := dbs.Upload(context.Background()); err != nil { + level.Warn(logger).Log("err", err) } }() defer close(uploadDone) @@ -494,8 +502,8 @@ func runReceive( case <-ctx.Done(): return nil case <-uploadC: - if uploaded, err := s.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) + if err := dbs.Upload(ctx); err != nil { + level.Warn(logger).Log("err", err) } uploadDone <- struct{}{} } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 13f7bfe1e6..bcc6e9501d 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -4,6 +4,8 @@ package receive import ( + "context" + "fmt" "io/ioutil" "os" "path" @@ -16,7 +18,10 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "golang.org/x/sync/errgroup" ) @@ -28,14 +33,17 @@ type MultiTSDB struct { tsdbCfg *tsdb.Options tenantLabelName string labels labels.Labels + bucket objstore.Bucket + upload bool mtx *sync.RWMutex dbs map[string]*FlushableStorage appendables map[string]*tsdb.ReadyStorage stores map[string]*store.TSDBStore + shippers map[string]*shipper.Shipper } -func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string) *MultiTSDB { +func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string, bucket objstore.Bucket) *MultiTSDB { if l == nil { l = log.NewNopLogger() } @@ -49,8 +57,11 @@ func 
NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbC dbs: map[string]*FlushableStorage{}, stores: map[string]*store.TSDBStore{}, appendables: map[string]*tsdb.ReadyStorage{}, + shippers: map[string]*shipper.Shipper{}, labels: labels, tenantLabelName: tenantLabelName, + bucket: bucket, + upload: bucket != nil, } } @@ -110,6 +121,35 @@ func (t *MultiTSDB) Flush() error { return merr.Err() } +func (t *MultiTSDB) Upload(ctx context.Context) error { + if !t.upload { + return nil + } + + t.mtx.Lock() + defer t.mtx.Unlock() + + errmtx := &sync.Mutex{} + merr := terrors.MultiError{} + wg := &sync.WaitGroup{} + for tenant, s := range t.shippers { + level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenant) + s := s + wg.Add(1) + go func() { + if uploaded, err := s.Sync(ctx); err != nil { + errmtx.Lock() + merr.Add(fmt.Errorf("failed to upload %d: %w", uploaded, err)) + errmtx.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + return merr.Err() +} + func (t *MultiTSDB) openTSDBs() error { files, err := ioutil.ReadDir(t.dataDir) if err != nil { @@ -118,7 +158,6 @@ func (t *MultiTSDB) openTSDBs() error { var g errgroup.Group for _, f := range files { - // See: https://golang.org/doc/faq#closures_and_goroutines. 
f := f if !f.IsDir() { continue @@ -168,41 +207,57 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error) t.mtx.Unlock() go func() { + reg := prometheus.WrapRegistererWith(prometheus.Labels{ + "tenant": tenantID, + }, t.reg) + logger := log.With(t.logger, "tenant", tenantID) + lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}) + dataDir := path.Join(t.dataDir, tenantID) + + var ship *shipper.Shipper + if t.upload { + ship = shipper.New( + logger, + reg, + dataDir, + t.bucket, + func() labels.Labels { return lbls }, + metadata.ReceiveSource, + ) + } + s := NewFlushableStorage( - path.Join(t.dataDir, tenantID), - log.With(t.logger, "tenant", tenantID), - prometheus.WrapRegistererWith(prometheus.Labels{ - "tenant": tenantID, - }, t.reg), + dataDir, + logger, + reg, t.tsdbCfg, ) if err := s.Open(); err != nil { - level.Error(t.logger).Log("msg", "failed to open tsdb", "err", err) + level.Error(logger).Log("msg", "failed to open tsdb", "err", err) t.mtx.Lock() delete(t.appendables, tenantID) delete(t.stores, tenantID) t.mtx.Unlock() if err := s.Close(); err != nil { - level.Error(t.logger).Log("msg", "failed to close tsdb", "err", err) + level.Error(logger).Log("msg", "failed to close tsdb", "err", err) } return } tstore := store.NewTSDBStore( - log.With(t.logger, "component", "thanos-tsdb-store", "tenant", tenantID), - prometheus.WrapRegistererWith(prometheus.Labels{ - "tenant": tenantID, - }, t.reg), + logger, + reg, s.Get(), component.Receive, - append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}), + lbls, ) t.mtx.Lock() rs.Set(s.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) t.stores[tenantID] = tstore t.dbs[tenantID] = s + t.shippers[tenantID] = ship t.mtx.Unlock() }() diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index 4c78b123b9..813b4623b5 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -182,7 +182,9 @@ for i in $(seq 0 1 2); 
do --grpc-grace-period 1s \ --http-address 0.0.0.0:1${i}909 \ --http-grace-period 1s \ - --receive.replication-factor 3 \ + --receive.replication-factor 1 \ + --tsdb.min-block-duration 5m \ + --tsdb.max-block-duration 5m \ --label "receive_replica=\"${i}\"" \ --receive.local-endpoint 127.0.0.1:1${i}907 \ --receive.hashrings-file ./data/hashring.json \ From 0e53d8d2c583a48a56d98619962055a2c407ce97 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 14 Apr 2020 16:37:47 +0200 Subject: [PATCH 4/9] Address comments Signed-off-by: Frederic Branczyk --- Makefile | 4 +- cmd/thanos/receive.go | 53 +++++---- pkg/receive/handler.go | 1 - pkg/receive/multitsdb.go | 224 ++++++++++++++++++++++++--------------- pkg/receive/writer.go | 11 +- pkg/store/multitsdb.go | 16 +-- 6 files changed, 176 insertions(+), 133 deletions(-) diff --git a/Makefile b/Makefile index df832d9be6..0fd01fafe2 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,8 @@ export GO111MODULE GOPROXY ?= https://proxy.golang.org export GOPROXY +GOTEST_OPTS ?= -failfast -timeout 10m -v + # Tools. EMBEDMD ?= $(GOBIN)/embedmd-$(EMBEDMD_VERSION) # v2.0.0 @@ -252,7 +254,7 @@ test-e2e: docker @echo ">> running /test/e2e tests." # NOTE(bwplotka): # * If you see errors on CI (timeouts), but not locally, try to add -parallel 1 to limit to single CPU to reproduce small 1CPU machine. - @go test -failfast -timeout 10m $(RUN_ARGS) -v ./test/e2e/... + @go test $(GOTEST_OPTS) ./test/e2e/... .PHONY: install-deps install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests. 
diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 7f273dbdce..79a3ddeccb 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -202,32 +202,28 @@ func runReceive( return err } + var bkt objstore.Bucket confContentYaml, err := objStoreConfig.Content() if err != nil { return err } - upload := true - if len(confContentYaml) == 0 { - level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled") - upload = false - } - - if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { - if !ignoreBlockSize { - return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ - "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) - } - level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") - } - - var bkt objstore.Bucket + upload := len(confContentYaml) > 0 if upload { + if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { + if !ignoreBlockSize { + return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ + "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) + } + level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") + } // The background shipper continuously scans the data directory and uploads - // new blocks to Google Cloud Storage or an S3-compatible storage service. + // new blocks to object storage service. 
bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) if err != nil { return err } + } else { + level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") } dbs := receive.NewMultiTSDB( @@ -310,9 +306,8 @@ func runReceive( uploadC <- struct{}{} <-uploadDone } - level.Info(logger).Log("msg", "tsdb started") statusProber.Ready() - level.Info(logger).Log("msg", "server is ready to receive web requests") + level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests") dbReady <- struct{}{} } } @@ -411,9 +406,13 @@ func runReceive( s.Shutdown(errors.New("reload hashrings")) } - multiStore := store.NewMultiTSDBStore(logger, reg, comp, dbs.TSDBStores) rw := store.ReadWriteTSDBStore{ - StoreServer: multiStore, + StoreServer: store.NewMultiTSDBStore( + logger, + reg, + comp, + dbs.TSDBStores, + ), WriteableStoreServer: webHandler, } @@ -456,8 +455,8 @@ func runReceive( if upload { level.Debug(logger).Log("msg", "upload enabled") - if err := dbs.Upload(context.Background()); err != nil { - level.Warn(logger).Log("err", err) + if err := dbs.Sync(context.Background()); err != nil { + level.Warn(logger).Log("msg", "initial upload failed", "err", err) } { @@ -465,8 +464,8 @@ func runReceive( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if err := dbs.Upload(ctx); err != nil { - level.Warn(logger).Log("err", err) + if err := dbs.Sync(ctx); err != nil { + level.Warn(logger).Log("msg", "interval upload failed", "err", err) } return nil @@ -487,8 +486,8 @@ func runReceive( // Before quitting, ensure all blocks are uploaded. 
defer func() { <-uploadC - if err := dbs.Upload(context.Background()); err != nil { - level.Warn(logger).Log("err", err) + if err := dbs.Sync(context.Background()); err != nil { + level.Warn(logger).Log("msg", "on demand upload failed", "err", err) } }() defer close(uploadDone) @@ -502,7 +501,7 @@ func runReceive( case <-ctx.Done(): return nil case <-uploadC: - if err := dbs.Upload(ctx); err != nil { + if err := dbs.Sync(ctx); err != nil { level.Warn(logger).Log("err", err) } uploadDone <- struct{}{} } diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 45de62ffa0..5b1aa2daf4 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -531,7 +531,6 @@ func isConflict(err error) bool { // isNotReady returns whether or not the given error represents a not ready error. func isNotReady(err error) bool { return err == tsdb.ErrNotReady || - err.Error() == strconv.Itoa(http.StatusConflict) || status.Code(err) == codes.Unavailable } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index bcc6e9501d..8490a0d2e9 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -5,7 +5,6 @@ package receive import ( "context" - "fmt" "io/ioutil" "os" "path" @@ -14,6 +13,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage/tsdb" @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "golang.org/x/sync/errgroup" ) @@ -34,16 +35,70 @@ type MultiTSDB struct { tenantLabelName string labels labels.Labels bucket objstore.Bucket - upload bool - mtx *sync.RWMutex - dbs map[string]*FlushableStorage - appendables
map[string]*tsdb.ReadyStorage - stores map[string]*store.TSDBStore - shippers map[string]*shipper.Shipper + mtx *sync.RWMutex + tenants map[string]*tenant } -func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string, bucket objstore.Bucket) *MultiTSDB { +type tenant struct { + tsdbCfg *tsdb.Options + + readyS *tsdb.ReadyStorage + fs *FlushableStorage + s *store.TSDBStore + ship *shipper.Shipper + + mtx *sync.RWMutex +} + +func newTenant(tsdbCfg *tsdb.Options) *tenant { + return &tenant{ + tsdbCfg: tsdbCfg, + readyS: &tsdb.ReadyStorage{}, + mtx: &sync.RWMutex{}, + } +} + +func (t *tenant) readyStorage() *tsdb.ReadyStorage { + return t.readyS +} + +func (t *tenant) store() *store.TSDBStore { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.s +} + +func (t *tenant) shipper() *shipper.Shipper { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.ship +} + +func (t *tenant) flushableStorage() *FlushableStorage { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.fs +} + +func (t *tenant) set(tstore *store.TSDBStore, fs *FlushableStorage, ship *shipper.Shipper) { + t.readyS.Set(fs.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) + t.mtx.Lock() + t.fs = fs + t.s = tstore + t.ship = ship + t.mtx.Unlock() +} + +func NewMultiTSDB( + dataDir string, + l log.Logger, + reg prometheus.Registerer, + tsdbCfg *tsdb.Options, + labels labels.Labels, + tenantLabelName string, + bucket objstore.Bucket, +) *MultiTSDB { if l == nil { l = log.NewNopLogger() } @@ -54,14 +109,10 @@ func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbC reg: reg, tsdbCfg: tsdbCfg, mtx: &sync.RWMutex{}, - dbs: map[string]*FlushableStorage{}, - stores: map[string]*store.TSDBStore{}, - appendables: map[string]*tsdb.ReadyStorage{}, - shippers: map[string]*shipper.Shipper{}, + tenants: map[string]*tenant{}, labels: labels, tenantLabelName: tenantLabelName, bucket: bucket, - 
upload: bucket != nil, } } @@ -70,7 +121,25 @@ func (t *MultiTSDB) Open() error { return err } - return t.openTSDBs() + files, err := ioutil.ReadDir(t.dataDir) + if err != nil { + return err + } + + var g errgroup.Group + for _, f := range files { + f := f + if !f.IsDir() { + continue + } + + g.Go(func() error { + _, err := t.getOrLoadTenant(f.Name()) + return err + }) + } + + return g.Wait() } func (t *MultiTSDB) Close() error { @@ -80,11 +149,15 @@ func (t *MultiTSDB) Close() error { errmtx := &sync.Mutex{} merr := terrors.MultiError{} wg := &sync.WaitGroup{} - for _, tsdb := range t.dbs { - tsdb := tsdb + for _, tenant := range t.tenants { + s := tenant.flushableStorage() + if s == nil { + continue + } + wg.Add(1) go func() { - if err := tsdb.Close(); err != nil { + if err := s.Close(); err != nil { errmtx.Lock() merr.Add(err) errmtx.Unlock() @@ -104,11 +177,15 @@ func (t *MultiTSDB) Flush() error { errmtx := &sync.Mutex{} merr := terrors.MultiError{} wg := &sync.WaitGroup{} - for _, tsdb := range t.dbs { - tsdb := tsdb + for _, tenant := range t.tenants { + s := tenant.flushableStorage() + if s == nil { + continue + } + wg.Add(1) go func() { - if err := tsdb.Flush(); err != nil { + if err := s.Flush(); err != nil { errmtx.Lock() merr.Add(err) errmtx.Unlock() @@ -121,25 +198,25 @@ func (t *MultiTSDB) Flush() error { return merr.Err() } -func (t *MultiTSDB) Upload(ctx context.Context) error { - if !t.upload { - return nil - } - +func (t *MultiTSDB) Sync(ctx context.Context) error { t.mtx.Lock() defer t.mtx.Unlock() errmtx := &sync.Mutex{} merr := terrors.MultiError{} wg := &sync.WaitGroup{} - for tenant, s := range t.shippers { - level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenant) - s := s + for tenantID, tenant := range t.tenants { + level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenantID) + s := tenant.shipper() + if s == nil { + continue + } + wg.Add(1) go func() { if uploaded, err := s.Sync(ctx); err != 
nil { errmtx.Lock() - merr.Add(fmt.Errorf("failed to upload %d: %w", uploaded, err)) + merr.Add(errors.Wrapf(err, "upload %d", uploaded)) errmtx.Unlock() } wg.Done() @@ -150,60 +227,41 @@ func (t *MultiTSDB) Upload(ctx context.Context) error { return merr.Err() } -func (t *MultiTSDB) openTSDBs() error { - files, err := ioutil.ReadDir(t.dataDir) - if err != nil { - return err - } - - var g errgroup.Group - for _, f := range files { - f := f - if !f.IsDir() { - continue - } - - g.Go(func() error { - tenantId := f.Name() - _, err := t.getOrLoadTenant(tenantId) - return err - }) - } - - return g.Wait() -} - func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore { t.mtx.RLock() - res := make(map[string]*store.TSDBStore, len(t.stores)) - for k, v := range t.stores { - res[k] = v - } defer t.mtx.RUnlock() + + res := make(map[string]*store.TSDBStore, len(t.tenants)) + for k, tenant := range t.tenants { + s := tenant.store() + if s != nil { + res[k] = s + } + } return res } -func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error) { +func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tenant, error) { // Fast path, as creating tenants is a very rare operation. t.mtx.RLock() - db, exist := t.appendables[tenantID] + tenant, exist := t.tenants[tenantID] t.mtx.RUnlock() if exist { - return db, nil + return tenant, nil } // Slow path needs to lock fully and attempt to read again to prevent race // conditions, where since the fast path was tried, there may have actually // been the same tenant inserted in the map. 
t.mtx.Lock() - db, exist = t.appendables[tenantID] + tenant, exist = t.tenants[tenantID] if exist { t.mtx.Unlock() - return db, nil + return tenant, nil } - rs := &tsdb.ReadyStorage{} - t.appendables[tenantID] = rs + tenant = newTenant(t.tsdbCfg) + t.tenants[tenantID] = tenant t.mtx.Unlock() go func() { @@ -215,7 +273,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error) dataDir := path.Join(t.dataDir, tenantID) var ship *shipper.Shipper - if t.upload { + if t.bucket != nil { ship = shipper.New( logger, reg, @@ -236,34 +294,32 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error) if err := s.Open(); err != nil { level.Error(logger).Log("msg", "failed to open tsdb", "err", err) t.mtx.Lock() - delete(t.appendables, tenantID) - delete(t.stores, tenantID) + delete(t.tenants, tenantID) t.mtx.Unlock() - if err := s.Close(); err != nil { - level.Error(logger).Log("msg", "failed to close tsdb", "err", err) - } + runutil.CloseWithLogOnErr(logger, s, "failed to close tsdb") return } - tstore := store.NewTSDBStore( - logger, - reg, - s.Get(), - component.Receive, - lbls, + tenant.set( + store.NewTSDBStore( + logger, + reg, + s.Get(), + component.Receive, + lbls, + ), + s, + ship, ) - - t.mtx.Lock() - rs.Set(s.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) - t.stores[tenantID] = tstore - t.dbs[tenantID] = s - t.shippers[tenantID] = ship - t.mtx.Unlock() }() - return rs, nil + return tenant, nil } func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) { - return t.getOrLoadTenant(tenantID) + tenant, err := t.getOrLoadTenant(tenantID) + if err != nil { + return nil, err + } + return tenant.readyStorage(), nil } diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index c2781336d8..820ed91c79 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -28,13 +28,13 @@ type TenantStorage interface { type Writer struct { logger log.Logger - multitsdb TenantStorage 
+ multiTSDB TenantStorage } -func NewWriter(logger log.Logger, multitsdb TenantStorage) *Writer { +func NewWriter(logger log.Logger, multiTSDB TenantStorage) *Writer { return &Writer{ logger: logger, - multitsdb: multitsdb, + multiTSDB: multiTSDB, } } @@ -45,7 +45,7 @@ func (r *Writer) Write(tenantID string, wreq *prompb.WriteRequest) error { numOutOfBounds = 0 ) - s, err := r.multitsdb.TenantAppendable(tenantID) + s, err := r.multiTSDB.TenantAppendable(tenantID) if err != nil { return errors.Wrap(err, "get tenant appendable") } @@ -57,9 +57,6 @@ func (r *Writer) Write(tenantID string, wreq *prompb.WriteRequest) error { if err != nil { return errors.Wrap(err, "get appender") } - if app == nil { - return errors.New("tsdb not ready yet to be appended to") - } var errs terrors.MultiError for _, t := range wreq.Timeseries { diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 0f6b1001c6..ca3414d33c 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -102,7 +101,6 @@ func newSeriesSetServer( ctx: ctx, warnCh: warnCh, recv: make(chan *storepb.Series), - errMtx: &sync.Mutex{}, } } @@ -110,13 +108,11 @@ func (s *seriesSetServer) Context() context.Context { return s.ctx } -func (s *seriesSetServer) Run(store *TSDBStore, r *storepb.SeriesRequest) { +func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { err := store.Series(r, s) if err != nil { if r.PartialResponseDisabled { - s.errMtx.Lock() s.err = err - s.errMtx.Unlock() } else { s.warnCh.send(storepb.NewWarnSeriesResponse(err)) } @@ -149,8 +145,6 @@ func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) { } func (s *seriesSetServer) Err() error { - s.errMtx.Lock() - defer s.errMtx.Unlock() return s.err } @@ -192,7 
+186,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri wg.Add(1) go func() { defer wg.Done() - ss.Run(store, r) + ss.Series(store, r) }() seriesSet = append(seriesSet, ss) @@ -213,11 +207,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri } } - if err := g.Wait(); err != nil { - level.Error(s.logger).Log("err", err) - return err - } - return nil + return g.Wait() } // LabelNames returns all known label names. From be255c006ab71ee9c47abe2d702b8a8a32c8a218 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 14 Apr 2020 16:48:50 +0200 Subject: [PATCH 5/9] Add more comments on types and functions Signed-off-by: Frederic Branczyk --- pkg/store/multitsdb.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index ca3414d33c..be4adfe65c 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -20,13 +20,14 @@ import ( "google.golang.org/grpc/status" ) +// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances. type MultiTSDBStore struct { logger log.Logger component component.SourceStoreAPI tsdbStores func() map[string]*TSDBStore } -// NewMultiTSDBStore creates a new TSDBStore. +// NewMultiTSDBStore creates a new MultiTSDBStore. func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore { if logger == nil { logger = log.NewNopLogger() @@ -38,7 +39,7 @@ func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component com } } -// Info returns store information about the Prometheus instance. +// Info returns store merged information about the underlying TSDBStore instances. 
func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*storepb.InfoResponse, error) { stores := s.tsdbStores() @@ -148,8 +149,9 @@ func (s *seriesSetServer) Err() error { return s.err } -// Series returns all series for a requested time range and label matcher. The returned data may -// exceed the requested time bounds. +// Series returns all series for a requested time range and label matcher. The +// returned data may exceed the requested time bounds. The data returned may +// have been read and merged from multiple underlying TSDBStore instances. func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { stores := s.tsdbStores() if len(stores) == 0 { From b691fd0aad1e71c6309cf70f69d0ff38e2721c74 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 14 Apr 2020 16:58:55 +0200 Subject: [PATCH 6/9] pkg/store/multitsdb.go: Remove unused struct field Signed-off-by: Frederic Branczyk --- pkg/store/multitsdb.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index be4adfe65c..3a80907a88 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -90,8 +90,7 @@ type seriesSetServer struct { recv chan *storepb.Series cur *storepb.Series - errMtx *sync.Mutex - err error + err error } func newSeriesSetServer( From 47dc82aaa6af5021e626fab46d95ee54a2dbe744 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 14 Apr 2020 17:25:54 +0200 Subject: [PATCH 7/9] pkg/receive/multitsdb.go: Remove unused Close method TSDBs are implicitly closed by flushing the database, which is ensured on shutdown, hence there is no need to have the explicit close method. 
Signed-off-by: Frederic Branczyk --- pkg/receive/multitsdb.go | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 8490a0d2e9..f00cf561c9 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -142,34 +142,6 @@ func (t *MultiTSDB) Open() error { return g.Wait() } -func (t *MultiTSDB) Close() error { - t.mtx.Lock() - defer t.mtx.Unlock() - - errmtx := &sync.Mutex{} - merr := terrors.MultiError{} - wg := &sync.WaitGroup{} - for _, tenant := range t.tenants { - s := tenant.flushableStorage() - if s == nil { - continue - } - - wg.Add(1) - go func() { - if err := s.Close(); err != nil { - errmtx.Lock() - merr.Add(err) - errmtx.Unlock() - } - wg.Done() - }() - } - - wg.Wait() - return merr.Err() -} - func (t *MultiTSDB) Flush() error { t.mtx.Lock() defer t.mtx.Unlock() From eadc0a19bd158da7c829b52b82f6640e0119a3be Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Thu, 16 Apr 2020 10:39:57 +0200 Subject: [PATCH 8/9] pkg/store/multitsdb.go: Make errors and warnings tenant aware Signed-off-by: Frederic Branczyk --- pkg/store/multitsdb.go | 53 ++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 3a80907a88..847cf1721b 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -5,6 +5,7 @@ package store import ( "context" + "fmt" "sync" "github.com/go-kit/kit/log" @@ -51,10 +52,10 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s } infos := make([]*storepb.InfoResponse, 0, len(stores)) - for _, store := range stores { + for tenant, store := range stores { info, err := store.Info(ctx, req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "get info for tenant %s", tenant) } infos = append(infos, info) } @@ -81,7 +82,7 @@ func (s *MultiTSDBStore) Info(ctx context.Context, req *storepb.InfoRequest) (*s return 
resp, nil } -type seriesSetServer struct { +type tenantSeriesSetServer struct { grpc.ServerStream ctx context.Context @@ -90,27 +91,31 @@ type seriesSetServer struct { recv chan *storepb.Series cur *storepb.Series - err error + err error + tenant string } -func newSeriesSetServer( +func newTenantSeriesSetServer( ctx context.Context, + tenant string, warnCh warnSender, -) *seriesSetServer { - return &seriesSetServer{ +) *tenantSeriesSetServer { + return &tenantSeriesSetServer{ ctx: ctx, + tenant: tenant, warnCh: warnCh, recv: make(chan *storepb.Series), } } -func (s *seriesSetServer) Context() context.Context { +func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx } -func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { +func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { err := store.Series(r, s) if err != nil { + err = errors.Wrapf(s.err, "get series for tenant %s", s.tenant) if r.PartialResponseDisabled { s.err = err } else { @@ -121,7 +126,7 @@ func (s *seriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { close(s.recv) } -func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error { +func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error { series := r.GetSeries() chunks := make([]storepb.AggrChunk, len(series.Chunks)) copy(chunks, series.Chunks) @@ -132,19 +137,19 @@ func (s *seriesSetServer) Send(r *storepb.SeriesResponse) error { return nil } -func (s *seriesSetServer) Next() (ok bool) { +func (s *tenantSeriesSetServer) Next() (ok bool) { s.cur, ok = <-s.recv return ok } -func (s *seriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) { +func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) { if s.cur == nil { return nil, nil } return s.cur.Labels, s.cur.Chunks } -func (s *seriesSetServer) Err() error { +func (s *tenantSeriesSetServer) Err() error { return s.err } @@ -178,12 +183,12 @@ func (s *MultiTSDBStore) Series(r 
*storepb.SeriesRequest, srv storepb.Store_Seri for tenant, store := range stores { store := store - seriesCtx, closeSeries := context.WithCancel(gctx) + seriesCtx, cancelSeries := context.WithCancel(gctx) seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{ "tenant": tenant, }) - defer closeSeries() - ss := newSeriesSetServer(seriesCtx, respSender) + defer cancelSeries() + ss := newTenantSeriesSetServer(seriesCtx, tenant, respSender) wg.Add(1) go func() { defer wg.Done() @@ -217,10 +222,10 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames warnings := map[string]struct{}{} stores := s.tsdbStores() - for _, store := range stores { + for tenant, store := range stores { r, err := store.LabelNames(ctx, req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "get label names for tenant %s", tenant) } for _, l := range r.Names { @@ -228,7 +233,7 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames } for _, l := range r.Warnings { - warnings[l] = struct{}{} + warnings[prefixTenantWarning(tenant, l)] = struct{}{} } } @@ -238,6 +243,10 @@ func (s *MultiTSDBStore) LabelNames(ctx context.Context, req *storepb.LabelNames }, nil } +func prefixTenantWarning(tenant, s string) string { + return fmt.Sprintf("[%s] %s", tenant, s) +} + func keys(m map[string]struct{}) []string { res := make([]string, 0, len(m)) for k := range m { @@ -253,10 +262,10 @@ func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValu warnings := map[string]struct{}{} stores := s.tsdbStores() - for _, store := range stores { + for tenant, store := range stores { r, err := store.LabelValues(ctx, req) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "get label values for tenant %s", tenant) } for _, l := range r.Values { @@ -264,7 +273,7 @@ func (s *MultiTSDBStore) LabelValues(ctx context.Context, req *storepb.LabelValu } for _, l := range r.Warnings { - warnings[l] = 
struct{}{} + warnings[prefixTenantWarning(tenant, l)] = struct{}{} } } From 4b44b2de3495e1c854fe0308eeb3854bba235ed2 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 17 Apr 2020 12:16:02 +0200 Subject: [PATCH 9/9] pkg/store/multitsdb.go: Consistent tenant aware errors and warnings Signed-off-by: Frederic Branczyk --- pkg/store/multitsdb.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index 847cf1721b..e1acea5514 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -115,10 +115,11 @@ func (s *tenantSeriesSetServer) Context() context.Context { func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) { err := store.Series(r, s) if err != nil { - err = errors.Wrapf(s.err, "get series for tenant %s", s.tenant) if r.PartialResponseDisabled { - s.err = err + s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant) } else { + // Consistently prefix tenant specific warnings as done in various other places. + err = errors.New(prefixTenantWarning(s.tenant, err.Error())) s.warnCh.send(storepb.NewWarnSeriesResponse(err)) } }