Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Add support for TSDB per tenant #2012

Merged
merged 9 commits into from
May 4, 2020
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ test-e2e: docker
@echo ">> running /test/e2e tests."
# NOTE(bwplotka):
# * If you see errors on CI (timeouts), but not locally, try to add -parallel 1 to limit to single CPU to reproduce small 1CPU machine.
@go test -failfast -timeout 10m -v ./test/e2e/...
brancz marked this conversation as resolved.
Show resolved Hide resolved
@go test -failfast -timeout 10m $(RUN_ARGS) -v ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
Expand Down
126 changes: 66 additions & 60 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -72,6 +71,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String()

defaultTenantID := cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(receive.DefaultTenant).String()

tenantLabelName := cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).String()

replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).String()

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()
Expand Down Expand Up @@ -144,6 +147,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
cw,
*local,
*tenantHeader,
*defaultTenantID,
*tenantLabelName,
*replicaHeader,
*replicationFactor,
comp,
Expand Down Expand Up @@ -179,14 +184,15 @@ func runReceive(
cw *receive.ConfigWatcher,
endpoint string,
tenantHeader string,
defaultTenantID string,
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

localStorage := &tsdb.ReadyStorage{}
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
Expand All @@ -196,11 +202,51 @@ func runReceive(
return err
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
brancz marked this conversation as resolved.
Show resolved Hide resolved
upload = false
}

if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !ignoreBlockSize {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
}

var bkt objstore.Bucket
if upload {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
brancz marked this conversation as resolved.
Show resolved Hide resolved
bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String())
if err != nil {
return err
}
}

dbs := receive.NewMultiTSDB(
dataDir,
logger,
reg,
tsdbOpts,
lset,
tenantLabelName,
bkt,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
Expand All @@ -216,24 +262,6 @@ func runReceive(
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
upload := true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
upload = false
}

if upload && tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !ignoreBlockSize {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration)
}
level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.")
}

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.

Expand All @@ -250,24 +278,13 @@ func runReceive(
{
// TSDB.
cancel := make(chan struct{})
startTimeMargin := int64(2 * time.Duration(tsdbOpts.MinBlockDuration).Seconds() * 1000)
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring is loaded.
db := receive.NewFlushableStorage(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can ignore this initial flushing because now we know for sure that tenant data will not be mixed, whereas before it could have been 👍

dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbOpts,
)

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
}()
Expand All @@ -283,28 +300,25 @@ func runReceive(

level.Info(logger).Log("msg", "updating DB")

if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
brancz marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(err, "flushing storage")
}
if err := db.Open(); err != nil {
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
brancz marked this conversation as resolved.
Show resolved Hide resolved
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests")
dbReady <- struct{}{}
}
}
}, func(err error) {
close(cancel)
},
)
})
}

level.Debug(logger).Log("msg", "setting up hashring")
Expand Down Expand Up @@ -349,7 +363,6 @@ func runReceive(
if !ok {
return nil
}
webHandler.SetWriter(nil)
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
Expand Down Expand Up @@ -397,9 +410,10 @@ func runReceive(
if s != nil {
s.Shutdown(errors.New("reload hashrings"))
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset)

multiStore := store.NewMultiTSDBStore(logger, reg, comp, dbs.TSDBStores)
brancz marked this conversation as resolved.
Show resolved Hide resolved
rw := store.ReadWriteTSDBStore{
StoreServer: tsdbStore,
StoreServer: multiStore,
WriteableStoreServer: webHandler,
}

Expand All @@ -419,6 +433,7 @@ func runReceive(
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", grpcBindAddr)
if err := s.ListenAndServe(); err != nil {
return errors.Wrap(err, "serve gRPC")
}
Expand All @@ -440,27 +455,18 @@ func runReceive(
}

if upload {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, comp.String())
if err != nil {
return err
}

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource)

// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
level.Debug(logger).Log("msg", "upload enabled")
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
if err := dbs.Upload(context.Background()); err != nil {
brancz marked this conversation as resolved.
Show resolved Hide resolved
level.Warn(logger).Log("err", err)
}

{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
if err := dbs.Upload(ctx); err != nil {
level.Warn(logger).Log("err", err)
}

return nil
Expand All @@ -481,8 +487,8 @@ func runReceive(
// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
if err := dbs.Upload(context.Background()); err != nil {
level.Warn(logger).Log("err", err)
brancz marked this conversation as resolved.
Show resolved Hide resolved
}
}()
defer close(uploadDone)
Expand All @@ -496,8 +502,8 @@ func runReceive(
case <-ctx.Done():
return nil
case <-uploadC:
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "failed to upload", uploaded)
if err := dbs.Upload(ctx); err != nil {
level.Warn(logger).Log("err", err)
}
uploadDone <- struct{}{}
}
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,10 @@ github.com/sercand/kuberesolver v2.1.0+incompatible h1:iJ1oCzPQ/aacsbCWLfJW1hPKk
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand All @@ -663,6 +665,7 @@ github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUr
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
Expand Down
Loading