Skip to content

Commit

Permalink
receive: Add support for TSDB per tenant
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Branczyk <[email protected]>
  • Loading branch information
brancz committed Mar 31, 2020
1 parent 465b55a commit 2713e07
Show file tree
Hide file tree
Showing 11 changed files with 513 additions and 79 deletions.
42 changes: 20 additions & 22 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String()

defaultTenantID := cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(receive.DefaultTenant).String()

tenantLabelName := cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).String()

replicaHeader := cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).String()

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()
Expand Down Expand Up @@ -144,6 +148,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
cw,
*local,
*tenantHeader,
*defaultTenantID,
*tenantLabelName,
*replicaHeader,
*replicationFactor,
comp,
Expand Down Expand Up @@ -179,14 +185,15 @@ func runReceive(
cw *receive.ConfigWatcher,
endpoint string,
tenantHeader string,
defaultTenantID string,
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice")

localStorage := &tsdb.ReadyStorage{}
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA)
if err != nil {
return err
Expand All @@ -196,11 +203,15 @@ func runReceive(
return err
}

dbs := receive.NewMultiTSDB(dataDir, logger, reg, tsdbOpts, lset, tenantLabelName)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
Expand Down Expand Up @@ -250,24 +261,13 @@ func runReceive(
{
// TSDB.
cancel := make(chan struct{})
startTimeMargin := int64(2 * time.Duration(tsdbOpts.MinBlockDuration).Seconds() * 1000)
g.Add(func() error {
defer close(dbReady)
defer close(uploadC)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring is loaded.
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbOpts,
)

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
}
}()
Expand All @@ -283,28 +283,25 @@ func runReceive(

level.Info(logger).Log("msg", "updating DB")

if err := db.Flush(); err != nil {
if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := db.Open(); err != nil {
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
level.Info(logger).Log("msg", "tsdb started")
localStorage.Set(db.Get(), startTimeMargin)
webHandler.SetWriter(receive.NewWriter(log.With(logger, "component", "receive-writer"), localStorage))
statusProber.Ready()
level.Info(logger).Log("msg", "server is ready to receive web requests")
dbReady <- struct{}{}
}
}
}, func(err error) {
close(cancel)
},
)
})
}

level.Debug(logger).Log("msg", "setting up hashring")
Expand Down Expand Up @@ -349,7 +346,6 @@ func runReceive(
if !ok {
return nil
}
webHandler.SetWriter(nil)
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
statusProber.NotReady(errors.New(msg))
Expand Down Expand Up @@ -397,9 +393,10 @@ func runReceive(
if s != nil {
s.Shutdown(errors.New("reload hashrings"))
}
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset)

multiStore := store.NewMultiTSDBStore(logger, reg, comp, dbs.TSDBClients)
rw := store.ReadWriteTSDBStore{
StoreServer: tsdbStore,
StoreServer: multiStore,
WriteableStoreServer: webHandler,
}

Expand All @@ -419,6 +416,7 @@ func runReceive(
// whenever the DB changes, thus it needs its own run group.
g.Add(func() error {
for range startGRPC {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", grpcBindAddr)
if err := s.ListenAndServe(); err != nil {
return errors.Wrap(err, "serve gRPC")
}
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,10 @@ github.com/sercand/kuberesolver v2.1.0+incompatible h1:iJ1oCzPQ/aacsbCWLfJW1hPKk
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20180825020608-02ddb050ef6b/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand All @@ -663,6 +665,7 @@ github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUr
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
Expand Down
58 changes: 33 additions & 25 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"google.golang.org/grpc"
Expand All @@ -40,6 +41,10 @@ import (
const (
// DefaultTenantHeader is the default header used to designate the tenant making a write request.
DefaultTenantHeader = "THANOS-TENANT"
// DefaultTenant is the default tenant ID used when no tenant is passed via the tenant header.
DefaultTenant = "default-tenant"
// DefaultTenantLabel is the default name of the label through which the tenant is announced.
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
)
Expand All @@ -55,6 +60,7 @@ type Options struct {
ListenAddress string
Registry prometheus.Registerer
TenantHeader string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
Expand Down Expand Up @@ -116,16 +122,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
return h
}

// SetWriter replaces the handler's writer while holding the write lock.
// The handler is only ready and usable once a non-nil writer has been set;
// passing nil marks the handler as not ready.
func (h *Handler) SetWriter(w *Writer) {
	h.mtx.Lock()
	h.writer = w
	h.mtx.Unlock()
}

// Hashring sets the hashring for the handler and marks the hashring as ready.
// The hashring must be set to a non-nil value in order for the
// handler to be ready and usable.
Expand Down Expand Up @@ -266,11 +262,16 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}

tenant := r.Header.Get(h.options.TenantHeader)
if len(tenant) == 0 {
tenant = h.options.DefaultTenantID
}

err = h.handleRequest(r.Context(), rep, tenant, &wreq)
switch err {
case nil:
return
case tsdb.ErrNotReady:
http.Error(w, err.Error(), http.StatusServiceUnavailable)
case conflictErr:
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
Expand Down Expand Up @@ -357,23 +358,18 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
// can be ignored if the replication factor is met.
if endpoint == h.options.Endpoint {
go func(endpoint string) {
var err error
h.mtx.RLock()
if h.writer == nil {
err = errors.New("storage is not ready")
} else {
err = h.writer.Write(wreqs[endpoint])
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
if errs, ok := err.(terrors.MultiError); ok {
if countCause(errs, isConflict) > 0 {
err = errors.Wrap(conflictErr, errs.Error())
} else {
err = errors.New(errs.Error())
}
err := h.writer.Write(tenant, wreqs[endpoint])
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
if errs, ok := err.(terrors.MultiError); ok {
if countCause(errs, isConflict) > 0 {
err = errors.Wrap(conflictErr, errs.Error())
} else if countCause(errs, isNotReady) > 0 {
err = tsdb.ErrNotReady
} else {
err = errors.New(errs.Error())
}
}
h.mtx.RUnlock()
if err != nil {
level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint)
}
Expand Down Expand Up @@ -465,6 +461,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri

err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
if errs, ok := err.(terrors.MultiError); ok {
if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 {
return tsdb.ErrNotReady
}
if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 {
return errors.Wrap(conflictErr, "did not meet replication threshold")
}
Expand All @@ -482,6 +481,8 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
switch err {
case nil:
return &storepb.WriteResponse{}, nil
case tsdb.ErrNotReady:
return nil, status.Error(codes.Unavailable, err.Error())
case conflictErr:
return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
Expand Down Expand Up @@ -522,6 +523,13 @@ func isConflict(err error) bool {
status.Code(err) == codes.AlreadyExists
}

// isNotReady reports whether err represents a "storage not ready" condition.
//
// An error counts as not ready when it is tsdb.ErrNotReady itself, when its
// message is the bare HTTP status code 503 (Service Unavailable), or when it
// carries the gRPC status code Unavailable. A nil error is never "not ready".
func isNotReady(err error) bool {
	// Guard against nil so that err.Error() below cannot panic.
	if err == nil {
		return false
	}
	return err == tsdb.ErrNotReady ||
		// The HTTP handler answers a not-ready write with 503, so that is the
		// status to match here. Matching StatusConflict would misclassify
		// conflicts as not-ready.
		// NOTE(review): presumably HTTP failures are surfaced as an error whose
		// message is just the status code — confirm against the forwarding code.
		err.Error() == strconv.Itoa(http.StatusServiceUnavailable) ||
		status.Code(err) == codes.Unavailable
}

func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
Expand Down
16 changes: 8 additions & 8 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
TenantHeader: DefaultTenantHeader,
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
Writer: NewWriter(log.NewNopLogger(), appendables[i]),
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
})
handlers = append(handlers, h)
h.peers = peers
Expand Down Expand Up @@ -479,12 +479,12 @@ func TestReceive(t *testing.T) {
// on which node is erroring and which node is receiving.
for i, handler := range handlers {
// Test that the correct status is returned.
status, err := makeRequest(handler, tenant, tc.wreq)
rec, err := makeRequest(handler, tenant, tc.wreq)
if err != nil {
t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
}
if status != tc.status {
t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d", i, tc.status, status)
if rec.Code != tc.status {
t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String())
}
}
// Test that each time series is stored
Expand Down Expand Up @@ -547,22 +547,22 @@ func cycleErrors(errs []error) func() error {
}

// makeRequest is a helper to make a correct request against a remote write endpoint given a request.
func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, error) {
func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptest.ResponseRecorder, error) {
buf, err := proto.Marshal(wreq)
if err != nil {
return 0, errors.Wrap(err, "marshal request")
return nil, errors.Wrap(err, "marshal request")
}
req, err := http.NewRequest("POST", h.options.Endpoint, bytes.NewBuffer(snappy.Encode(nil, buf)))
if err != nil {
return 0, errors.Wrap(err, "create request")
return nil, errors.Wrap(err, "create request")
}
req.Header.Add(h.options.TenantHeader, tenant)

rec := httptest.NewRecorder()
h.receiveHTTP(rec, req)
rec.Flush()

return rec.Code, nil
return rec, nil
}

func randomAddr() string {
Expand Down
Loading

0 comments on commit 2713e07

Please sign in to comment.