diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 0769df4216d..1abdf16c6aa 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -84,8 +84,6 @@ func registerReceive(app *extkingpin.App) { forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) - hashringConfigHeader := cmd.Flag("receive.hashring-config-header", "HTTP header to specify hashring configuration for write requests.").Default(receive.DefaultHashringConfigHeader).String() - tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() @@ -164,7 +162,6 @@ func registerReceive(app *extkingpin.App) { *tenantLabelName, *replicaHeader, *replicationFactor, - *hashringConfigHeader, time.Duration(*forwardTimeout), *allowOutOfOrderUpload, component.Receive, @@ -204,7 +201,6 @@ func runReceive( tenantLabelName string, replicaHeader string, replicationFactor uint64, - hashringConfigHeader string, forwardTimeout time.Duration, allowOutOfOrderUpload bool, comp component.SourceStoreAPI, @@ -262,19 +258,18 @@ func runReceive( ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - Writer: writer, - ListenAddress: rwAddress, - Registry: reg, - Endpoint: endpoint, - TenantHeader: tenantHeader, - DefaultTenantID: defaultTenantID, - ReplicaHeader: replicaHeader, - ReplicationFactor: replicationFactor, - HashringConfigHeader: hashringConfigHeader, - Tracer: tracer, - TLSConfig: rwTLSConfig, - DialOpts: dialOpts, - ForwardTimeout: forwardTimeout, + Writer: writer, + ListenAddress: rwAddress, + Registry: reg, + Endpoint: endpoint, + TenantHeader: tenantHeader, + DefaultTenantID: defaultTenantID, + ReplicaHeader: replicaHeader, + ReplicationFactor: replicationFactor, + Tracer: tracer, + TLSConfig: rwTLSConfig, + DialOpts: dialOpts, + ForwardTimeout: forwardTimeout, }) grpcProbe := prober.NewGRPC() diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index f98df4a4271..2abd451f738 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -50,8 +50,6 @@ const ( DefaultTenantLabel = "tenant_id" // DefaultReplicaHeader is the default header used to designate the replica count of a write request. DefaultReplicaHeader = "THANOS-REPLICA" - // DefaultHashringConfigHeader is the default header used to designate the hashring config hash of a write request. - DefaultHashringConfigHeader = "THANOS-HASHRING-CONFIG" // Labels for metrics. labelSuccess = "success" @@ -62,27 +60,25 @@ var ( // conflictErr is returned whenever an operation fails due to any conflict-type error. conflictErr = errors.New("conflict") - errBadReplica = errors.New("replica count exceeds replication factor") - errNotReady = errors.New("target not ready") - errUnavailable = errors.New("target not available") - errHashringConfigMismatch = errors.New("hashring configuration does not match") + errBadReplica = errors.New("replica count exceeds replication factor") + errNotReady = errors.New("target not ready") + errUnavailable = errors.New("target not available") ) // Options for the web Handler. type Options struct { - Writer *Writer - ListenAddress string - Registry prometheus.Registerer - TenantHeader string - DefaultTenantID string - ReplicaHeader string - Endpoint string - HashringConfigHeader string - ReplicationFactor uint64 - Tracer opentracing.Tracer - TLSConfig *tls.Config - DialOpts []grpc.DialOption - ForwardTimeout time.Duration + Writer *Writer + ListenAddress string + Registry prometheus.Registerer + TenantHeader string + DefaultTenantID string + ReplicaHeader string + Endpoint string + ReplicationFactor uint64 + Tracer opentracing.Tracer + TLSConfig *tls.Config + DialOpts []grpc.DialOption + ForwardTimeout time.Duration } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -262,7 +258,8 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, } if h.hashring.ConfigHash() != config { - return errHashringConfigMismatch + // TODO(kakkoyun): Add a metric? + level.Warn(h.logger).Log("msg", "hasring configuration mismatch", "current", h.hashring.ConfigHash(), "received", config) } r := replica{ @@ -324,12 +321,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { tenant = h.options.DefaultTenantID } - config := r.Header.Get(h.options.HashringConfigHeader) - if len(config) == 0 { - config = h.hashring.ConfigHash() - } - - err = h.handleRequest(ctx, rep, tenant, config, &wreq) + err = h.handleRequest(ctx, rep, tenant, h.hashring.ConfigHash(), &wreq) switch err { case nil: return @@ -341,8 +333,6 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusConflict) case errBadReplica: http.Error(w, err.Error(), http.StatusBadRequest) - case errHashringConfigMismatch: - http.Error(w, err.Error(), http.StatusPreconditionFailed) // TODO(kakkoyun): ??? default: level.Error(h.logger).Log("err", err, "msg", "internal server error") http.Error(w, err.Error(), http.StatusInternalServerError) @@ -663,8 +653,6 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st return nil, status.Error(codes.AlreadyExists, err.Error()) case errBadReplica: return nil, status.Error(codes.InvalidArgument, err.Error()) - case errHashringConfigMismatch: - return nil, status.Error(codes.FailedPrecondition, err.Error()) // TODO(kakkoyun): ??? default: return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index add3028cbab..efd12ce7ce2 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -162,12 +162,11 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) for i := range appendables { h := NewHandler(nil, &Options{ - TenantHeader: DefaultTenantHeader, - HashringConfigHeader: DefaultHashringConfigHeader, - ReplicaHeader: DefaultReplicaHeader, - ReplicationFactor: replicationFactor, - ForwardTimeout: 5 * time.Second, - Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), + TenantHeader: DefaultTenantHeader, + ReplicaHeader: DefaultReplicaHeader, + ReplicationFactor: replicationFactor, + ForwardTimeout: 5 * time.Second, + Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), }) handlers = append(handlers, h) h.peers = peers