Skip to content

Commit

Permalink
Include current config hash in forward requests
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Sep 8, 2020
1 parent fdbea8a commit 9a31938
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 116 deletions.
30 changes: 18 additions & 12 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -83,6 +84,8 @@ func registerReceive(app *extkingpin.App) {

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

hashringConfigHeader := cmd.Flag("receive.hashring-config-header", "HTTP header to specify hashring configuration for write requests.").Default(receive.DefaultHashringConfigHeader).String()

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
Expand Down Expand Up @@ -161,6 +164,7 @@ func registerReceive(app *extkingpin.App) {
*tenantLabelName,
*replicaHeader,
*replicationFactor,
*hashringConfigHeader,
time.Duration(*forwardTimeout),
*allowOutOfOrderUpload,
component.Receive,
Expand Down Expand Up @@ -200,6 +204,7 @@ func runReceive(
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
hashringConfigHeader string,
forwardTimeout time.Duration,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
Expand Down Expand Up @@ -257,18 +262,19 @@ func runReceive(
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: forwardTimeout,
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
HashringConfigHeader: hashringConfigHeader,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: forwardTimeout,
})

grpcProbe := prober.NewGRPC()
Expand Down
55 changes: 37 additions & 18 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const (
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
// DefaultHashringConfigHeader is the default header used to designate the hashring config hash of a write request.
DefaultHashringConfigHeader = "THANOS-HASHRING-CONFIG"

// Labels for metrics.
labelSuccess = "success"
labelError = "error"
Expand All @@ -59,25 +62,27 @@ var (
// conflictErr is returned whenever an operation fails due to any conflict-type error.
conflictErr = errors.New("conflict")

errBadReplica = errors.New("replica count exceeds replication factor")
errNotReady = errors.New("target not ready")
errUnavailable = errors.New("target not available")
errBadReplica = errors.New("replica count exceeds replication factor")
errNotReady = errors.New("target not ready")
errUnavailable = errors.New("target not available")
errHashringConfigMismatch = errors.New("hashring configuration does not match")
)

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
TenantHeader string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
TenantHeader string
DefaultTenantID string
ReplicaHeader string
Endpoint string
HashringConfigHeader string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -250,12 +255,16 @@ type replica struct {
replicated bool
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, config string, wreq *prompb.WriteRequest) error {
// The replica value in the header is one-indexed, thus we need >.
if rep > h.options.ReplicationFactor {
return errBadReplica
}

if h.hashring.ConfigHash() != config {
return errHashringConfigMismatch
}

r := replica{
n: rep,
replicated: rep != 0,
Expand Down Expand Up @@ -315,7 +324,12 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
config := r.Header.Get(h.options.HashringConfigHeader)
if len(config) == 0 {
config = h.hashring.ConfigHash()
}

err = h.handleRequest(ctx, rep, tenant, config, &wreq)
switch err {
case nil:
return
Expand All @@ -327,6 +341,8 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
case errHashringConfigMismatch:
http.Error(w, err.Error(), http.StatusPreconditionFailed) // TODO(kakkoyun): ???
default:
level.Error(h.logger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -513,6 +529,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(replicas[endpoint].n + 1),
Config: h.hashring.ConfigHash(),
})
})
if err != nil {
Expand Down Expand Up @@ -634,7 +651,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, r.Config, &prompb.WriteRequest{Timeseries: r.Timeseries})
switch err {
case nil:
return &storepb.WriteResponse{}, nil
Expand All @@ -646,6 +663,8 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
return nil, status.Error(codes.InvalidArgument, err.Error())
case errHashringConfigMismatch:
return nil, status.Error(codes.FailedPrecondition, err.Error()) // TODO(kakkoyun): ???
default:
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)

for i := range appendables {
h := NewHandler(nil, &Options{
TenantHeader: DefaultTenantHeader,
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Second,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
TenantHeader: DefaultTenantHeader,
HashringConfigHeader: DefaultHashringConfigHeader,
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Second,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
})
handlers = append(handlers, h)
h.peers = peers
Expand Down
27 changes: 27 additions & 0 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package receive

import (
"context"
"crypto/sha256"
"fmt"
"sort"
"sync"

"github.com/cespare/xxhash"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand All @@ -36,6 +38,8 @@ type Hashring interface {
Get(tenant string, timeSeries *prompb.TimeSeries) (string, error)
// GetN returns the nth node that should handle the given tenant and time series.
GetN(tenant string, timeSeries *prompb.TimeSeries, n uint64) (string, error)
// ConfigHash returns the hash of the loaded configuration.
ConfigHash() string
}

// hash returns a hash for the given tenant and time series.
Expand Down Expand Up @@ -71,6 +75,11 @@ func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (stri
return string(s), nil
}

// ConfigHash implements the Hashring interface.
// No digest is computed for a single-node ring: the receiver's own string
// value is returned unchanged and serves as the configuration identifier.
func (s SingleNodeHashring) ConfigHash() string {
	id := string(s)
	return id
}

// simpleHashring represents a group of nodes handling write requests.
type simpleHashring []string

Expand All @@ -87,6 +96,15 @@ func (s simpleHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return s[(hash(tenant, ts)+n)%uint64(len(s))], nil
}

// ConfigHash returns the hash of the loaded configuration.
//
// The result is the lowercase hex encoding of a SHA-256 digest computed over
// the ring's entries in slice order. It is hex-encoded (rather than returned
// as raw digest bytes) so the value is printable ASCII and therefore safe to
// carry in an HTTP header or a protobuf string field.
func (s simpleHashring) ConfigHash() string {
	h := sha256.New()
	for _, v := range s {
		// hash.Hash.Write is documented to never return an error.
		_, _ = h.Write([]byte(v))
		// Terminate every entry so that differently split node lists,
		// e.g. {"ab", "c"} and {"a", "bc"}, do not produce the same digest.
		_, _ = h.Write([]byte{0})
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

// multiHashring represents a set of hashrings.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
Expand Down Expand Up @@ -135,6 +153,15 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
return "", errors.New("no matching hashring to handle tenant")
}

// ConfigHash returns the hash of the loaded configuration.
//
// The result is the lowercase hex encoding of a SHA-256 digest computed over
// the ConfigHash of every hashring in m.hashrings, in iteration order. It is
// hex-encoded (rather than returned as raw digest bytes) so the value is
// printable ASCII and therefore safe to carry in an HTTP header or a protobuf
// string field.
//
// NOTE(review): only the child hashrings' hashes are digested here; any
// tenant-to-hashring assignment does not appear to be covered — confirm
// whether that is intended.
func (m *multiHashring) ConfigHash() string {
	h := sha256.New()
	for _, v := range m.hashrings {
		// hash.Hash.Write is documented to never return an error.
		_, _ = h.Write([]byte(v.ConfigHash()))
		// Terminate every child hash so that adjacent entries cannot be
		// re-split into a different sequence with the same digest.
		_, _ = h.Write([]byte{0})
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

// newMultiHashring creates a multi-tenant hashring for a given slice of
// groups.
// Which hashring to use for a tenant is determined
Expand Down
Loading

0 comments on commit 9a31938

Please sign in to comment.