diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e0e3780e8c..7f273dbdce 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -20,18 +20,17 @@ import ( "github.com/prometheus/prometheus/storage/tsdb" kingpin "gopkg.in/alecthomas/kingpin.v2" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" - "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/tls" ) @@ -203,7 +202,43 @@ func runReceive( return err } - dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, tenantLabelName) + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + upload := true + if len(confContentYaml) == 0 { + level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled") + upload = false + } + + if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { + if !ignoreBlockSize { + return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ + "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) + } + level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") + } + + var bkt objstore.Bucket + if upload { + // The background shipper continuously scans the data directory and uploads + // new blocks to Google Cloud Storage or an S3-compatible storage service. + bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) + if err != nil { + return err + } + } + + dbs := receive.NewMultiTSDB( + dataDir, + logger, + reg, + tsdbOpts, + lset, + tenantLabelName, + bkt, + ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ Writer: writer, @@ -227,24 +262,6 @@ func runReceive( prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) - confContentYaml, err := objStoreConfig.Content() - if err != nil { - return err - } - upload := true - if len(confContentYaml) == 0 { - level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled") - upload = false - } - - if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { - if !ignoreBlockSize { - return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ - "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) - } - level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") - } - // Start all components while we wait for TSDB to open but only load // initial config and mark ourselves as ready after it completed. @@ -438,18 +455,9 @@ func runReceive( } if upload { - // The background shipper continuously scans the data directory and uploads - // new blocks to Google Cloud Storage or an S3-compatible storage service. - bkt, err := client.NewBucket(logger, confContentYaml, reg, comp.String()) - if err != nil { - return err - } - - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource) - - // Before starting, ensure any old blocks are uploaded. - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) + level.Debug(logger).Log("msg", "upload enabled") + if err := dbs.Upload(context.Background()); err != nil { + level.Warn(logger).Log("err", err) } { @@ -457,8 +465,8 @@ func runReceive( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if uploaded, err := s.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err, "uploaded", uploaded) + if err := dbs.Upload(ctx); err != nil { + level.Warn(logger).Log("err", err) } return nil @@ -479,8 +487,8 @@ func runReceive( // Before quitting, ensure all blocks are uploaded. defer func() { <-uploadC - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) + if err := dbs.Upload(context.Background()); err != nil { + level.Warn(logger).Log("err", err) } }() defer close(uploadDone) @@ -494,8 +502,8 @@ func runReceive( case <-ctx.Done(): return nil case <-uploadC: - if uploaded, err := s.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) + if err := dbs.Upload(ctx); err != nil { + level.Warn(logger).Log("err", err) } uploadDone <- struct{}{} } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 13f7bfe1e6..bcc6e9501d 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -4,6 +4,8 @@ package receive import ( + "context" + "fmt" "io/ioutil" "os" "path" @@ -16,7 +18,10 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "golang.org/x/sync/errgroup" ) @@ -28,14 +33,17 @@ type MultiTSDB struct { tsdbCfg *tsdb.Options tenantLabelName string labels labels.Labels + bucket objstore.Bucket + upload bool mtx *sync.RWMutex dbs map[string]*FlushableStorage appendables map[string]*tsdb.ReadyStorage stores map[string]*store.TSDBStore + shippers map[string]*shipper.Shipper } -func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string) *MultiTSDB { +func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbCfg *tsdb.Options, labels labels.Labels, tenantLabelName string, bucket objstore.Bucket) *MultiTSDB { if l == nil { l = log.NewNopLogger() } @@ -49,8 +57,11 @@ func NewMultiTSDB(dataDir string, l log.Logger, reg prometheus.Registerer, tsdbC dbs: map[string]*FlushableStorage{}, stores: map[string]*store.TSDBStore{}, appendables: map[string]*tsdb.ReadyStorage{}, + shippers: map[string]*shipper.Shipper{}, labels: labels, tenantLabelName: tenantLabelName, + bucket: bucket, + upload: bucket != nil, } } @@ -110,6 +121,35 @@ func (t *MultiTSDB) Flush() error { return merr.Err() } +func (t *MultiTSDB) Upload(ctx context.Context) error { + if !t.upload { + return nil + } + + t.mtx.Lock() + defer t.mtx.Unlock() + + errmtx := &sync.Mutex{} + merr := terrors.MultiError{} + wg := &sync.WaitGroup{} + for tenant, s := range t.shippers { + level.Debug(t.logger).Log("msg", "uploading block for tenant", "tenant", tenant) + s := s + wg.Add(1) + go func() { + if uploaded, err := s.Sync(ctx); err != nil { + errmtx.Lock() + merr.Add(fmt.Errorf("failed to upload %d: %w", uploaded, err)) + errmtx.Unlock() + } + wg.Done() + }() + } + + wg.Wait() + return merr.Err() +} + func (t *MultiTSDB) openTSDBs() error { files, err := ioutil.ReadDir(t.dataDir) if err != nil { @@ -118,7 +158,6 @@ func (t *MultiTSDB) openTSDBs() error { var g errgroup.Group for _, f := range files { - // See: https://golang.org/doc/faq#closures_and_goroutines. f := f if !f.IsDir() { continue @@ -168,41 +207,57 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tsdb.ReadyStorage, error) t.mtx.Unlock() go func() { + reg := prometheus.WrapRegistererWith(prometheus.Labels{ + "tenant": tenantID, + }, t.reg) + logger := log.With(t.logger, "tenant", tenantID) + lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}) + dataDir := path.Join(t.dataDir, tenantID) + + var ship *shipper.Shipper + if t.upload { + ship = shipper.New( + logger, + reg, + dataDir, + t.bucket, + func() labels.Labels { return lbls }, + metadata.ReceiveSource, + ) + } + s := NewFlushableStorage( - path.Join(t.dataDir, tenantID), - log.With(t.logger, "tenant", tenantID), - prometheus.WrapRegistererWith(prometheus.Labels{ - "tenant": tenantID, - }, t.reg), + dataDir, + logger, + reg, t.tsdbCfg, ) if err := s.Open(); err != nil { - level.Error(t.logger).Log("msg", "failed to open tsdb", "err", err) + level.Error(logger).Log("msg", "failed to open tsdb", "err", err) t.mtx.Lock() delete(t.appendables, tenantID) delete(t.stores, tenantID) t.mtx.Unlock() if err := s.Close(); err != nil { - level.Error(t.logger).Log("msg", "failed to close tsdb", "err", err) + level.Error(logger).Log("msg", "failed to close tsdb", "err", err) } return } tstore := store.NewTSDBStore( - log.With(t.logger, "component", "thanos-tsdb-store", "tenant", tenantID), - prometheus.WrapRegistererWith(prometheus.Labels{ - "tenant": tenantID, - }, t.reg), + logger, + reg, s.Get(), component.Receive, - append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}), + lbls, ) t.mtx.Lock() rs.Set(s.Get(), int64(2*time.Duration(t.tsdbCfg.MinBlockDuration).Seconds()*1000)) t.stores[tenantID] = tstore t.dbs[tenantID] = s + t.shippers[tenantID] = ship t.mtx.Unlock() }() diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index 4c78b123b9..813b4623b5 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -182,7 +182,9 @@ for i in $(seq 0 1 2); do --grpc-grace-period 1s \ --http-address 0.0.0.0:1${i}909 \ --http-grace-period 1s \ - --receive.replication-factor 3 \ + --receive.replication-factor 1 \ + --tsdb.min-block-duration 5m \ + --tsdb.max-block-duration 5m \ --label "receive_replica=\"${i}\"" \ --receive.local-endpoint 127.0.0.1:1${i}907 \ --receive.hashrings-file ./data/hashring.json \