Skip to content

Commit

Permalink
Address review issues
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Sep 9, 2020
1 parent 9a31938 commit c603aad
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 53 deletions.
29 changes: 12 additions & 17 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func registerReceive(app *extkingpin.App) {

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

hashringConfigHeader := cmd.Flag("receive.hashring-config-header", "HTTP header to specify hashring configuration for write requests.").Default(receive.DefaultHashringConfigHeader).String()

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
Expand Down Expand Up @@ -164,7 +162,6 @@ func registerReceive(app *extkingpin.App) {
*tenantLabelName,
*replicaHeader,
*replicationFactor,
*hashringConfigHeader,
time.Duration(*forwardTimeout),
*allowOutOfOrderUpload,
component.Receive,
Expand Down Expand Up @@ -204,7 +201,6 @@ func runReceive(
tenantLabelName string,
replicaHeader string,
replicationFactor uint64,
hashringConfigHeader string,
forwardTimeout time.Duration,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
Expand Down Expand Up @@ -262,19 +258,18 @@ func runReceive(
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
HashringConfigHeader: hashringConfigHeader,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: forwardTimeout,
Writer: writer,
ListenAddress: rwAddress,
Registry: reg,
Endpoint: endpoint,
TenantHeader: tenantHeader,
DefaultTenantID: defaultTenantID,
ReplicaHeader: replicaHeader,
ReplicationFactor: replicationFactor,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: forwardTimeout,
})

grpcProbe := prober.NewGRPC()
Expand Down
48 changes: 18 additions & 30 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (
DefaultTenantLabel = "tenant_id"
// DefaultReplicaHeader is the default header used to designate the replica count of a write request.
DefaultReplicaHeader = "THANOS-REPLICA"
// DefaultHashringConfigHeader is the default header used to designate the hashring config hash of a write request.
DefaultHashringConfigHeader = "THANOS-HASHRING-CONFIG"

// Labels for metrics.
labelSuccess = "success"
Expand All @@ -62,27 +60,25 @@ var (
// conflictErr is returned whenever an operation fails due to any conflict-type error.
conflictErr = errors.New("conflict")

errBadReplica = errors.New("replica count exceeds replication factor")
errNotReady = errors.New("target not ready")
errUnavailable = errors.New("target not available")
errHashringConfigMismatch = errors.New("hashring configuration does not match")
errBadReplica = errors.New("replica count exceeds replication factor")
errNotReady = errors.New("target not ready")
errUnavailable = errors.New("target not available")
)

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
TenantHeader string
DefaultTenantID string
ReplicaHeader string
Endpoint string
HashringConfigHeader string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
Writer *Writer
ListenAddress string
Registry prometheus.Registerer
TenantHeader string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -262,7 +258,8 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
}

if h.hashring.ConfigHash() != config {
return errHashringConfigMismatch
// TODO(kakkoyun): Add a metric?
level.Warn(h.logger).Log("msg", "hasring configuration mismatch", "current", h.hashring.ConfigHash(), "received", config)
}

r := replica{
Expand Down Expand Up @@ -324,12 +321,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

config := r.Header.Get(h.options.HashringConfigHeader)
if len(config) == 0 {
config = h.hashring.ConfigHash()
}

err = h.handleRequest(ctx, rep, tenant, config, &wreq)
err = h.handleRequest(ctx, rep, tenant, h.hashring.ConfigHash(), &wreq)
switch err {
case nil:
return
Expand All @@ -341,8 +333,6 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusConflict)
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
case errHashringConfigMismatch:
http.Error(w, err.Error(), http.StatusPreconditionFailed) // TODO(kakkoyun): ???
default:
level.Error(h.logger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -663,8 +653,6 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
return nil, status.Error(codes.AlreadyExists, err.Error())
case errBadReplica:
return nil, status.Error(codes.InvalidArgument, err.Error())
case errHashringConfigMismatch:
return nil, status.Error(codes.FailedPrecondition, err.Error()) // TODO(kakkoyun): ???
default:
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,11 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)

for i := range appendables {
h := NewHandler(nil, &Options{
TenantHeader: DefaultTenantHeader,
HashringConfigHeader: DefaultHashringConfigHeader,
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Second,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
TenantHeader: DefaultTenantHeader,
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Second,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])),
})
handlers = append(handlers, h)
h.peers = peers
Expand Down

0 comments on commit c603aad

Please sign in to comment.