diff --git a/CHANGELOG.md b/CHANGELOG.md index 34af658e1c..985ba42fa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags - [#1967](https://github.com/thanos-io/thanos/issues/1967) Receive: Allow local TSDB compaction - [#1975](https://github.com/thanos-io/thanos/pull/1975) Store Gateway: fixed panic caused by memcached servers selector when there's 1 memcached node +- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`. ### Fixed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 2dd9c17663..349e9bf2ea 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -10,8 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -21,9 +19,12 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" + kingpin "gopkg.in/alecthomas/kingpin.v2" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/prober" @@ -34,11 +35,7 @@ import ( httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/tls" - "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) // registerQuery registers a query command. @@ -165,48 +162,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { } } -func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { - grpcMets := grpc_prometheus.NewClientMetrics() - grpcMets.EnableClientHandlingTimeHistogram( - grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), - ) - dialOpts := []grpc.DialOption{ - // We want to make sure that we can receive huge gRPC messages from storeAPI. - // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. - // Current limit is ~2GB. - // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), - grpc.WithUnaryInterceptor( - grpc_middleware.ChainUnaryClient( - grpcMets.UnaryClientInterceptor(), - tracing.UnaryClientInterceptor(tracer), - ), - ), - grpc.WithStreamInterceptor( - grpc_middleware.ChainStreamClient( - grpcMets.StreamClientInterceptor(), - tracing.StreamClientInterceptor(tracer), - ), - ), - } - - if reg != nil { - reg.MustRegister(grpcMets) - } - - if !secure { - return append(dialOpts, grpc.WithInsecure()), nil - } - - level.Info(logger).Log("msg", "enabling client to server TLS") - - tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName) - if err != nil { - return nil, err - } - return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil -} - // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured // store nodes, merging and duplicating the data to satisfy user query. func runQuery( @@ -251,7 +206,7 @@ func runQuery( }) reg.MustRegister(duplicatedStores) - dialOpts, err := storeClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName) + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, cert, key, caCert, serverName) if err != nil { return errors.Wrap(err, "building gRPC client") } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c55d328364..ba552e67e3 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -15,9 +15,12 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage/tsdb" + kingpin "gopkg.in/alecthomas/kingpin.v2" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/extgrpc" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/receive" @@ -27,7 +30,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/tls" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) func registerReceive(m map[string]setupFunc, app *kingpin.Application) { @@ -189,6 +191,11 @@ func runReceive( if err != nil { return err } + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, rwServerCert != "", rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName) + if err != nil { + return err + } + webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ ListenAddress: rwAddress, Registry: reg, @@ -199,6 +206,7 @@ func runReceive( Tracer: tracer, TLSConfig: rwTLSConfig, TLSClientConfig: rwTLSClientConfig, + DialOpts: dialOpts, }) statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) @@ -376,8 +384,12 @@ func runReceive( s.Shutdown(errors.New("reload hashrings")) } tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), comp, lset) + rw := store.ReadWriteTSDBStore{ + StoreServer: tsdbStore, + WriteableStoreServer: webHandler, + } - s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, tsdbStore, + s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw, grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), diff --git a/pkg/extgrpc/client.go b/pkg/extgrpc/client.go new file mode 100644 index 0000000000..2d97f9894b --- /dev/null +++ b/pkg/extgrpc/client.go @@ -0,0 +1,59 @@ +package extgrpc + +import ( + "math" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + opentracing "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/tls" + "github.com/thanos-io/thanos/pkg/tracing" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client. +func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { + grpcMets := grpc_prometheus.NewClientMetrics() + grpcMets.EnableClientHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ) + dialOpts := []grpc.DialOption{ + // We want to make sure that we can receive huge gRPC messages from storeAPI. + // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. + // Current limit is ~2GB. + // TODO(bplotka): Split sent chunks on store node per max 4MB chunks if needed. + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + grpc.WithUnaryInterceptor( + grpc_middleware.ChainUnaryClient( + grpcMets.UnaryClientInterceptor(), + tracing.UnaryClientInterceptor(tracer), + ), + ), + grpc.WithStreamInterceptor( + grpc_middleware.ChainStreamClient( + grpcMets.StreamClientInterceptor(), + tracing.StreamClientInterceptor(tracer), + ), + ), + } + + if reg != nil { + reg.MustRegister(grpcMets) + } + + if !secure { + return append(dialOpts, grpc.WithInsecure()), nil + } + + level.Info(logger).Log("msg", "enabling client to server TLS") + + tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName) + if err != nil { + return nil, err + } + return append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))), nil +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 7d082a9de1..5ee91d1017 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1,7 +1,6 @@ package receive import ( - "bytes" "context" "crypto/tls" "fmt" @@ -24,8 +23,13 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" terrors "github.com/prometheus/prometheus/tsdb/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -39,23 +43,25 @@ const ( // conflictErr is returned whenever an operation fails due to any conflict-type error. var conflictErr = errors.New("conflict") +var errBadReplica = errors.New("replica count exceeds replication factor") + // Options for the web Handler. type Options struct { Writer *Writer ListenAddress string Registry prometheus.Registerer - Endpoint string TenantHeader string ReplicaHeader string + Endpoint string ReplicationFactor uint64 Tracer opentracing.Tracer TLSConfig *tls.Config TLSClientConfig *tls.Config + DialOpts []grpc.DialOption } // Handler serves a Prometheus remote write receiving HTTP endpoint. type Handler struct { - client *http.Client logger log.Logger writer *Writer router *route.Router @@ -64,6 +70,7 @@ type Handler struct { mtx sync.RWMutex hashring Hashring + peers *peerGroup // Metrics. forwardRequestsTotal *prometheus.CounterVec @@ -74,19 +81,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler { logger = log.NewNopLogger() } - transport := http.DefaultTransport.(*http.Transport) - transport.TLSClientConfig = o.TLSClientConfig - client := &http.Client{Transport: transport} - if o.Tracer != nil { - client.Transport = tracing.HTTPTripperware(logger, client.Transport) - } - h := &Handler{ - client: client, logger: logger, writer: o.Writer, router: route.New(), options: o, + peers: newPeerGroup(o.DialOpts...), forwardRequestsTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -109,7 +109,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { return ins.NewHandler(name, http.HandlerFunc(next)) } - h.router.Post("/api/v1/receive", instrf("receive", readyf(h.receive))) + h.router.Post("/api/v1/receive", instrf("receive", readyf(h.receiveHTTP))) return h } @@ -199,7 +199,35 @@ type replica struct { replicated bool } -func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { +func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { + // The replica value in the header is one-indexed, thus we need >. + if rep > h.options.ReplicationFactor { + return errBadReplica + } + + r := replica{ + n: rep, + replicated: rep != 0, + } + + // on-the-wire format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated. + if r.replicated { + r.n-- + } + + // Forward any time series as necessary. All time series + // destined for the local node will be written to the receiver. + // Time series will be replicated as necessary. + if err := h.forward(ctx, tenant, r, wreq); err != nil { + if countCause(err, isConflict) > 0 { + return conflictErr + } + return err + } + return nil +} + +func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { compressed, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -219,34 +247,28 @@ func (h *Handler) receive(w http.ResponseWriter, r *http.Request) { return } - var rep replica - replicaRaw := r.Header.Get(h.options.ReplicaHeader) + rep := uint64(0) // If the header is empty, we assume the request is not yet replicated. - if replicaRaw != "" { - if rep.n, err = strconv.ParseUint(replicaRaw, 10, 64); err != nil { + if replicaRaw := r.Header.Get(h.options.ReplicaHeader); replicaRaw != "" { + if rep, err = strconv.ParseUint(replicaRaw, 10, 64); err != nil { http.Error(w, "could not parse replica header", http.StatusBadRequest) return } - rep.replicated = true - } - // The replica value in the header is zero-indexed, thus we need >=. - if rep.n >= h.options.ReplicationFactor { - http.Error(w, "replica count exceeds replication factor", http.StatusBadRequest) - return } tenant := r.Header.Get(h.options.TenantHeader) - // Forward any time series as necessary. All time series - // destined for the local node will be written to the receiver. - // Time series will be replicated as necessary. - if err := h.forward(r.Context(), tenant, rep, &wreq); err != nil { - if countCause(err, isConflict) > 0 { - http.Error(w, err.Error(), http.StatusConflict) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) + err = h.handleRequest(r.Context(), rep, tenant, &wreq) + switch err { + case nil: return + case conflictErr: + http.Error(w, err.Error(), http.StatusConflict) + case errBadReplica: + http.Error(w, err.Error(), http.StatusBadRequest) + default: + level.Error(h.logger).Log("err", err, "msg", "internal server error") + http.Error(w, err.Error(), http.StatusInternalServerError) } } @@ -352,20 +374,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic } // Make a request to the specified endpoint. go func(endpoint string) { - buf, err := proto.Marshal(wreqs[endpoint]) - if err != nil { - level.Error(h.logger).Log("msg", "marshaling proto", "err", err, "endpoint", endpoint) - ec <- err - return - } - req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(snappy.Encode(nil, buf))) - if err != nil { - level.Error(h.logger).Log("msg", "creating request", "err", err, "endpoint", endpoint) - ec <- err - return - } - req.Header.Add(h.options.TenantHeader, tenant) - req.Header.Add(h.options.ReplicaHeader, strconv.FormatUint(replicas[endpoint].n, 10)) + var err error // Increment the counters as necessary now that // the requests will go out. @@ -377,25 +386,29 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic h.forwardRequestsTotal.WithLabelValues("success").Inc() }() + cl, err := h.peers.get(ctx, endpoint) + if err != nil { + level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint) + ec <- err + return + } + // Create a span to track the request made to another receive node. span, ctx := tracing.StartSpan(ctx, "thanos_receive_forward") defer span.Finish() // Actually make the request against the endpoint // we determined should handle these time series. - var res *http.Response - res, err = h.client.Do(req.WithContext(ctx)) + _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ + Timeseries: wreqs[endpoint].Timeseries, + Tenant: tenant, + Replica: int64(replicas[endpoint].n + 1), // increment replica since on-the-wire format is 1-indexed and 0 indicates unreplicated. + }) if err != nil { level.Error(h.logger).Log("msg", "forwarding request", "err", err, "endpoint", endpoint) ec <- err return } - if res.StatusCode != http.StatusOK { - err = errors.New(strconv.Itoa(res.StatusCode)) - level.Error(h.logger).Log("msg", "forwarding returned non-200 status", "err", err, "endpoint", endpoint) - ec <- err - return - } ec <- nil }(endpoint) } @@ -456,6 +469,21 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri return errors.Wrap(err, "could not replicate write request") } +// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. +func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) { + err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries}) + switch err { + case nil: + return &storepb.WriteResponse{}, nil + case conflictErr: + return nil, status.Error(codes.AlreadyExists, err.Error()) + case errBadReplica: + return nil, status.Error(codes.InvalidArgument, err.Error()) + default: + return nil, status.Error(codes.Internal, err.Error()) + } +} + // countCause counts the number of errors within the given error // whose causes satisfy the given function. // countCause will inspect the error's cause or, if the error is a MultiError, @@ -479,5 +507,54 @@ func isConflict(err error) bool { if err == nil { return false } - return err == conflictErr || err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || err == storage.ErrOutOfBounds || err.Error() == strconv.Itoa(http.StatusConflict) + return err == conflictErr || + err == storage.ErrDuplicateSampleForTimestamp || + err == storage.ErrOutOfOrderSample || + err == storage.ErrOutOfBounds || + err.Error() == strconv.Itoa(http.StatusConflict) || + status.Code(err) == codes.AlreadyExists +} + +func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { + return &peerGroup{ + dialOpts: dialOpts, + cache: map[string]storepb.WriteableStoreClient{}, + m: sync.RWMutex{}, + dialer: grpc.DialContext, + } +} + +type peerGroup struct { + dialOpts []grpc.DialOption + cache map[string]storepb.WriteableStoreClient + m sync.RWMutex + + // dialer is used for testing. + dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) +} + +func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { + // use a RLock first to prevent blocking if we don't need to. + p.m.RLock() + c, ok := p.cache[addr] + p.m.RUnlock() + if ok { + return c, nil + } + + p.m.Lock() + defer p.m.Unlock() + // Make sure that another caller hasn't created the connection since obtaining the write lock. + c, ok = p.cache[addr] + if ok { + return c, nil + } + conn, err := p.dialer(ctx, addr, p.dialOpts...) + if err != nil { + return nil, errors.Wrap(err, "failed to dial peer") + } + + client := storepb.NewWriteableStoreClient(conn) + p.cache[addr] = client + return client, nil } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index a040a21c80..6f58b84cec 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -2,6 +2,9 @@ package receive import ( "bytes" + "context" + "fmt" + "math/rand" "net/http" "net/http/httptest" "strconv" @@ -16,6 +19,9 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" terrors "github.com/prometheus/prometheus/tsdb/errors" + "google.golang.org/grpc" + + "github.com/thanos-io/thanos/pkg/store/storepb" ) func TestCountCause(t *testing.T) { @@ -129,14 +135,27 @@ func TestCountCause(t *testing.T) { } } -func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring, func()) { +func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { cfg := []HashringConfig{ { Hashring: "test", }, } - var closers []func() var handlers []*Handler + // create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers + // This removes the network from the tests and creates a more consistent testing harness. + peers := &peerGroup{ + dialOpts: nil, + m: sync.RWMutex{}, + cache: map[string]storepb.WriteableStoreClient{}, + dialer: func(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error) { + // dialer should never be called since we are creating fake clients with fake addresses + // this protects against some leaking test that may attempt to dial random IP addresses + // which may pose a security risk. + return nil, errors.New("unexpected dial called in testing") + }, + } + for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: DefaultTenantHeader, @@ -145,21 +164,17 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) Writer: NewWriter(log.NewNopLogger(), appendables[i]), }) handlers = append(handlers, h) - ts := httptest.NewServer(h.router) - closers = append(closers, ts.Close) - h.options.Endpoint = ts.URL + "/api/v1/receive" + h.peers = peers + addr := randomAddr() + h.options.Endpoint = addr cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) + peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } hashring := newMultiHashring(cfg) for _, h := range handlers { h.Hashring(hashring) } - close := func() { - for _, closer := range closers { - closer() - } - } - return handlers, hashring, close + return handlers, hashring } func TestReceive(t *testing.T) { @@ -454,53 +469,54 @@ func TestReceive(t *testing.T) { }, }, } { - handlers, hashring, close := newHandlerHashring(tc.appendables, tc.replicationFactor) - defer close() - tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. - status, err := makeRequest(handler, tenant, tc.wreq) - if err != nil { - t.Fatalf("test case %q, handler %d: unexpectedly failed making HTTP request: %v", tc.name, tc.status, err) - } - if status != tc.status { - t.Errorf("test case %q, handler %d: got unexpected HTTP status code: expected %d, got %d", tc.name, i, tc.status, status) - } - } - // Test that each time series is stored - // the correct amount of times in each fake DB. - for _, ts := range tc.wreq.Timeseries { - lset := make(labels.Labels, len(ts.Labels)) - for j := range ts.Labels { - lset[j] = labels.Label{ - Name: ts.Labels[j].Name, - Value: ts.Labels[j].Value, + t.Run(tc.name, func(t *testing.T) { + handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + tenant := "test" + // Test from the point of view of every node + // so that we know status code does not depend + // on which node is erroring and which node is receiving. + for i, handler := range handlers { + // Test that the correct status is returned. + status, err := makeRequest(handler, tenant, tc.wreq) + if err != nil { + t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) + } + if status != tc.status { + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d", i, tc.status, status) } } - for j, a := range tc.appendables { - var expected int - got := uint64(len(a.appender.(*fakeAppender).samples[lset.String()])) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expected = len(handlers) * len(ts.Samples) + // Test that each time series is stored + // the correct amount of times in each fake DB. + for _, ts := range tc.wreq.Timeseries { + lset := make(labels.Labels, len(ts.Labels)) + for j := range ts.Labels { + lset[j] = labels.Label{ + Name: ts.Labels[j].Name, + Value: ts.Labels[j].Value, + } } - if uint64(expected) != got { - t.Errorf("test case %q, labels %q: expected %d samples, got %d", tc.name, lset.String(), expected, got) + for j, a := range tc.appendables { + var expected int + n := a.appender.(*fakeAppender).samples[lset.String()] + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expected = len(handlers) * len(ts.Samples) + } + if uint64(expected) != got { + t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) + } } } - } + }) } } // endpointHit is a helper to determine if a given endpoint in a hashring would be selected // for a given time series, tenant, and replication factor. func endpointHit(t *testing.T, h Hashring, rf uint64, endpoint, tenant string, timeSeries *prompb.TimeSeries) bool { - var i uint64 - for i = 0; i < rf; i++ { + for i := uint64(0); i < rf; i++ { e, err := h.GetN(tenant, timeSeries, i) if err != nil { t.Fatalf("got unexpected error querying hashring: %v", err) @@ -539,9 +555,22 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (int, err return 0, errors.Wrap(err, "create request") } req.Header.Add(h.options.TenantHeader, tenant) - res, err := h.client.Do(req) - if err != nil { - return 0, errors.Wrap(err, "make request") - } - return res.StatusCode, nil + + rec := httptest.NewRecorder() + h.receiveHTTP(rec, req) + rec.Flush() + + return rec.Code, nil +} + +func randomAddr() string { + return fmt.Sprintf("http://%d.%d.%d.%d:%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(35000)+30000) +} + +type fakeRemoteWriteGRPCServer struct { + h storepb.WriteableStoreServer +} + +func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { + return f.h.RemoteWrite(ctx, in) } diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go index 21416b8fda..38b1a1a3ac 100644 --- a/pkg/server/grpc/grpc.go +++ b/pkg/server/grpc/grpc.go @@ -127,3 +127,16 @@ func (s *Server) Shutdown(err error) { cancel() } } + +// ReadWriteStoreServer is a StoreServer and a WriteableStoreServer. +type ReadWriteStoreServer interface { + storepb.StoreServer + storepb.WriteableStoreServer +} + +// NewReadWrite creates a new server that can be written to. +func NewReadWrite(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, storeSrv ReadWriteStoreServer, opts ...Option) *Server { + s := New(logger, reg, tracer, comp, storeSrv, opts...) + storepb.RegisterWriteableStoreServer(s.srv, storeSrv) + return s +} diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 77b5b5fa03..63f1ab4974 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -12,6 +12,7 @@ import ( _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + prompb "github.com/prometheus/prometheus/prompb" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -136,6 +137,81 @@ func (Aggr) EnumDescriptor() ([]byte, []int) { return fileDescriptor_77a6da22d6a3feb1, []int{2} } +type WriteResponse struct { +} + +func (m *WriteResponse) Reset() { *m = WriteResponse{} } +func (m *WriteResponse) String() string { return proto.CompactTextString(m) } +func (*WriteResponse) ProtoMessage() {} +func (*WriteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_77a6da22d6a3feb1, []int{0} +} +func (m *WriteResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *WriteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_WriteResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *WriteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteResponse.Merge(m, src) +} +func (m *WriteResponse) XXX_Size() int { + return m.Size() +} +func (m *WriteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_WriteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteResponse proto.InternalMessageInfo + +type WriteRequest struct { + Timeseries []prompb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` + Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` + Replica int64 `protobuf:"varint,3,opt,name=replica,proto3" json:"replica,omitempty"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_77a6da22d6a3feb1, []int{1} +} +func (m *WriteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *WriteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WriteRequest.Merge(m, src) +} +func (m *WriteRequest) XXX_Size() int { + return m.Size() +} +func (m *WriteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WriteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_WriteRequest proto.InternalMessageInfo + type InfoRequest struct { } @@ -143,7 +219,7 @@ func (m *InfoRequest) Reset() { *m = InfoRequest{} } func (m *InfoRequest) String() string { return proto.CompactTextString(m) } func (*InfoRequest) ProtoMessage() {} func (*InfoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{0} + return fileDescriptor_77a6da22d6a3feb1, []int{2} } func (m *InfoRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -186,7 +262,7 @@ func (m *InfoResponse) Reset() { *m = InfoResponse{} } func (m *InfoResponse) String() string { return proto.CompactTextString(m) } func (*InfoResponse) ProtoMessage() {} func (*InfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{1} + return fileDescriptor_77a6da22d6a3feb1, []int{3} } func (m *InfoResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -223,7 +299,7 @@ func (m *LabelSet) Reset() { *m = LabelSet{} } func (m *LabelSet) String() string { return proto.CompactTextString(m) } func (*LabelSet) ProtoMessage() {} func (*LabelSet) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{2} + return fileDescriptor_77a6da22d6a3feb1, []int{4} } func (m *LabelSet) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -270,7 +346,7 @@ func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } func (m *SeriesRequest) String() string { return proto.CompactTextString(m) } func (*SeriesRequest) ProtoMessage() {} func (*SeriesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{3} + return fileDescriptor_77a6da22d6a3feb1, []int{5} } func (m *SeriesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -310,7 +386,7 @@ func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } func (m *SeriesResponse) String() string { return proto.CompactTextString(m) } func (*SeriesResponse) ProtoMessage() {} func (*SeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{4} + return fileDescriptor_77a6da22d6a3feb1, []int{6} } func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -394,7 +470,7 @@ func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (m *LabelNamesRequest) String() string { return proto.CompactTextString(m) } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{5} + return fileDescriptor_77a6da22d6a3feb1, []int{7} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -432,7 +508,7 @@ func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (m *LabelNamesResponse) String() string { return proto.CompactTextString(m) } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{6} + return fileDescriptor_77a6da22d6a3feb1, []int{8} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -472,7 +548,7 @@ func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (m *LabelValuesRequest) String() string { return proto.CompactTextString(m) } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{7} + return fileDescriptor_77a6da22d6a3feb1, []int{9} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -510,7 +586,7 @@ func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (m *LabelValuesResponse) String() string { return proto.CompactTextString(m) } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_77a6da22d6a3feb1, []int{8} + return fileDescriptor_77a6da22d6a3feb1, []int{10} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -543,6 +619,8 @@ func init() { proto.RegisterEnum("thanos.StoreType", StoreType_name, StoreType_value) proto.RegisterEnum("thanos.PartialResponseStrategy", PartialResponseStrategy_name, PartialResponseStrategy_value) proto.RegisterEnum("thanos.Aggr", Aggr_name, Aggr_value) + proto.RegisterType((*WriteResponse)(nil), "thanos.WriteResponse") + proto.RegisterType((*WriteRequest)(nil), "thanos.WriteRequest") proto.RegisterType((*InfoRequest)(nil), "thanos.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.InfoResponse") proto.RegisterType((*LabelSet)(nil), "thanos.LabelSet") @@ -557,58 +635,66 @@ func init() { func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) } var fileDescriptor_77a6da22d6a3feb1 = []byte{ - // 814 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0x4f, 0x6f, 0xe3, 0x44, - 0x14, 0xf7, 0xd8, 0x89, 0x13, 0xbf, 0x6c, 0x2b, 0xef, 0x34, 0xbb, 0xeb, 0x1a, 0x29, 0xad, 0x2c, - 0x21, 0x45, 0x05, 0x65, 0x21, 0x08, 0x10, 0xdc, 0x92, 0xac, 0x57, 0x1b, 0xb1, 0x4d, 0x60, 0x92, - 0x6c, 0xf8, 0x73, 0x08, 0x4e, 0x3b, 0xb8, 0xd6, 0x3a, 0xb6, 0xf1, 0x38, 0xb4, 0xbd, 0xf2, 0x09, - 0xb8, 0xf2, 0x1d, 0xf8, 0x16, 0x5c, 0x7a, 0xdc, 0x23, 0x5c, 0x10, 0xb4, 0x5f, 0x04, 0x79, 0x3c, - 0x4e, 0x62, 0x68, 0x2b, 0xad, 0x72, 0x9b, 0xf7, 0xfb, 0xbd, 0x79, 0x6f, 0xde, 0xef, 0xbd, 0x99, - 0x01, 0x2d, 0x8e, 0x4e, 0x5a, 0x51, 0x1c, 0x26, 0x21, 0x56, 0x93, 0x33, 0x27, 0x08, 0x99, 0x59, - 0x4b, 0x2e, 0x23, 0xca, 0x32, 0xd0, 0xac, 0xbb, 0xa1, 0x1b, 0xf2, 0xe5, 0xd3, 0x74, 0x95, 0xa1, - 0xd6, 0x0e, 0xd4, 0xfa, 0xc1, 0x0f, 0x21, 0xa1, 0x3f, 0x2e, 0x29, 0x4b, 0xac, 0x3f, 0x11, 0x3c, - 0xc8, 0x6c, 0x16, 0x85, 0x01, 0xa3, 0xf8, 0x3d, 0x50, 0x7d, 0x67, 0x4e, 0x7d, 0x66, 0xa0, 0x43, - 0xa5, 0x59, 0x6b, 0xef, 0xb4, 0xb2, 0xd8, 0xad, 0x97, 0x29, 0xda, 0x2d, 0x5d, 0xfd, 0x75, 0x20, - 0x11, 0xe1, 0x82, 0xf7, 0xa1, 0xba, 0xf0, 0x82, 0x59, 0xe2, 0x2d, 0xa8, 0x21, 0x1f, 0xa2, 0xa6, - 0x42, 0x2a, 0x0b, 0x2f, 0x18, 0x7b, 0x0b, 0xca, 0x29, 0xe7, 0x22, 0xa3, 0x14, 0x41, 0x39, 0x17, - 0x9c, 0x7a, 0x0a, 0x1a, 0x4b, 0xc2, 0x98, 0x8e, 0x2f, 0x23, 0x6a, 0x94, 0x0e, 0x51, 0x73, 0xb7, - 0xfd, 0x30, 0xcf, 0x32, 0xca, 0x09, 0xb2, 0xf6, 0xc1, 0x1f, 0x03, 0xf0, 0x84, 0x33, 0x46, 0x13, - 0x66, 0x94, 0xf9, 0xb9, 0xf4, 0xc2, 0xb9, 0x46, 0x34, 0x11, 0x47, 0xd3, 0x7c, 0x61, 0x33, 0xeb, - 0x53, 0xa8, 0xe6, 0xe4, 0x5b, 0x95, 0x65, 0xfd, 0xaa, 0xc0, 0xce, 0x88, 0xc6, 0x1e, 0x65, 0x42, - 0xa6, 0x42, 0xa1, 0xe8, 0xee, 0x42, 0xe5, 0x62, 0xa1, 0x9f, 0xa4, 0x54, 0x72, 0x72, 0x46, 0x63, - 0x66, 0x28, 0x3c, 0x6d, 0xbd, 0x90, 0xf6, 0x38, 0x23, 0x45, 0xf6, 0x95, 0x2f, 0x6e, 0xc3, 0xa3, - 0x34, 0x64, 0x4c, 0x59, 0xe8, 0x2f, 0x13, 0x2f, 0x0c, 0x66, 0xe7, 0x5e, 0x70, 0x1a, 0x9e, 0x73, - 0xb1, 0x14, 0xb2, 0xb7, 0x70, 0x2e, 0xc8, 0x8a, 0x9b, 0x72, 0x0a, 0xbf, 0x0f, 0xe0, 0xb8, 0x6e, - 0x4c, 0x5d, 0x27, 0xa1, 0x99, 0x46, 0xbb, 0xed, 0x07, 0x79, 0xb6, 0x8e, 0xeb, 0xc6, 0x64, 0x83, - 0xc7, 0x9f, 0xc3, 0x7e, 0xe4, 0xc4, 0x89, 0xe7, 0xf8, 0x69, 0x16, 0xde, 0xf9, 0xd9, 0xa9, 0xc7, - 0x9c, 0xb9, 0x4f, 0x4f, 0x0d, 0xf5, 0x10, 0x35, 0xab, 0xe4, 0x89, 0x70, 0xc8, 0x27, 0xe3, 0x99, - 0xa0, 0xf1, 0x77, 0xb7, 0xec, 0x65, 0x49, 0xec, 0x24, 0xd4, 0xbd, 0x34, 0x2a, 0xbc, 0x9d, 0x07, - 0x79, 0xe2, 0x2f, 0x8b, 0x31, 0x46, 0xc2, 0xed, 0x7f, 0xc1, 0x73, 0x02, 0x1f, 0x40, 0x8d, 0xbd, - 0xf6, 0xa2, 0xd9, 0xc9, 0xd9, 0x32, 0x78, 0xcd, 0x8c, 0x2a, 0x3f, 0x0a, 0xa4, 0x50, 0x8f, 0x23, - 0xd6, 0xf7, 0xb0, 0x9b, 0xb7, 0x46, 0x4c, 0x6c, 0x13, 0x54, 0xc6, 0x11, 0xde, 0x99, 0x5a, 0x7b, - 0x77, 0x35, 0x4b, 0x1c, 0x7d, 0x21, 0x11, 0xc1, 0x63, 0x13, 0x2a, 0xe7, 0x4e, 0x1c, 0x78, 0x81, - 0xcb, 0x3b, 0xa5, 0xbd, 0x90, 0x48, 0x0e, 0x74, 0xab, 0xa0, 0xc6, 0x94, 0x2d, 0xfd, 0xc4, 0xfa, - 0x0d, 0xc1, 0x43, 0xde, 0x9e, 0x81, 0xb3, 0x58, 0x4f, 0xc0, 0xbd, 0x8a, 0xa1, 0x2d, 0x14, 0x93, - 0xb7, 0x53, 0xcc, 0x7a, 0x0e, 0x78, 0xf3, 0xb4, 0x42, 0x94, 0x3a, 0x94, 0x83, 0x14, 0xe0, 0xe3, - 0xae, 0x91, 0xcc, 0xc0, 0x26, 0x54, 0x45, 0xbd, 0xcc, 0x90, 0x39, 0xb1, 0xb2, 0xad, 0xdf, 0x91, - 0x08, 0xf4, 0xca, 0xf1, 0x97, 0xeb, 0xba, 0xeb, 0x50, 0xe6, 0xb7, 0x82, 0xd7, 0xa8, 0x91, 0xcc, - 0xb8, 0x5f, 0x0d, 0x79, 0x0b, 0x35, 0x94, 0x2d, 0xd5, 0xe8, 0xc3, 0x5e, 0xa1, 0x08, 0x21, 0xc7, - 0x63, 0x50, 0x7f, 0xe2, 0x88, 0xd0, 0x43, 0x58, 0xf7, 0x09, 0x72, 0x44, 0x40, 0x5b, 0xbd, 0x46, - 0xb8, 0x06, 0x95, 0xc9, 0xe0, 0x8b, 0xc1, 0x70, 0x3a, 0xd0, 0x25, 0xac, 0x41, 0xf9, 0xab, 0x89, - 0x4d, 0xbe, 0xd1, 0x11, 0xae, 0x42, 0x89, 0x4c, 0x5e, 0xda, 0xba, 0x9c, 0x7a, 0x8c, 0xfa, 0xcf, - 0xec, 0x5e, 0x87, 0xe8, 0x4a, 0xea, 0x31, 0x1a, 0x0f, 0x89, 0xad, 0x97, 0x52, 0x9c, 0xd8, 0x3d, - 0xbb, 0xff, 0xca, 0xd6, 0xcb, 0x47, 0x2d, 0x78, 0x72, 0x47, 0x49, 0x69, 0xa4, 0x69, 0x87, 0x88, - 0xf0, 0x9d, 0xee, 0x90, 0x8c, 0x75, 0x74, 0xd4, 0x85, 0x52, 0x7a, 0x77, 0x71, 0x05, 0x14, 0xd2, - 0x99, 0x66, 0x5c, 0x6f, 0x38, 0x19, 0x8c, 0x75, 0x94, 0x62, 0xa3, 0xc9, 0xb1, 0x2e, 0xa7, 0x8b, - 0xe3, 0xfe, 0x40, 0x57, 0xf8, 0xa2, 0xf3, 0x75, 0x96, 0x93, 0x7b, 0xd9, 0x44, 0x2f, 0xb7, 0x7f, - 0x96, 0xa1, 0xcc, 0x0b, 0xc1, 0x1f, 0x42, 0x29, 0x7d, 0xeb, 0xf1, 0x5e, 0x2e, 0xef, 0xc6, 0x4f, - 0x60, 0xd6, 0x8b, 0xa0, 0x10, 0xee, 0x33, 0x50, 0xb3, 0x6b, 0x84, 0x1f, 0x15, 0xaf, 0x55, 0xbe, - 0xed, 0xf1, 0x7f, 0xe1, 0x6c, 0xe3, 0x07, 0x08, 0xf7, 0x00, 0xd6, 0x83, 0x89, 0xf7, 0x0b, 0x2f, - 0xdf, 0xe6, 0xd5, 0x32, 0xcd, 0xdb, 0x28, 0x91, 0xff, 0x39, 0xd4, 0x36, 0xfa, 0x89, 0x8b, 0xae, - 0x85, 0x49, 0x35, 0xdf, 0xb9, 0x95, 0xcb, 0xe2, 0x74, 0xdf, 0xbd, 0xfa, 0xa7, 0x21, 0x5d, 0x5d, - 0x37, 0xd0, 0x9b, 0xeb, 0x06, 0xfa, 0xfb, 0xba, 0x81, 0x7e, 0xb9, 0x69, 0x48, 0x6f, 0x6e, 0x1a, - 0xd2, 0x1f, 0x37, 0x0d, 0xe9, 0xdb, 0x0a, 0xff, 0x6b, 0xa2, 0xf9, 0x5c, 0xe5, 0x9f, 0xe4, 0x47, - 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xa5, 0x34, 0x17, 0xba, 0x5c, 0x07, 0x00, 0x00, + // 934 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0xcb, 0x6e, 0x23, 0x45, + 0x14, 0x75, 0xbb, 0xfd, 0xbc, 0x4e, 0x4c, 0x4f, 0xc5, 0xc9, 0x74, 0x8c, 0xe4, 0x58, 0x2d, 0x21, + 0x59, 0x01, 0x39, 0xe0, 0x11, 0x20, 0xd0, 0x6c, 0x6c, 0x8f, 0x47, 0x63, 0x31, 0x71, 0xa0, 0x6c, + 0x8f, 0x79, 0x2c, 0x4c, 0xdb, 0x29, 0xda, 0xad, 0x71, 0x3f, 0xe8, 0x2a, 0x93, 0x64, 0xc3, 0x82, + 0x2f, 0x60, 0xcb, 0x3f, 0xf0, 0x17, 0x6c, 0xb2, 0x9c, 0x25, 0x6c, 0x10, 0x24, 0x3f, 0x82, 0xaa, + 0xba, 0xda, 0xee, 0x0e, 0x49, 0xa4, 0x51, 0x76, 0x75, 0xcf, 0xb9, 0xbe, 0x8f, 0x53, 0xf7, 0x76, + 0x19, 0x8a, 0x81, 0x3f, 0x6f, 0xfa, 0x81, 0xc7, 0x3c, 0x94, 0x63, 0x0b, 0xd3, 0xf5, 0x68, 0xb5, + 0xc4, 0x2e, 0x7c, 0x42, 0x43, 0xb0, 0x5a, 0xb1, 0x3c, 0xcb, 0x13, 0xc7, 0x23, 0x7e, 0x92, 0xe8, + 0x13, 0xcb, 0x66, 0x8b, 0xd5, 0xac, 0x39, 0xf7, 0x9c, 0x23, 0x3f, 0xf0, 0x1c, 0xc2, 0x16, 0x64, + 0x45, 0x6f, 0x1e, 0xfd, 0xd9, 0x51, 0x2c, 0x94, 0xf1, 0x0e, 0x6c, 0x4f, 0x02, 0x9b, 0x11, 0x4c, + 0xa8, 0xef, 0xb9, 0x94, 0x18, 0x3f, 0xc3, 0x96, 0x04, 0x7e, 0x5c, 0x11, 0xca, 0xd0, 0x53, 0x00, + 0x66, 0x3b, 0x84, 0x92, 0xc0, 0x26, 0x54, 0x57, 0xea, 0x6a, 0xa3, 0xd4, 0xda, 0x6b, 0x6e, 0x82, + 0x36, 0x47, 0xb6, 0x43, 0x86, 0x82, 0xed, 0x64, 0x2e, 0xff, 0x3e, 0x48, 0xe1, 0x98, 0x3f, 0xda, + 0x83, 0x1c, 0x23, 0xae, 0xe9, 0x32, 0x3d, 0x5d, 0x57, 0x1a, 0x45, 0x2c, 0x2d, 0xa4, 0x43, 0x3e, + 0x20, 0xfe, 0xd2, 0x9e, 0x9b, 0xba, 0x5a, 0x57, 0x1a, 0x2a, 0x8e, 0x4c, 0x63, 0x1b, 0x4a, 0x7d, + 0xf7, 0x07, 0x4f, 0xa6, 0x37, 0xfe, 0x52, 0x60, 0x2b, 0xb4, 0xc3, 0xfa, 0xd0, 0xfb, 0x90, 0x5b, + 0x9a, 0x33, 0xb2, 0x8c, 0x6a, 0xd9, 0x6e, 0x86, 0x0a, 0x35, 0x5f, 0x72, 0x54, 0x96, 0x20, 0x5d, + 0xd0, 0x3e, 0x14, 0x1c, 0xdb, 0x9d, 0xf2, 0x82, 0x44, 0x01, 0x2a, 0xce, 0x3b, 0xb6, 0xcb, 0x2b, + 0x16, 0x94, 0x79, 0x1e, 0x52, 0xb2, 0x04, 0xc7, 0x3c, 0x17, 0xd4, 0x11, 0x14, 0x29, 0xf3, 0x02, + 0x32, 0xba, 0xf0, 0x89, 0x9e, 0xa9, 0x2b, 0x8d, 0x72, 0xeb, 0x51, 0x94, 0x65, 0x18, 0x11, 0x78, + 0xe3, 0x83, 0x3e, 0x06, 0x10, 0x09, 0xa7, 0x94, 0x30, 0xaa, 0x67, 0x45, 0x5d, 0x5a, 0xa2, 0xae, + 0x21, 0x61, 0xb2, 0xb4, 0xe2, 0x52, 0xda, 0xd4, 0xf8, 0x14, 0x0a, 0x11, 0xf9, 0x56, 0x6d, 0x19, + 0xbf, 0xa9, 0xb0, 0x1d, 0x4a, 0x1e, 0xdd, 0x52, 0xbc, 0x51, 0xe5, 0xee, 0x46, 0xd3, 0xc9, 0x46, + 0x3f, 0xe1, 0x14, 0x9b, 0x2f, 0x48, 0x40, 0x75, 0x55, 0xa4, 0xad, 0x24, 0xd2, 0x1e, 0x87, 0xa4, + 0xcc, 0xbe, 0xf6, 0x45, 0x2d, 0xd8, 0xe5, 0x21, 0x03, 0x42, 0xbd, 0xe5, 0x8a, 0xd9, 0x9e, 0x3b, + 0x3d, 0xb3, 0xdd, 0x53, 0xef, 0x4c, 0x88, 0xa5, 0xe2, 0x1d, 0xc7, 0x3c, 0xc7, 0x6b, 0x6e, 0x22, + 0x28, 0xf4, 0x01, 0x80, 0x69, 0x59, 0x01, 0xb1, 0x4c, 0x46, 0x42, 0x8d, 0xca, 0xad, 0xad, 0x28, + 0x5b, 0xdb, 0xb2, 0x02, 0x1c, 0xe3, 0xd1, 0xe7, 0xb0, 0xef, 0x9b, 0x01, 0xb3, 0xcd, 0x25, 0xcf, + 0x22, 0x6e, 0x7e, 0x7a, 0x6a, 0x53, 0x73, 0xb6, 0x24, 0xa7, 0x7a, 0xae, 0xae, 0x34, 0x0a, 0xf8, + 0xb1, 0x74, 0x88, 0x26, 0xe3, 0x99, 0xa4, 0xd1, 0x77, 0xb7, 0xfc, 0x96, 0xb2, 0xc0, 0x64, 0xc4, + 0xba, 0xd0, 0xf3, 0xe2, 0x3a, 0x0f, 0xa2, 0xc4, 0x5f, 0x26, 0x63, 0x0c, 0xa5, 0xdb, 0xff, 0x82, + 0x47, 0x04, 0x3a, 0x80, 0x12, 0x7d, 0x6d, 0xfb, 0xd3, 0xf9, 0x62, 0xe5, 0xbe, 0xa6, 0x7a, 0x41, + 0x94, 0x02, 0x1c, 0xea, 0x0a, 0xc4, 0xf8, 0x1e, 0xca, 0xd1, 0xd5, 0xc8, 0x89, 0x6d, 0x40, 0x6e, + 0xbd, 0x3d, 0x4a, 0xa3, 0xd4, 0x2a, 0xaf, 0x67, 0x49, 0xa0, 0x2f, 0x52, 0x58, 0xf2, 0xa8, 0x0a, + 0xf9, 0x33, 0x33, 0x70, 0x6d, 0xd7, 0x0a, 0xd7, 0xe5, 0x45, 0x0a, 0x47, 0x40, 0xa7, 0x00, 0xb9, + 0x80, 0xd0, 0xd5, 0x92, 0x19, 0xbf, 0x2b, 0xf0, 0x48, 0x5c, 0xcf, 0xc0, 0x74, 0x36, 0x13, 0x70, + 0xaf, 0x62, 0xca, 0x03, 0x14, 0x4b, 0x3f, 0x4c, 0x31, 0xe3, 0x39, 0xa0, 0x78, 0xb5, 0x52, 0x94, + 0x0a, 0x64, 0x5d, 0x0e, 0x88, 0x71, 0x2f, 0xe2, 0xd0, 0x40, 0x55, 0x28, 0xc8, 0x7e, 0xa9, 0x9e, + 0x16, 0xc4, 0xda, 0x36, 0xfe, 0x50, 0x64, 0xa0, 0x57, 0xe6, 0x72, 0xb5, 0xe9, 0xbb, 0x02, 0x59, + 0xb1, 0x15, 0xa2, 0xc7, 0x22, 0x0e, 0x8d, 0xfb, 0xd5, 0x48, 0x3f, 0x40, 0x0d, 0xf5, 0x81, 0x6a, + 0xf4, 0x61, 0x27, 0xd1, 0x84, 0x94, 0x63, 0x0f, 0x72, 0x3f, 0x09, 0x44, 0xea, 0x21, 0xad, 0xfb, + 0x04, 0x39, 0xc4, 0x50, 0x5c, 0x7f, 0x8d, 0x50, 0x09, 0xf2, 0xe3, 0xc1, 0x17, 0x83, 0x93, 0xc9, + 0x40, 0x4b, 0xa1, 0x22, 0x64, 0xbf, 0x1a, 0xf7, 0xf0, 0x37, 0x9a, 0x82, 0x0a, 0x90, 0xc1, 0xe3, + 0x97, 0x3d, 0x2d, 0xcd, 0x3d, 0x86, 0xfd, 0x67, 0xbd, 0x6e, 0x1b, 0x6b, 0x2a, 0xf7, 0x18, 0x8e, + 0x4e, 0x70, 0x4f, 0xcb, 0x70, 0x1c, 0xf7, 0xba, 0xbd, 0xfe, 0xab, 0x9e, 0x96, 0x3d, 0x6c, 0xc2, + 0xe3, 0x3b, 0x5a, 0xe2, 0x91, 0x26, 0x6d, 0x2c, 0xc3, 0xb7, 0x3b, 0x27, 0x78, 0xa4, 0x29, 0x87, + 0x1d, 0xc8, 0xf0, 0xdd, 0x45, 0x79, 0x50, 0x71, 0x7b, 0x12, 0x72, 0xdd, 0x93, 0xf1, 0x60, 0xa4, + 0x29, 0x1c, 0x1b, 0x8e, 0x8f, 0xb5, 0x34, 0x3f, 0x1c, 0xf7, 0x07, 0x9a, 0x2a, 0x0e, 0xed, 0xaf, + 0xc3, 0x9c, 0xc2, 0xab, 0x87, 0xb5, 0x6c, 0xeb, 0x97, 0x34, 0x64, 0x45, 0x23, 0xe8, 0x23, 0xc8, + 0xf0, 0x6f, 0x3d, 0xda, 0x89, 0xe4, 0x8d, 0xbd, 0x04, 0xd5, 0x4a, 0x12, 0x94, 0xc2, 0x7d, 0x06, + 0xb9, 0x70, 0x8d, 0xd0, 0x6e, 0x72, 0xad, 0xa2, 0x9f, 0xed, 0xdd, 0x84, 0xc3, 0x1f, 0x7e, 0xa8, + 0xa0, 0x2e, 0xc0, 0x66, 0x30, 0xd1, 0x7e, 0xe2, 0xcb, 0x17, 0x5f, 0xad, 0x6a, 0xf5, 0x36, 0x4a, + 0xe6, 0x7f, 0x0e, 0xa5, 0xd8, 0x7d, 0xa2, 0xa4, 0x6b, 0x62, 0x52, 0xab, 0xef, 0xde, 0xca, 0x85, + 0x71, 0x5a, 0x03, 0x28, 0x8b, 0x67, 0x97, 0x8f, 0x60, 0x28, 0xc6, 0x53, 0x28, 0x61, 0xe2, 0x78, + 0x8c, 0x08, 0x1c, 0xad, 0xdb, 0x8f, 0xbf, 0xce, 0xd5, 0xdd, 0x1b, 0xa8, 0x7c, 0xc4, 0x53, 0x9d, + 0xf7, 0x2e, 0xff, 0xad, 0xa5, 0x2e, 0xaf, 0x6a, 0xca, 0x9b, 0xab, 0x9a, 0xf2, 0xcf, 0x55, 0x4d, + 0xf9, 0xf5, 0xba, 0x96, 0x7a, 0x73, 0x5d, 0x4b, 0xfd, 0x79, 0x5d, 0x4b, 0x7d, 0x9b, 0x17, 0x6f, + 0x97, 0x3f, 0x9b, 0xe5, 0xc4, 0xbf, 0x80, 0x27, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x67, 0x56, + 0x73, 0x8e, 0x72, 0x08, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -849,6 +935,152 @@ var _Store_serviceDesc = grpc.ServiceDesc{ Metadata: "rpc.proto", } +// WriteableStoreClient is the client API for WriteableStore service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type WriteableStoreClient interface { + // WriteRequest allows you to write metrics to this store via remote write + RemoteWrite(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) +} + +type writeableStoreClient struct { + cc *grpc.ClientConn +} + +func NewWriteableStoreClient(cc *grpc.ClientConn) WriteableStoreClient { + return &writeableStoreClient{cc} +} + +func (c *writeableStoreClient) RemoteWrite(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) { + out := new(WriteResponse) + err := c.cc.Invoke(ctx, "/thanos.WriteableStore/RemoteWrite", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// WriteableStoreServer is the server API for WriteableStore service. +type WriteableStoreServer interface { + // WriteRequest allows you to write metrics to this store via remote write + RemoteWrite(context.Context, *WriteRequest) (*WriteResponse, error) +} + +// UnimplementedWriteableStoreServer can be embedded to have forward compatible implementations. +type UnimplementedWriteableStoreServer struct { +} + +func (*UnimplementedWriteableStoreServer) RemoteWrite(ctx context.Context, req *WriteRequest) (*WriteResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RemoteWrite not implemented") +} + +func RegisterWriteableStoreServer(s *grpc.Server, srv WriteableStoreServer) { + s.RegisterService(&_WriteableStore_serviceDesc, srv) +} + +func _WriteableStore_RemoteWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WriteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WriteableStoreServer).RemoteWrite(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/thanos.WriteableStore/RemoteWrite", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WriteableStoreServer).RemoteWrite(ctx, req.(*WriteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _WriteableStore_serviceDesc = grpc.ServiceDesc{ + ServiceName: "thanos.WriteableStore", + HandlerType: (*WriteableStoreServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RemoteWrite", + Handler: _WriteableStore_RemoteWrite_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "rpc.proto", +} + +func (m *WriteResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WriteResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *WriteResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *WriteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Replica != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.Replica)) + i-- + dAtA[i] = 0x18 + } + if len(m.Tenant) > 0 { + i -= len(m.Tenant) + copy(dAtA[i:], m.Tenant) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Tenant))) + i-- + dAtA[i] = 0x12 + } + if len(m.Timeseries) > 0 { + for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Timeseries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *InfoRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1313,6 +1545,37 @@ func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } +func (m *WriteResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *WriteRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRpc(uint64(l)) + } + } + l = len(m.Tenant) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + if m.Replica != 0 { + n += 1 + sovRpc(uint64(m.Replica)) + } + return n +} + func (m *InfoRequest) Size() (n int) { if m == nil { return 0 @@ -1523,6 +1786,197 @@ func sovRpc(x uint64) (n int) { func sozRpc(x uint64) (n int) { return sovRpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *WriteResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *WriteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, prompb.TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Tenant = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Replica", wireType) + } + m.Replica = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Replica |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *InfoRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 219262f207..ebd3bb20c7 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -3,6 +3,7 @@ package thanos; import "types.proto"; import "gogoproto/gogo.proto"; +import "github.com/prometheus/prometheus/prompb/types.proto"; option go_package = "storepb"; @@ -39,6 +40,21 @@ service Store { rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse); } +/// WriteableStore reprents API against instance that stores XOR encoded values with label set metadata (e.g Prometheus metrics). +service WriteableStore { + // WriteRequest allows you to write metrics to this store via remote write + rpc RemoteWrite(WriteRequest) returns (WriteResponse) {} +} + +message WriteResponse { +} + +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; + string tenant = 2; + int64 replica = 3; +} + message InfoRequest { } @@ -108,11 +124,11 @@ enum Aggr { message SeriesResponse { oneof result { - Series series = 1; + Series series = 1; - /// warning is considered an information piece in place of series for warning purposes. - /// It is used to warn query customer about suspicious cases or partial response (if enabled). - string warning = 2; + /// warning is considered an information piece in place of series for warning purposes. + /// It is used to warn query customer about suspicious cases or partial response (if enabled). + string warning = 2; } } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 40de141811..3ea87412a6 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -28,6 +28,12 @@ type TSDBStore struct { externalLabels labels.Labels } +// ReadWriteTSDBStore is a TSDBStore that can also be written to. +type ReadWriteTSDBStore struct { + storepb.StoreServer + storepb.WriteableStoreServer +} + // NewTSDBStore creates a new TSDBStore. func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore { if logger == nil { diff --git a/scripts/genproto.sh b/scripts/genproto.sh index f0e029f766..2df48ba70f 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -30,8 +30,9 @@ echo "generating code" for dir in ${DIRS}; do pushd ${dir} ${PROTOC_BIN} --gogofast_out=plugins=grpc:. -I=. \ - -I="${GOGOPROTO_PATH}" \ - *.proto + -I="${GOGOPROTO_PATH}" \ + -I="../../../vendor" \ + *.proto sed -i.bak -E 's/import _ \"gogoproto\"//g' *.pb.go sed -i.bak -E 's/import _ \"google\/protobuf\"//g' *.pb.go diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index ae700711b5..be2e1118ef 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -33,15 +33,15 @@ func TestReceive(t *testing.T) { // receiver in the hashring than the one handling the request. // The querier queries all the receivers and the test verifies // the time series are forwarded to the correct receive node. - receiveHTTP1, receiveHTTP2, receiveHTTP3 := a.New(), a.New(), a.New() + receiveGRPC1, receiveGRPC2, receiveGRPC3 := a.New(), a.New(), a.New() h := receive.HashringConfig{ - Endpoints: []string{remoteWriteEndpoint(receiveHTTP1), remoteWriteEndpoint(receiveHTTP2), remoteWriteEndpoint(receiveHTTP3)}, + Endpoints: []string{receiveGRPC1.HostPort(), receiveGRPC2.HostPort(), receiveGRPC3.HostPort()}, } - r1 := receiver(receiveHTTP1, a.New(), a.New(), 1, h) - r2 := receiver(receiveHTTP2, a.New(), a.New(), 1, h) - r3 := receiver(receiveHTTP3, a.New(), a.New(), 1, h) + r1 := receiver(a.New(), receiveGRPC1, a.New(), 1, h) + r2 := receiver(a.New(), receiveGRPC2, a.New(), 1, h) + r3 := receiver(a.New(), receiveGRPC3, a.New(), 1, h) prom1 := prometheus(a.New(), defaultPromConfig("prom1", 1, remoteWriteEndpoint(r1.HTTP))) prom2 := prometheus(a.New(), defaultPromConfig("prom2", 1, remoteWriteEndpoint(r2.HTTP))) @@ -79,15 +79,15 @@ func TestReceive(t *testing.T) { // receives Prometheus remote-written data. The querier queries all // receivers and the test verifies that the time series are // replicated to all of the nodes. - receiveHTTP1, receiveHTTP2, receiveHTTP3 := a.New(), a.New(), a.New() + receiveGRPC1, receiveGRPC2, receiveGRPC3 := a.New(), a.New(), a.New() h := receive.HashringConfig{ - Endpoints: []string{remoteWriteEndpoint(receiveHTTP1), remoteWriteEndpoint(receiveHTTP2), remoteWriteEndpoint(receiveHTTP3)}, + Endpoints: []string{receiveGRPC1.HostPort(), receiveGRPC2.HostPort(), receiveGRPC3.HostPort()}, } - r1 := receiver(receiveHTTP1, a.New(), a.New(), 3, h) - r2 := receiver(receiveHTTP2, a.New(), a.New(), 3, h) - r3 := receiver(receiveHTTP3, a.New(), a.New(), 3, h) + r1 := receiver(a.New(), receiveGRPC1, a.New(), 3, h) + r2 := receiver(a.New(), receiveGRPC2, a.New(), 3, h) + r3 := receiver(a.New(), receiveGRPC3, a.New(), 3, h) prom1 := prometheus(a.New(), defaultPromConfig("prom1", 1, remoteWriteEndpoint(r1.HTTP))) @@ -122,14 +122,14 @@ func TestReceive(t *testing.T) { // The replication suite creates a three-node hashring but one of the // receivers is dead. In this case, replication should still // succeed and the time series should be replicated to the other nodes. - receiveHTTP1, receiveHTTP2, receiveHTTP3 := a.New(), a.New(), a.New() + receiveGRPC1, receiveGRPC2, receiveGRPC3 := a.New(), a.New(), a.New() h := receive.HashringConfig{ - Endpoints: []string{remoteWriteEndpoint(receiveHTTP1), remoteWriteEndpoint(receiveHTTP2), remoteWriteEndpoint(receiveHTTP3)}, + Endpoints: []string{receiveGRPC1.HostPort(), receiveGRPC2.HostPort(), receiveGRPC3.HostPort()}, } - r1 := receiver(receiveHTTP1, a.New(), a.New(), 3, h) - r2 := receiver(receiveHTTP2, a.New(), a.New(), 3, h) + r1 := receiver(a.New(), receiveGRPC1, a.New(), 3, h) + r2 := receiver(a.New(), receiveGRPC2, a.New(), 3, h) prom1 := prometheus(a.New(), defaultPromConfig("prom1", 1, remoteWriteEndpoint(r1.HTTP))) diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 54b34597ce..d51a51a60a 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -167,7 +167,7 @@ func sidecar(http, grpc address, prom *prometheusScheduler) *serverScheduler { func receiver(http, grpc, metric address, replicationFactor int, hashring ...receive.HashringConfig) *serverScheduler { if len(hashring) == 0 { - hashring = []receive.HashringConfig{{Endpoints: []string{remoteWriteEndpoint(http)}}} + hashring = []receive.HashringConfig{{Endpoints: []string{grpc.HostPort()}}} } return &serverScheduler{ @@ -198,7 +198,7 @@ func receiver(http, grpc, metric address, replicationFactor int, hashring ...rec "--tsdb.path", filepath.Join(receiveDir, "tsdb"), "--log.level", "debug", "--receive.replication-factor", strconv.Itoa(replicationFactor), - "--receive.local-endpoint", remoteWriteEndpoint(http), + "--receive.local-endpoint", grpc.HostPort(), "--receive.hashrings-file", filepath.Join(receiveDir, "hashrings.json"), "--receive.hashrings-file-refresh-interval", "5s")), nil },