Skip to content

Commit

Permalink
receive: Fixed small options race; Removed unused StartTime feature. (#…
Browse files Browse the repository at this point in the history
…2816)

startTimeMargin and StartTime is used only by Prometheus remote read, Thanos does not use it.
Fixed following race:

```
=== RUN   TestMultiTSDB/run_on_existing_storage
==================
WARNING: DATA RACE
Read at 0x00c00073ae80 by goroutine 69:
  github.com/prometheus/prometheus/tsdb.validateOpts()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/[email protected]/tsdb/db.go:510 +0x55
  github.com/prometheus/prometheus/tsdb.Open()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/[email protected]/tsdb/db.go:502 +0x61
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant.func1()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:268 +0x56b
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:302 +0x4ef
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open.func1()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:142 +0x66
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x85

Previous write at 0x00c00073ae80 by goroutine 57:
  github.com/prometheus/prometheus/tsdb.validateOpts()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/[email protected]/tsdb/db.go:511 +0x1f2
  github.com/prometheus/prometheus/tsdb.Open()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/github.com/prometheus/[email protected]/tsdb/db.go:502 +0x61
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant.func1()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:268 +0x56b
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).getOrLoadTenant()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:302 +0x4ef
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open.func1()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:142 +0x66
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x85

Goroutine 69 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x73
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:141 +0x2af
  github.com/thanos-io/thanos/pkg/receive.TestMultiTSDB.func3()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb_test.go:118 +0x6d3
  testing.tRunner()
      /home/bwplotka/.gvm/gos/go1.14.2/src/testing/testing.go:991 +0x1eb

Goroutine 57 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x73
  github.com/thanos-io/thanos/pkg/receive.(*MultiTSDB).Open()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb.go:141 +0x2af
  github.com/thanos-io/thanos/pkg/receive.TestMultiTSDB.func3()
      /home/bwplotka/Repos/thanos/pkg/receive/multitsdb_test.go:118 +0x6d3
  testing.tRunner()
      /home/bwplotka/.gvm/gos/go1.14.2/src/testing/testing.go:991 +0x1eb
==================
```

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Jun 29, 2020
1 parent e24ce43 commit 789ef71
Showing 1 changed file with 58 additions and 84 deletions.
142 changes: 58 additions & 84 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@ import (
"os"
"path"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -72,8 +69,6 @@ func NewMultiTSDB(
}

type tenant struct {
tsdbOpts *tsdb.Options

readyS *ReadyStorage
tsdb *tsdb.DB
storeTSDB *store.TSDBStore
Expand All @@ -82,11 +77,10 @@ type tenant struct {
mtx *sync.RWMutex
}

func newTenant(tsdbOpts *tsdb.Options) *tenant {
func newTenant() *tenant {
return &tenant{
tsdbOpts: tsdbOpts,
readyS: &ReadyStorage{},
mtx: &sync.RWMutex{},
readyS: &ReadyStorage{},
mtx: &sync.RWMutex{},
}
}

Expand All @@ -113,7 +107,7 @@ func (t *tenant) db() *tsdb.DB {
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper) {
t.readyS.Set(tenantTSDB, int64(2*time.Duration(t.tsdbOpts.MinBlockDuration).Seconds()*1000))
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.tsdb = tenantTSDB
t.storeTSDB = storeTSDB
Expand Down Expand Up @@ -220,6 +214,47 @@ func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
return res
}

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
dataDir := path.Join(t.dataDir, tenantID)

opts := *t.tsdbOpts
s, err := tsdb.Open(
dataDir,
logger,
&UnRegisterer{Registerer: reg},
&opts,
)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.mtx.Unlock()
return err
}

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
logger,
reg,
dataDir,
t.bucket,
func() labels.Labels { return lbls },
metadata.ReceiveSource,
t.allowOutOfOrderUpload,
)
}
tenant.set(store.NewTSDBStore(
logger,
reg,
s,
component.Receive,
lbls,
), s, ship)

return nil
}
func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) {
// Fast path, as creating tenants is a very rare operation.
t.mtx.RLock()
Expand All @@ -239,68 +274,20 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
return tenant, nil
}

tenant = newTenant(t.tsdbOpts)
tenant = newTenant()
t.tenants[tenantID] = tenant
t.mtx.Unlock()

var err error
startTSDB := func() {
reg := prometheus.WrapRegistererWith(prometheus.Labels{
"tenant": tenantID,
}, t.reg)
logger := log.With(t.logger, "tenant", tenantID)
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
dataDir := path.Join(t.dataDir, tenantID)

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
logger,
reg,
dataDir,
t.bucket,
func() labels.Labels { return lbls },
metadata.ReceiveSource,
t.allowOutOfOrderUpload,
)
}

s, err := tsdb.Open(
dataDir,
logger,
&UnRegisterer{Registerer: reg},
t.tsdbOpts,
)

// Assign to outer error to report in blocking case.
if err != nil {
level.Error(logger).Log("msg", "failed to open tsdb", "err", err)
t.mtx.Lock()
delete(t.tenants, tenantID)
t.mtx.Unlock()
runutil.CloseWithLogOnErr(logger, s, "failed to close tsdb")
return
}

tenant.set(
store.NewTSDBStore(
logger,
reg,
s,
component.Receive,
lbls,
),
s,
ship,
)
}
logger := log.With(t.logger, "tenant", tenantID)
if !blockingStart {
go startTSDB()
go func() {
if err := t.startTSDB(logger, tenantID, tenant); err != nil {
level.Error(logger).Log("msg", "failed to start tsdb asynchronously", "err", err)
}
}()
return tenant, nil
}

startTSDB()
return tenant, err
return tenant, t.startTSDB(logger, tenantID, tenant)
}

func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) {
Expand All @@ -323,11 +310,11 @@ type ReadyStorage struct {
}

// Set the storage.
func (s *ReadyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
func (s *ReadyStorage) Set(db *tsdb.DB) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.a = &adapter{db: db, startTimeMargin: startTimeMargin}
s.a = &adapter{db: db}
}

// Get the storage.
Expand All @@ -347,10 +334,7 @@ func (s *ReadyStorage) get() *adapter {

// StartTime implements the Storage interface.
func (s *ReadyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil {
return x.StartTime()
}
return int64(model.Latest), ErrNotReady
return 0, errors.New("not implemented")
}

// Querier implements the Storage interface.
Expand Down Expand Up @@ -379,22 +363,12 @@ func (s *ReadyStorage) Close() error {

// adapter implements a storage.Storage around TSDB.
type adapter struct {
db *tsdb.DB
startTimeMargin int64
db *tsdb.DB
}

// StartTime implements the Storage interface.
func (a adapter) StartTime() (int64, error) {
var startTime int64

if len(a.db.Blocks()) > 0 {
startTime = a.db.Blocks()[0].Meta().MinTime
} else {
startTime = time.Now().Unix() * 1000
}

// Add a safety margin as it may take a few minutes for everything to spin up.
return startTime + a.startTimeMargin, nil
return 0, errors.New("not implemented")
}

func (a adapter) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
Expand Down

0 comments on commit 789ef71

Please sign in to comment.