From 4c2a9b52a504e5cee27f920344eae1552e4b3e26 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 29 Jun 2020 17:37:11 +0200 Subject: [PATCH] receive: Fixed small options race; Removed unused StartTime feature. startTimeMargin and StartTime are used only by Prometheus remote read; Thanos does not use them. Fixed the following race: ``` === RUN TestMultiTSDB/run_on_existing_storage ================== WARNING: DATA RACE Read at 0x00c00073ae80 by goroutine 69: github.com/prometheus/prometheus/tsdb.validateOpts() /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/prometheus@v1.8.2-0.20200609165731-66dfb951c4ca/tsdb/db.go:510 +0x55 github.com/prometheus/prometheus/tsdb.Open() /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/prometheus@v1.8.2-0.20200609165731-66dfb951c4ca/tsdb/db.go:502 +0x61 github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant.func1() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:268 +0x56b github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:302 +0x4ef github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open.func1() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:142 +0x66 golang.org/x/sync/errgroup.(*Group).Go.func1() /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/sync@v0.0.0-20200317015054-43a5402ce75a/errgroup/errgroup.go:57 +0x85 Previous write at 0x00c00073ae80 by goroutine 57: github.com/prometheus/prometheus/tsdb.validateOpts() /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/prometheus@v1.8.2-0.20200609165731-66dfb951c4ca/tsdb/db.go:511 +0x1f2 github.com/prometheus/prometheus/tsdb.Open() /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/prometheus@v1.8.2-0.20200609165731-66dfb951c4ca/tsdb/db.go:502 +0x61 github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant.func1() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:268 +0x56b
github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:302 +0x4ef github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open.func1() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:142 +0x66 golang.org/x/sync/errgroup.(*Group).Go.func1() /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/sync@v0.0.0-20200317015054-43a5402ce75a/errgroup/errgroup.go:57 +0x85 Goroutine 69 (running) created at: golang.org/x/sync/errgroup.(*Group).Go() /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/sync@v0.0.0-20200317015054-43a5402ce75a/errgroup/errgroup.go:54 +0x73 github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:141 +0x2af github.com/thanos-io/thanos/pkg/receive.TestMultiTSDB.func3() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb_test.go:118 +0x6d3 testing.tRunner() /home/bwplotka/.gvm/gos/go1.14.2/src/testing/testing.go:991 +0x1eb Goroutine 57 (running) created at: golang.org/x/sync/errgroup.(*Group).Go() /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/sync@v0.0.0-20200317015054-43a5402ce75a/errgroup/errgroup.go:54 +0x73 github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:141 +0x2af github.com/thanos-io/thanos/pkg/receive.TestMultiTSDB.func3() /home/bwplotka/Repos/thanos/pkg/receive/multitsdb_test.go:118 +0x6d3 testing.tRunner() /home/bwplotka/.gvm/gos/go1.14.2/src/testing/testing.go:991 +0x1eb ================== ``` Signed-off-by: Bartlomiej Plotka --- pkg/receive/multitsdb.go | 142 ++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 84 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index d2589f1c4f..596f6953cc 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -9,13 +9,11 @@ import ( "os" "path" "sync" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" 
"github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -23,7 +21,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "golang.org/x/sync/errgroup" @@ -72,8 +69,6 @@ func NewMultiTSDB( } type tenant struct { - tsdbOpts *tsdb.Options - readyS *ReadyStorage tsdb *tsdb.DB storeTSDB *store.TSDBStore @@ -82,11 +77,10 @@ type tenant struct { mtx *sync.RWMutex } -func newTenant(tsdbOpts *tsdb.Options) *tenant { +func newTenant() *tenant { return &tenant{ - tsdbOpts: tsdbOpts, - readyS: &ReadyStorage{}, - mtx: &sync.RWMutex{}, + readyS: &ReadyStorage{}, + mtx: &sync.RWMutex{}, } } @@ -113,7 +107,7 @@ func (t *tenant) db() *tsdb.DB { } func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper) { - t.readyS.Set(tenantTSDB, int64(2*time.Duration(t.tsdbOpts.MinBlockDuration).Seconds()*1000)) + t.readyS.Set(tenantTSDB) t.mtx.Lock() t.tsdb = tenantTSDB t.storeTSDB = storeTSDB @@ -220,6 +214,47 @@ func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore { return res } +func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error { + reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg) + lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}) + dataDir := path.Join(t.dataDir, tenantID) + + opts := *t.tsdbOpts + s, err := tsdb.Open( + dataDir, + logger, + &UnRegisterer{Registerer: reg}, + &opts, + ) + if err != nil { + t.mtx.Lock() + delete(t.tenants, tenantID) + t.mtx.Unlock() + return err + } + + var ship *shipper.Shipper + if t.bucket != nil { + ship = 
shipper.New( + logger, + reg, + dataDir, + t.bucket, + func() labels.Labels { return lbls }, + metadata.ReceiveSource, + t.allowOutOfOrderUpload, + ) + } + tenant.set(store.NewTSDBStore( + logger, + reg, + s, + component.Receive, + lbls, + ), s, ship) + + return nil +} func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) { // Fast path, as creating tenants is a very rare operation. t.mtx.RLock() @@ -239,68 +274,20 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan return tenant, nil } - tenant = newTenant(t.tsdbOpts) + tenant = newTenant() t.tenants[tenantID] = tenant t.mtx.Unlock() - var err error - startTSDB := func() { - reg := prometheus.WrapRegistererWith(prometheus.Labels{ - "tenant": tenantID, - }, t.reg) - logger := log.With(t.logger, "tenant", tenantID) - lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}) - dataDir := path.Join(t.dataDir, tenantID) - - var ship *shipper.Shipper - if t.bucket != nil { - ship = shipper.New( - logger, - reg, - dataDir, - t.bucket, - func() labels.Labels { return lbls }, - metadata.ReceiveSource, - t.allowOutOfOrderUpload, - ) - } - - s, err := tsdb.Open( - dataDir, - logger, - &UnRegisterer{Registerer: reg}, - t.tsdbOpts, - ) - - // Assign to outer error to report in blocking case. 
- if err != nil { - level.Error(logger).Log("msg", "failed to open tsdb", "err", err) - t.mtx.Lock() - delete(t.tenants, tenantID) - t.mtx.Unlock() - runutil.CloseWithLogOnErr(logger, s, "failed to close tsdb") - return - } - - tenant.set( - store.NewTSDBStore( - logger, - reg, - s, - component.Receive, - lbls, - ), - s, - ship, - ) - } + logger := log.With(t.logger, "tenant", tenantID) if !blockingStart { - go startTSDB() + go func() { + if err := t.startTSDB(logger, tenantID, tenant); err != nil { + level.Error(logger).Log("msg", "failed to start tsdb asynchronously", "err", err) + } + }() return tenant, nil } - - startTSDB() - return tenant, err + return tenant, t.startTSDB(logger, tenantID, tenant) } func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) { @@ -323,11 +310,11 @@ type ReadyStorage struct { } // Set the storage. -func (s *ReadyStorage) Set(db *tsdb.DB, startTimeMargin int64) { +func (s *ReadyStorage) Set(db *tsdb.DB) { s.mtx.Lock() defer s.mtx.Unlock() - s.a = &adapter{db: db, startTimeMargin: startTimeMargin} + s.a = &adapter{db: db} } // Get the storage. @@ -347,10 +334,7 @@ func (s *ReadyStorage) get() *adapter { // StartTime implements the Storage interface. func (s *ReadyStorage) StartTime() (int64, error) { - if x := s.get(); x != nil { - return x.StartTime() - } - return int64(model.Latest), ErrNotReady + return 0, errors.New("not implemented") } // Querier implements the Storage interface. @@ -379,22 +363,12 @@ func (s *ReadyStorage) Close() error { // adapter implements a storage.Storage around TSDB. type adapter struct { - db *tsdb.DB - startTimeMargin int64 + db *tsdb.DB } // StartTime implements the Storage interface. func (a adapter) StartTime() (int64, error) { - var startTime int64 - - if len(a.db.Blocks()) > 0 { - startTime = a.db.Blocks()[0].Meta().MinTime - } else { - startTime = time.Now().Unix() * 1000 - } - - // Add a safety margin as it may take a few minutes for everything to spin up. 
- return startTime + a.startTimeMargin, nil + return 0, errors.New("not implemented") } func (a adapter) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {