
receive: Added more observability, fixed leaktest, to actually check leaks ):

Reason: Missing (); we probably need a linter for this.

Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka committed Jun 30, 2020
1 parent 789ef71 commit cc357f9
Showing 4 changed files with 60 additions and 47 deletions.
35 changes: 24 additions & 11 deletions cmd/thanos/receive.go
@@ -18,6 +18,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"
@@ -205,8 +206,7 @@ func runReceive(
allowOutOfOrderUpload bool,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

level.Warn(logger).Log("msg", "setting up receive")
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
@@ -285,38 +285,50 @@ func runReceive(

// dbReady signals when TSDB is ready and the Store gRPC server can start.
dbReady := make(chan struct{}, 1)
// updateDB signals when TSDB needs to be flushed and updated.
updateDB := make(chan struct{}, 1)
// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change
hashringChangedChan := make(chan struct{}, 1)
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up tsdb")
{
// TSDB.
dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_attempted_total",
Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
})
dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_completed_total",
Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
})

// TSDBs reload logic, listening on hashring changes.
cancel := make(chan struct{})
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before quitting, ensure the WAL is flushed and the DB is closed.
// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
if err := dbs.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
}
}()

for {
select {
case <-cancel:
return nil
case _, ok := <-updateDB:
case _, ok := <-hashringChangedChan:
if !ok {
return nil
}

level.Info(logger).Log("msg", "updating DB")
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating Multi DB")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
@@ -330,6 +342,7 @@ func runReceive(
}
statusProber.Ready()
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
dbUpdatesCompleted.Inc()
dbReady <- struct{}{}
}
}
@@ -373,7 +386,7 @@ func runReceive(

cancel := make(chan struct{})
g.Add(func() error {
defer close(updateDB)
defer close(hashringChangedChan)
for {
select {
case h, ok := <-updates:
@@ -384,7 +397,7 @@ func runReceive(
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
updateDB <- struct{}{}
hashringChangedChan <- struct{}{}
case <-cancel:
return nil
}
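The two counters added above form an attempted/completed pair: the first is incremented when a reload starts, the second only after it succeeds, so a persistent gap between them surfaces reloads that hung or failed partway. A minimal, self-contained sketch of the same pattern, using the promauto API from the diff but with illustrative metric names rather than the actual Thanos ones:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// reload stands in for the flush-and-reload step guarded by the counters.
func reload() error { return nil }

func main() {
	reg := prometheus.NewRegistry()
	attempted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "example_reloads_attempted_total",
		Help: "Reloads started.",
	})
	completed := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "example_reloads_completed_total",
		Help: "Reloads that finished successfully.",
	})

	// Increment attempted before the work and completed only on success;
	// a growing attempted-completed gap points at stuck or failed reloads.
	attempted.Inc()
	if err := reload(); err == nil {
		completed.Inc()
	}
}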
6 changes: 2 additions & 4 deletions pkg/receive/handler_test.go
@@ -183,8 +183,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
}

func TestReceiveQuorum(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

defer leaktest.CheckTimeout(t, 10*time.Second)()
appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }
commitErrFn := func() error { return errors.New("failed to commit") }
@@ -521,8 +520,7 @@ }
}

func TestReceiveWithConsistencyDelay(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

defer leaktest.CheckTimeout(t, 10*time.Second)()
appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }
commitErrFn := func() error { return errors.New("failed to commit") }
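The handler_test.go fix is the one the commit message calls out: leaktest.CheckTimeout snapshots the running goroutines and returns the function that performs the actual check, so deferring it without the trailing () postpones the snapshot to test exit and never invokes the checker; no leak check ever runs. A minimal sketch of both behaviours, with checkpoint standing in for leaktest.CheckTimeout:

package main

import "fmt"

// checkpoint stands in for leaktest.CheckTimeout: it takes a snapshot now
// and returns a closure that performs the actual check when called.
func checkpoint() func() {
	fmt.Println("snapshot taken")
	return func() { fmt.Println("leak check ran") }
}

func broken() {
	// Defers the call to checkpoint itself: the snapshot happens only at
	// exit and the returned checker is discarded, so nothing is checked.
	defer checkpoint()
}

func fixed() {
	// checkpoint() runs immediately (snapshot at test start); the returned
	// closure is what gets deferred, so the check runs at exit.
	defer checkpoint()()
}

func main() {
	broken() // prints only "snapshot taken"
	fixed()  // prints "snapshot taken", then "leak check ran"
}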
54 changes: 30 additions & 24 deletions pkg/receive/multitsdb.go
@@ -70,7 +70,6 @@ func NewMultiTSDB(

type tenant struct {
readyS *ReadyStorage
tsdb *tsdb.DB
storeTSDB *store.TSDBStore
ship *shipper.Shipper

@@ -100,16 +99,9 @@ func (t *tenant) shipper() *shipper.Shipper {
return t.ship
}

func (t *tenant) db() *tsdb.DB {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.tsdb
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.tsdb = tenantTSDB
t.storeTSDB = storeTSDB
t.ship = ship
t.mtx.Unlock()
@@ -148,17 +140,17 @@ func (t *MultiTSDB) Flush() error {
errmtx := &sync.Mutex{}
merr := terrors.MultiError{}
wg := &sync.WaitGroup{}
for _, tenant := range t.tenants {
db := tenant.db()
for id, tenant := range t.tenants {
db := tenant.readyStorage().Get()
if db == nil {
level.Error(t.logger).Log("msg", "flushing TSDB failed; not ready", "tenant", id)
continue
}

level.Info(t.logger).Log("msg", "flushing TSDB", "tenant", id)
wg.Add(1)
go func() {
head := db.Head()
mint, maxt := head.MinTime(), head.MaxTime()
if err := db.CompactHead(tsdb.NewRangeHead(head, mint, maxt-1)); err != nil {
if err := db.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime()-1)); err != nil {
errmtx.Lock()
merr.Add(err)
errmtx.Unlock()
@@ -171,7 +163,28 @@
return merr.Err()
}

func (t *MultiTSDB) Close() error {
t.mtx.Lock()
defer t.mtx.Unlock()

merr := terrors.MultiError{}
for id, tenant := range t.tenants {
db := tenant.readyStorage().Get()
if db == nil {
level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id)
continue
}
level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id)
merr.Add(db.Close())
}
return merr.Err()
}

func (t *MultiTSDB) Sync(ctx context.Context) error {
if t.bucket == nil {
return errors.New("bucket is not specified, Sync should not be invoked")
}

t.mtx.RLock()
defer t.mtx.RUnlock()

@@ -184,7 +197,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
if s == nil {
continue
}

wg.Add(1)
go func() {
if uploaded, err := s.Sync(ctx); err != nil {
Expand All @@ -195,7 +207,6 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
wg.Done()
}()
}

wg.Wait()
return merr.Err()
}
@@ -219,6 +230,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
dataDir := path.Join(t.dataDir, tenantID)

level.Info(logger).Log("msg", "opening TSDB")
opts := *t.tsdbOpts
s, err := tsdb.Open(
dataDir,
@@ -232,7 +244,6 @@
t.mtx.Unlock()
return err
}

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
@@ -245,16 +256,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.allowOutOfOrderUpload,
)
}
tenant.set(store.NewTSDBStore(
logger,
reg,
s,
component.Receive,
lbls,
), s, ship)

tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship)
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}

func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) {
// Fast path, as creating tenants is a very rare operation.
t.mtx.RLock()
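Flush above fans work out as one goroutine per tenant, joins them with a sync.WaitGroup, and collects failures behind a mutex (a terrors.MultiError in the real code). A minimal sketch of that fan-out pattern, with a plain error slice standing in for MultiError and hypothetical tenant names:

package main

import (
	"fmt"
	"sync"
)

// flushAll runs one flush per tenant concurrently and aggregates failures.
func flushAll(tenants map[string]func() error) error {
	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	for id, flush := range tenants {
		id, flush := id, flush // per-iteration copies for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := flush(); err != nil {
				mu.Lock() // guard the shared slice, as errmtx does above
				errs = append(errs, fmt.Errorf("tenant %s: %w", id, err))
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	if len(errs) > 0 {
		return fmt.Errorf("%d tenant flushes failed: %v", len(errs), errs)
	}
	return nil
}

func main() {
	fmt.Println(flushAll(map[string]func() error{
		"foo": func() error { return nil },
		"bar": func() error { return fmt.Errorf("disk full") },
	}))
}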
12 changes: 4 additions & 8 deletions pkg/receive/multitsdb_test.go
@@ -5,7 +5,6 @@ package receive

import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
@@ -25,13 +24,12 @@
)

func TestMultiTSDB(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)

defer leaktest.CheckTimeout(t, 10*time.Second)()
dir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

logger := log.NewNopLogger()
logger := log.NewLogfmtLogger(os.Stderr)
t.Run("run fresh", func(t *testing.T) {
m := NewMultiTSDB(
dir, logger, prometheus.NewRegistry(), &tsdb.Options{
@@ -45,7 +43,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
)
defer testutil.Ok(t, m.Flush())
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())
@@ -112,7 +110,7 @@ func TestMultiTSDB(t *testing.T) {
nil,
false,
)
defer testutil.Ok(t, m.Flush())
defer func() { testutil.Ok(t, m.Close()) }()

testutil.Ok(t, m.Flush())
testutil.Ok(t, m.Open())
@@ -202,13 +200,11 @@ Outer:
if !ok {
break Outer
}
fmt.Println(r[0].String())
testutil.Equals(t, expectedFooResp, r)
case r, ok := <-respBar:
if !ok {
break Outer
}
fmt.Println(r[0].String())
testutil.Equals(t, expectedBarResp, r)
}
}
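The dropped defer testutil.Ok(t, m.Flush()) in the tests had the same family of bug as the leaktest one: arguments to a deferred call are evaluated at the defer statement, so m.Flush() actually ran at the top of the test, not at exit. Wrapping the call in a closure, as the new defer func() { testutil.Ok(t, m.Close()) }() does, postpones everything. A sketch with stand-ins (assertOk for testutil.Ok, cleanup for the Flush/Close call):

package main

import "fmt"

// assertOk stands in for testutil.Ok.
func assertOk(err error) {
	if err != nil {
		fmt.Println("unexpected error:", err)
	}
}

// cleanup stands in for m.Flush or m.Close.
func cleanup() error {
	fmt.Println("cleanup ran")
	return nil
}

func broken() {
	// cleanup() is evaluated right here, at the defer statement; only the
	// assertOk call itself is postponed, so cleanup runs far too early.
	defer assertOk(cleanup())
	fmt.Println("test body")
}

func fixed() {
	// The closure defers everything, including the cleanup call.
	defer func() { assertOk(cleanup()) }()
	fmt.Println("test body")
}

func main() {
	broken() // prints "cleanup ran", then "test body"
	fixed()  // prints "test body", then "cleanup ran"
}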

