Skip to content

Commit

Permalink
pkg/receive: Add multitsdb shipper support
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Branczyk <[email protected]>
  • Loading branch information
brancz committed Apr 6, 2020
1 parent d797dd0 commit 0ca745a
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 54 deletions.
86 changes: 47 additions & 39 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -203,7 +202,43 @@ func runReceive(
return err
}

dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, tenantLabelName)
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !ignoreBlockSize {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
}

var bkt objstore.Bucket
if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String())
if err != nil {
return err
}
}

dbs := receive.NewMultiTSDB(
dataDir,
logger,
reg,
tsdbOpts,
lset,
tenantLabelName,
bkt,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
Expand All @@ -227,24 +262,6 @@ func runReceive(
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !ignoreBlockSize {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
}

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.

Expand Down Expand Up @@ -438,27 +455,18 @@ func runReceive(
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, comp.String())
if err != nil {
return err
}

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
level.Debug(logger).Log("msg", "upload enabled")
if err := dbs.Upload(context.Background()); err != nil {
level.Warn(logger).Log("err", err)
}

{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
if err := dbs.Upload(ctx); err != nil {
level.Warn(logger).Log("err", err)
}

return nil
Expand All @@ -479,8 +487,8 @@ func runReceive(
// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
if err := dbs.Upload(context.Background()); err != nil {
level.Warn(logger).Log("err", err)
}
}()
defer close(uploadDone)
Expand All @@ -494,8 +502,8 @@ func runReceive(
case <-ctx.Done():
return nil
case <-uploadC:
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
if err := dbs.Upload(ctx); err != nil {
level.Warn(logger).Log("err", err)
}
uploadDone <- struct{}{}
}
Expand Down
83 changes: 69 additions & 14 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package receive

import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
Expand All @@ -16,7 +18,10 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"golang.org/x/sync/errgroup"
)
Expand All @@ -28,14 +33,17 @@ type MultiTSDB struct {
tsdbCfg *tsdb.Options
tenantLabelName string
labels labels.Labels
bucket objstore.Bucket
upload bool

mtx *sync.RWMutex
dbs map[string]*FlushableStorage
appendables map[string]*tsdb.ReadyStorage
stores map[string]*store.TSDBStore
shippers map[string]*shipper.Shipper
}

func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string) *MultiTSDB {
func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string, bucket objstore.Bucket) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
}
Expand All @@ -49,8 +57,11 @@ func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbC
dbs: map[string]*FlushableStorage{},
stores: map[string]*store.TSDBStore{},
appendables: map[string]*tsdb.ReadyStorage{},
shippers: map[string]*shipper.Shipper{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
upload: bucket != nil,
}
}

Expand Down Expand Up @@ -110,6 +121,35 @@ func (t *MultiTSDB) Flush() error {
return merr.Err()
}

func (t *MultiTSDB) Upload(ctx context.Context) error {
if !t.upload {
return nil
}

t.mtx.Lock()
defer t.mtx.Unlock()

errmtx := &sync.Mutex{}
merr := terrors.MultiError{}
wg := &sync.WaitGroup{}
for tenant, s := range t.shippers {
level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenant)
s := s
wg.Add(1)
go func() {
if uploaded, err := s.Sync(ctx); err != nil {
errmtx.Lock()
merr.Add(fmt.Errorf("failed to upload %d: %w", uploaded, err))
errmtx.Unlock()
}
wg.Done()
}()
}

wg.Wait()
return merr.Err()
}

func (t *MultiTSDB) openTSDBs() error {
files, err := ioutil.ReadDir(t.dataDir)
if err != nil {
Expand All @@ -118,7 +158,6 @@ func (t *MultiTSDB) openTSDBs() error {

var g errgroup.Group
for _, f := range files {
// See: https://golang.org/doc/faq#closures_and_goroutines.
f := f
if !f.IsDir() {
continue
Expand Down Expand Up @@ -168,41 +207,57 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error)
t.mtx.Unlock()

go func() {
reg := prometheus.WrapRegistererWith(prometheus.Labels{
"tenant": tenantID,
}, t.reg)
logger := log.With(t.logger, "tenant", tenantID)
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
dataDir := path.Join(t.dataDir, tenantID)

var ship *shipper.Shipper
if t.upload {
ship = shipper.New(
logger,
reg,
dataDir,
t.bucket,
func() labels.Labels { return lbls },
metadata.ReceiveSource,
)
}

s := NewFlushableStorage(
path.Join(t.dataDir, tenantID),
log.With(t.logger, "tenant", tenantID),
prometheus.WrapRegistererWith(prometheus.Labels{
"tenant": tenantID,
}, t.reg),
dataDir,
logger,
reg,
t.tsdbCfg,
)

if err := s.Open(); err != nil {
level.Error(t.logger).Log("msg", "failed to open tsdb", "err", err)
level.Error(logger).Log("msg", "failed to open tsdb", "err", err)
t.mtx.Lock()
delete(t.appendables, tenantID)
delete(t.stores, tenantID)
t.mtx.Unlock()
if err := s.Close(); err != nil {
level.Error(t.logger).Log("msg", "failed to close tsdb", "err", err)
level.Error(logger).Log("msg", "failed to close tsdb", "err", err)
}
return
}

tstore := store.NewTSDBStore(
log.With(t.logger, "component", "thanos-tsdb-store", "tenant", tenantID),
prometheus.WrapRegistererWith(prometheus.Labels{
"tenant": tenantID,
}, t.reg),
logger,
reg,
s.Get(),
component.Receive,
append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}),
lbls,
)

t.mtx.Lock()
rs.Set(s.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000))
t.stores[tenantID] = tstore
t.dbs[tenantID] = s
t.shippers[tenantID] = ship
t.mtx.Unlock()
}()

Expand Down
4 changes: 3 additions & 1 deletion scripts/quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ for i in $(seq 0 1 2); do
--grpc-grace-period 1s \
--http-address 0.0.0.0:1${i}909 \
--http-grace-period 1s \
--receive.replication-factor 3 \
--receive.replication-factor 1 \
--tsdb.min-block-duration 5m \
--tsdb.max-block-duration 5m \
--label "receive_replica=\"${i}\"" \
--receive.local-endpoint 127.0.0.1:1${i}907 \
--receive.hashrings-file ./data/hashring.json \
Expand Down

0 comments on commit 0ca745a

Please sign in to comment.