From 29870d81b7cbd8a524ddeac8c4ade7985c7e5f00 Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Fri, 14 May 2021 17:50:36 +0530 Subject: [PATCH 1/8] used a simple disableTSDB flag to disable ingestion Signed-off-by: Yash Sharma remove flag for ingestion mode, and decide internally Signed-off-by: Yash Sharma moved grpc setup for both ingestion and distribution Signed-off-by: Yash Sharma added an e2e test Signed-off-by: Yash Sharma move dbs and writer outside scope Signed-off-by: Yash Sharma test remove stray at /handler 467 Signed-off-by: Yash Sharma fixed a duplicate code in receiver.go Signed-off-by: Yash Sharma removed stray line in handler Signed-off-by: Yash Sharma added a e2e test for ingestion and distributor scenario Signed-off-by: Yash Sharma uncommented other tests Signed-off-by: Yash Sharma restored receiver to main branch implementation Signed-off-by: Yash Sharma break down multiple logical parts of receiver into functions Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 454 +++++++++++++++++++-------------- test/e2e/e2ethanos/services.go | 45 ++++ test/e2e/receive_test.go | 71 ++++++ 3 files changed, 376 insertions(+), 194 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 818b0ca1a9..40b29a5767 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -214,235 +214,302 @@ func runReceive( level.Debug(logger).Log("msg", "setting up tsdb") { - log.With(logger, "component", "storage") - dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_receive_multi_db_updates_attempted_total", - Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes", - }) - dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_receive_multi_db_updates_completed_total", - Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes", - }) + if err := startTSDB(g, logger, reg, dbs, dbReady, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { + return err + } + } - level.Debug(logger).Log("msg", "removing storage lock files if any") - if err := dbs.RemoveLockFilesIfAny(); err != nil { - return errors.Wrap(err, "remove storage lock files") + level.Debug(logger).Log("msg", "setting up hashring") + { + if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber); err != nil { + return err } + } - // TSDBs reload logic, listening on hashring changes. - cancel := make(chan struct{}) + level.Debug(logger).Log("msg", "setting up http server") + { + srv := httpserver.New(logger, reg, comp, httpProbe, + httpserver.WithListen(*conf.httpBindAddr), + httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), + httpserver.WithTLSConfig(*conf.httpTLSConfig), + ) g.Add(func() error { - defer close(dbReady) - defer close(uploadC) - - // Before quitting, ensure the WAL is flushed and the DBs are closed. 
- defer func() { - level.Info(logger).Log("msg", "shutting down storage") - if err := dbs.Flush(); err != nil { - level.Error(logger).Log("err", err, "msg", "failed to flush storage") - } else { - level.Info(logger).Log("msg", "storage is flushed successfully") - } - if err := dbs.Close(); err != nil { - level.Error(logger).Log("err", err, "msg", "failed to close storage") - return - } - level.Info(logger).Log("msg", "storage is closed") - }() + statusProber.Healthy() - for { - select { - case <-cancel: - return nil - case _, ok := <-hashringChangedChan: - if !ok { - return nil - } - dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating storage") - - if err := dbs.Flush(); err != nil { - return errors.Wrap(err, "flushing storage") - } - if err := dbs.Open(); err != nil { - return errors.Wrap(err, "opening storage") - } - if upload { - uploadC <- struct{}{} - <-uploadDone - } - statusProber.Ready() - level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") - dbUpdatesCompleted.Inc() - dbReady <- struct{}{} - } - } + return srv.ListenAndServe() }, func(err error) { - close(cancel) + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + srv.Shutdown(err) }) } - level.Debug(logger).Log("msg", "setting up hashring") + level.Debug(logger).Log("msg", "setting up grpc server") { - // Note: the hashring configuration watcher - // is the sender and thus closes the chan. - // In the single-node case, which has no configuration - // watcher, we close the chan ourselves. - updates := make(chan receive.Hashring, 1) - - // The Hashrings config file path is given initializing config watcher. - if conf.hashringsFilePath != "" { - cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) - if err != nil { - return errors.Wrap(err, "failed to initialize config watcher") + var s *grpcserver.Server + startGRPC := make(chan struct{}) + + if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPC, dbReady, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { + return err + } + } + + level.Debug(logger).Log("msg", "setting up receive http handler") + { + g.Add( + func() error { + return errors.Wrap(webHandler.Run(), "error starting web server") + }, + func(err error) { + webHandler.Close() + }, + ) + } + + level.Info(logger).Log("msg", "starting receiver") + return nil +} + +func setupGRPCServer(g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + tracer opentracing.Tracer, + conf *receiveConfig, s *grpcserver.Server, + startGRPC chan struct{}, + dbReady chan struct{}, + comp component.SourceStoreAPI, + dbs *receive.MultiTSDB, + webHandler *receive.Handler, + grpcLogOpts []grpc_logging.Option, + tagOpts []tags.Option, + grpcProbe *prober.GRPCProbe, + +) error { + g.Add(func() error { + defer close(startGRPC) + + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + + for range dbReady { + if s != nil { + s.Shutdown(errors.New("reload hashrings")) } - // Check the hashring configuration on before running the watcher. 
- if err := cw.ValidateConfig(); err != nil { - cw.Stop() - close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") + rw := store.ReadWriteTSDBStore{ + StoreServer: store.NewMultiTSDBStore( + logger, + reg, + comp, + dbs.TSDBStores, + ), + WriteableStoreServer: webHandler, } - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - level.Info(logger).Log("msg", "the hashring initialized with config watcher.") - return receive.HashringFromConfigWatcher(ctx, updates, cw) - }, func(error) { - cancel() - }) - } else { - var ring receive.Hashring - // The Hashrings config file content given initialize configuration from content. - if len(conf.hashringsFileContent) > 0 { - ring, err = receive.HashringFromConfig(conf.hashringsFileContent) - if err != nil { - close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") - } - level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") - } else { - level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") - ring = receive.SingleNodeHashring(conf.endpoint) + s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(rw)), + grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), + grpcserver.WithListen(*conf.grpcBindAddr), + grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), + grpcserver.WithTLSConfig(tlsCfg), + ) + startGRPC <- struct{}{} + } + if s != nil { + s.Shutdown(err) + } + return nil + }, func(error) {}) + // We need to be able to start and stop the gRPC server + // whenever the DB changes, thus it needs its own run group. + g.Add(func() error { + for range startGRPC { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) + if err := s.ListenAndServe(); err != nil { + return errors.Wrap(err, "serve gRPC") } + } + return nil + }, func(error) {}) - cancel := make(chan struct{}) - g.Add(func() error { - defer close(updates) - updates <- ring - <-cancel - return nil - }, func(error) { - close(cancel) - }) + return nil + +} + +func setupHashring(g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + conf *receiveConfig, + hashringChangedChan chan struct{}, + webHandler *receive.Handler, + statusProber prober.Probe, + +) error { + // Note: the hashring configuration watcher + // is the sender and thus closes the chan. + // In the single-node case, which has no configuration + // watcher, we close the chan ourselves. + updates := make(chan receive.Hashring, 1) + + // The Hashrings config file path is given initializing config watcher. + if conf.hashringsFilePath != "" { + cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) + if err != nil { + return errors.Wrap(err, "failed to initialize config watcher") } - cancel := make(chan struct{}) + // Check the hashring configuration on before running the watcher. 
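// A minimal illustrative sketch (not the Thanos implementation itself): the
// file validated here is a JSON-encoded list of hashrings. The e2e helpers in
// this series build that file by marshalling receive.HashringConfig values;
// the endpoint names below and the import path are assumptions for the example.
package main

import (
	"encoding/json"
	"fmt"

	"github.com/thanos-io/thanos/pkg/receive"
)

func main() {
	hashrings := []receive.HashringConfig{
		{Endpoints: []string{"receive-i1:9091", "receive-i2:9091", "receive-i3:9091"}},
	}
	// services.go marshals the hashring slice the same way before handing the
	// result to the --receive.hashrings flag.
	b, err := json.Marshal(hashrings)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // roughly: [{"endpoints":["receive-i1:9091",...]}]
}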
+ if err := cw.ValidateConfig(); err != nil { + cw.Stop() + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") + } + + ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - defer close(hashringChangedChan) - for { - select { - case h, ok := <-updates: - if !ok { - return nil - } - webHandler.Hashring(h) - msg := "hashring has changed; server is not ready to receive web requests" - statusProber.NotReady(errors.New(msg)) - level.Info(logger).Log("msg", msg) - hashringChangedChan <- struct{}{} - case <-cancel: - return nil - } + level.Info(logger).Log("msg", "the hashring initialized with config watcher.") + return receive.HashringFromConfigWatcher(ctx, updates, cw) + }, func(error) { + cancel() + }) + } else { + var ( + ring receive.Hashring + err error + ) + // The Hashrings config file content given initialize configuration from content. + if len(conf.hashringsFileContent) > 0 { + ring, err = receive.HashringFromConfig(conf.hashringsFileContent) + if err != nil { + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") } - }, func(err error) { + level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") + } else { + level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") + ring = receive.SingleNodeHashring(conf.endpoint) + } + + cancel := make(chan struct{}) + g.Add(func() error { + defer close(updates) + updates <- ring + <-cancel + return nil + }, func(error) { close(cancel) - }, - ) + }) } - level.Debug(logger).Log("msg", "setting up http server") - srv := httpserver.New(logger, reg, comp, httpProbe, - httpserver.WithListen(*conf.httpBindAddr), - httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), - httpserver.WithTLSConfig(*conf.httpTLSConfig), - ) + cancel := make(chan struct{}) g.Add(func() error { - statusProber.Healthy() - - return srv.ListenAndServe() + defer close(hashringChangedChan) + for { + select { + case h, ok := <-updates: + if !ok { + return nil + } + webHandler.Hashring(h) + msg := "hashring has changed; server is not ready to receive web requests" + statusProber.NotReady(errors.New(msg)) + level.Info(logger).Log("msg", msg) + hashringChangedChan <- struct{}{} + case <-cancel: + return nil + } + } }, func(err error) { - statusProber.NotReady(err) - defer statusProber.NotHealthy(err) + close(cancel) + }, + ) + return nil +} - srv.Shutdown(err) +// startTSDB starts up the multi-tsdb and sets up the rungroup to flush the tsdb on hashring change. 
+func startTSDB(g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + dbs *receive.MultiTSDB, + dbReady chan struct{}, + uploadC chan struct{}, + hashringChangedChan chan struct{}, + upload bool, + uploadDone chan struct{}, + statusProber prober.Probe, + bkt objstore.Bucket, + +) error { + + log.With(logger, "component", "storage") + dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_receive_multi_db_updates_attempted_total", + Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes", + }) + dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_receive_multi_db_updates_completed_total", + Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes", }) - level.Debug(logger).Log("msg", "setting up grpc server") - { - var s *grpcserver.Server - startGRPC := make(chan struct{}) - g.Add(func() error { - defer close(startGRPC) + level.Debug(logger).Log("msg", "removing storage lock files if any") + if err := dbs.RemoveLockFilesIfAny(); err != nil { + return errors.Wrap(err, "remove storage lock files") + } - tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) - if err != nil { - return errors.Wrap(err, "setup gRPC server") + // TSDBs reload logic, listening on hashring changes. + cancel := make(chan struct{}) + g.Add(func() error { + defer close(dbReady) + defer close(uploadC) + + // Before quitting, ensure the WAL is flushed and the DBs are closed. + defer func() { + level.Info(logger).Log("msg", "shutting down storage") + if err := dbs.Flush(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") } + if err := dbs.Close(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to close storage") + return + } + level.Info(logger).Log("msg", "storage is closed") + }() - for range dbReady { - if s != nil { - s.Shutdown(errors.New("reload hashrings")) + for { + select { + case <-cancel: + return nil + case _, ok := <-hashringChangedChan: + if !ok { + return nil } + dbUpdatesStarted.Inc() + level.Info(logger).Log("msg", "updating storage") - rw := store.ReadWriteTSDBStore{ - StoreServer: store.NewMultiTSDBStore( - logger, - reg, - comp, - dbs.TSDBStores, - ), - WriteableStoreServer: webHandler, + if err := dbs.Flush(); err != nil { + return errors.Wrap(err, "flushing storage") } - - s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, - grpcserver.WithServer(store.RegisterStoreServer(rw)), - grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), - grpcserver.WithListen(*conf.grpcBindAddr), - grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), - grpcserver.WithTLSConfig(tlsCfg), - ) - startGRPC <- struct{}{} - } - if s != nil { - s.Shutdown(err) - } - return nil - }, func(error) {}) - // We need to be able to start and stop the gRPC server - // whenever the DB changes, thus it needs its own run group. 
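// A minimal illustrative sketch of the coordination pattern the comment above
// refers to, assuming the oklog/run package that backs run.Group: one actor
// drives a signal channel and a second actor reacts to it, so the reacting
// side can be restarted without tearing down the whole process. The names
// trigger and "reloading" are placeholders, not Thanos identifiers.
package main

import (
	"fmt"
	"time"

	"github.com/oklog/run"
)

func main() {
	var g run.Group
	trigger := make(chan struct{})

	// Producer actor: emits a few reload signals, then closes the channel so
	// the consumer's for-range loop ends and the group shuts down cleanly.
	g.Add(func() error {
		defer close(trigger)
		for i := 0; i < 3; i++ {
			trigger <- struct{}{}
			time.Sleep(100 * time.Millisecond)
		}
		return nil
	}, func(error) {})

	// Consumer actor: in receive.go this is the side that shuts down and
	// re-creates the gRPC server against the reloaded TSDB stores.
	g.Add(func() error {
		for range trigger {
			fmt.Println("reloading")
		}
		return nil
	}, func(error) {})

	if err := g.Run(); err != nil {
		fmt.Println("run group exited:", err)
	}
}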
- g.Add(func() error { - for range startGRPC { - level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) - if err := s.ListenAndServe(); err != nil { - return errors.Wrap(err, "serve gRPC") + if err := dbs.Open(); err != nil { + return errors.Wrap(err, "opening storage") + } + if upload { + uploadC <- struct{}{} + <-uploadDone } + statusProber.Ready() + level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") + dbUpdatesCompleted.Inc() + dbReady <- struct{}{} } - return nil - }, func(error) {}) - } - - level.Debug(logger).Log("msg", "setting up receive http handler") - { - g.Add( - func() error { - return errors.Wrap(webHandler.Run(), "error starting web server") - }, - func(err error) { - webHandler.Close() - }, - ) - } + } + }, func(err error) { + close(cancel) + }) if upload { logger := log.With(logger, "component", "uploader") @@ -516,7 +583,6 @@ func runReceive( } } - level.Info(logger).Log("msg", "starting receiver") return nil } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c59ba69b63..de201c36af 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -198,6 +198,7 @@ func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ru func RemoteWriteEndpoint(addr string) string { return fmt.Sprintf("http://%s/api/v1/receive", addr) } func NewReceiver(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { + localEndpoint := NewService(fmt.Sprintf("receive-%v", name), "", e2e.NewCommand("", ""), nil, 8080, 9091, 8081).GRPCNetworkEndpointFor(networkName) if len(hashring) == 0 { hashring = []receive.HashringConfig{{Endpoints: []string{localEndpoint}}} @@ -242,6 +243,50 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF return receiver, nil } +func NewReceiverWithDistributorMode(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { + + if len(hashring) == 0 { + return nil, errors.New("hashring should not be empty for receive-distributor mode") + } + + dir := filepath.Join(sharedDir, "data", "receive", name) + dataDir := filepath.Join(dir, "data") + container := filepath.Join(e2e.ContainerSharedDir, "data", "receive", name) + if err := os.MkdirAll(dataDir, 0750); err != nil { + return nil, errors.Wrap(err, "create receive dir") + } + b, err := json.Marshal(hashring) + if err != nil { + return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) + } + + receiver := NewService( + fmt.Sprintf("receive-%v", name), + DefaultImage(), + // TODO(bwplotka): BuildArgs should be interface. 
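// A hedged stand-in (buildArgs below is hypothetical, not the real cortex e2e
// helper) for what e2e.BuildArgs is expected to do with the flag map passed
// just below: render it as "--name=value" command-line arguments. The actual
// library may differ in ordering and empty-value handling.
package main

import (
	"fmt"
	"sort"
)

func buildArgs(flags map[string]string) []string {
	args := make([]string, 0, len(flags))
	for name, value := range flags {
		if value == "" {
			args = append(args, name) // boolean-style flag with no value
			continue
		}
		args = append(args, name+"="+value)
	}
	sort.Strings(args) // deterministic output for the example only
	return args
}

func main() {
	fmt.Println(buildArgs(map[string]string{
		"--grpc-address":               ":9091",
		"--http-address":               ":8080",
		"--receive.replication-factor": "1",
	}))
}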
+ e2e.NewCommand("receive", e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("receive-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--remote-write.address": ":8081", + "--label": fmt.Sprintf(`receive="%s"`, name), + "--tsdb.path": filepath.Join(container, "data"), + "--log.level": infoLogLevel, + "--receive.replication-factor": strconv.Itoa(replicationFactor), + "--receive.hashrings": string(b), + })...), + e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), + 8080, + 9091, + 8081, + ) + receiver.SetUser(strconv.Itoa(os.Getuid())) + receiver.SetBackoff(defaultBackoffConfig) + + return receiver, nil +} + func NewReceiverWithConfigWatcher(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { localEndpoint := NewService(fmt.Sprintf("receive-%v", name), "", e2e.NewCommand("", ""), nil, 8080, 9091, 8081).GRPCNetworkEndpointFor(networkName) if len(hashring) == 0 { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index cd078111b4..e664a5f021 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -56,6 +56,77 @@ func ErrorHandler(_ http.ResponseWriter, _ *http.Request, err error) { func TestReceive(t *testing.T) { t.Parallel() + // t.Run("receive_distributor_ingestor_mode", func(t *testing.T) { + // t.Parallel() + + // s, err := e2e.NewScenario("receive_distributor_ingestor_mode") + // testutil.Ok(t, err) + // t.Cleanup(e2ethanos.CleanScenario(t, s)) + + // // Setup 3 ingestors. + // i1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i1", 1) + // testutil.Ok(t, err) + // i2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i2", 1) + // testutil.Ok(t, err) + // i3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i3", 1) + // testutil.Ok(t, err) + + // h := receive.HashringConfig{ + // Endpoints: []string{ + // i1.GRPCNetworkEndpointFor(s.NetworkName()), + // i2.GRPCNetworkEndpointFor(s.NetworkName()), + // i3.GRPCNetworkEndpointFor(s.NetworkName()), + // }, + // } + + // // Setup 1 distributor + // d1, err := e2ethanos.NewReceiverWithDistributorMode(s.SharedDir(), s.NetworkName(), "d1", 1, h) + // testutil.Ok(t, err) + // testutil.Ok(t, s.StartAndWaitReady(i1, i2, i3, d1)) + + // prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + // testutil.Ok(t, err) + // prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + // testutil.Ok(t, err) + // prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + // testutil.Ok(t, err) + // testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) + + // q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") + // testutil.Ok(t, err) + // testutil.Ok(t, s.StartAndWaitReady(q)) + + // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + // t.Cleanup(cancel) + + // testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + // 
queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + // Deduplicate: false, + // }, []model.Metric{ + // { + // "job": "myself", + // "prometheus": "prom1", + // "receive": "i2", + // "replica": "0", + // "tenant_id": "default-tenant", + // }, + // { + // "job": "myself", + // "prometheus": "prom2", + // "receive": "i1", + // "replica": "0", + // "tenant_id": "default-tenant", + // }, + // { + // "job": "myself", + // "prometheus": "prom3", + // "receive": "i2", + // "replica": "0", + // "tenant_id": "default-tenant", + // }, + // }) + // }) t.Run("hashring", func(t *testing.T) { t.Parallel() From bd7f920480cf14aa2f177e9aeba0362b52ba4a66 Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Thu, 20 May 2021 23:02:12 +0530 Subject: [PATCH 2/8] added an e2e test and refactored some more code for receiver Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 110 ++++++++++++++++++------------ test/e2e/receive_test.go | 142 +++++++++++++++++++-------------------- 2 files changed, 137 insertions(+), 115 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 40b29a5767..346eb626ec 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -6,10 +6,8 @@ package main import ( "context" "io/ioutil" - "net" "os" "path" - "strings" "time" "github.com/go-kit/kit/log" @@ -74,17 +72,8 @@ func registerReceive(app *extkingpin.App) { AllowOverlappingBlocks: conf.tsdbAllowOverlappingBlocks, } - // Local is empty, so try to generate a local endpoint - // based on the hostname and the listening port. - if conf.endpoint == "" { - hostname, err := os.Hostname() - if hostname == "" || err != nil { - return errors.New("--receive.local-endpoint is empty and host could not be determined.") - } - parts := strings.Split(*conf.grpcBindAddr, ":") - port := parts[len(parts)-1] - conf.endpoint = net.JoinHostPort(hostname, port) - } + // enable ingestion if local endpoint is specified, otherwise run receiver in distributor mode. + enableIngestion := conf.endpoint != "" return runReceive( g, @@ -96,6 +85,7 @@ func registerReceive(app *extkingpin.App) { lset, component.Receive, metadata.HashFunc(conf.hashFunc), + enableIngestion, conf, ) }) @@ -112,14 +102,21 @@ func runReceive( lset labels.Labels, comp component.SourceStoreAPI, hashFunc metadata.HashFunc, + enableIngestion bool, conf *receiveConfig, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive") + + if !enableIngestion { + level.Info(logger).Log("msg", "ingestion is disabled for receiver") + } + rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) if err != nil { return err } + dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, @@ -140,23 +137,26 @@ func runReceive( if err != nil { return err } + upload := len(confContentYaml) > 0 - if upload { - if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { - if !conf.ignoreBlockSize { - return errors.Errorf("found that TSDB Max time is %d and Min time is %d. "+ - "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) + if enableIngestion { + if upload { + if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { + if !conf.ignoreBlockSize { + return errors.Errorf("found that TSDB Max time is %d and Min time is %d. 
"+ + "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) + } + level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") } - level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") - } - // The background shipper continuously scans the data directory and uploads - // new blocks to object storage service. - bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) - if err != nil { - return err + // The background shipper continuously scans the data directory and uploads + // new blocks to object storage service. + bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) + if err != nil { + return err + } + } else { + level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") } - } else { - level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") } // TODO(brancz): remove after a couple of versions @@ -212,10 +212,12 @@ func runReceive( // uploadDone signals when uploading has finished. uploadDone := make(chan struct{}, 1) - level.Debug(logger).Log("msg", "setting up tsdb") - { - if err := startTSDB(g, logger, reg, dbs, dbReady, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { - return err + if enableIngestion { + level.Debug(logger).Log("msg", "setting up tsdb") + { + if err := startTSDBAndUpload(g, logger, reg, dbs, dbReady, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { + return err + } } } @@ -253,6 +255,9 @@ func runReceive( if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPC, dbReady, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { return err } + if err := runGRPCServer(g, logger, s, startGRPC, conf); err != nil { + return err + } } level.Debug(logger).Log("msg", "setting up receive http handler") @@ -271,6 +276,31 @@ func runReceive( return nil } +// runGRPCServer starts the grpc server, once it receives a signal from the startGRPC channel. +func runGRPCServer(g *run.Group, + logger log.Logger, + s *grpcserver.Server, + startGRPC chan struct{}, + conf *receiveConfig, + +) error { + // We need to be able to start and stop the gRPC server + // whenever the DB changes, thus it needs its own run group. + g.Add(func() error { + for range startGRPC { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) + if err := s.ListenAndServe(); err != nil { + return errors.Wrap(err, "serve gRPC") + } + } + return nil + }, func(error) {}) + + return nil +} + +// setupGRPCServer sets up the configuration for the gRPC server. +// It also sets up a handler for reloading the server if tsdb reloads. func setupGRPCServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, @@ -323,22 +353,13 @@ func setupGRPCServer(g *run.Group, } return nil }, func(error) {}) - // We need to be able to start and stop the gRPC server - // whenever the DB changes, thus it needs its own run group. 
- g.Add(func() error { - for range startGRPC { - level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) - if err := s.ListenAndServe(); err != nil { - return errors.Wrap(err, "serve gRPC") - } - } - return nil - }, func(error) {}) return nil } +// setupHashring sets up the hashring configuration provided. +// If no hashring is provided, we setup a single node hashring with local endpoint. func setupHashring(g *run.Group, logger log.Logger, reg *prometheus.Registry, @@ -429,8 +450,9 @@ func setupHashring(g *run.Group, return nil } -// startTSDB starts up the multi-tsdb and sets up the rungroup to flush the tsdb on hashring change. -func startTSDB(g *run.Group, +// startTSDBAndUpload starts up the multi-tsdb and sets up the rungroup to flush the tsdb and reload on hashring change. +// It also uploads the tsdb to object store if upload is enabled. +func startTSDBAndUpload(g *run.Group, logger log.Logger, reg *prometheus.Registry, dbs *receive.MultiTSDB, diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index e664a5f021..70c63e1b73 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -56,77 +56,77 @@ func ErrorHandler(_ http.ResponseWriter, _ *http.Request, err error) { func TestReceive(t *testing.T) { t.Parallel() - // t.Run("receive_distributor_ingestor_mode", func(t *testing.T) { - // t.Parallel() - - // s, err := e2e.NewScenario("receive_distributor_ingestor_mode") - // testutil.Ok(t, err) - // t.Cleanup(e2ethanos.CleanScenario(t, s)) - - // // Setup 3 ingestors. - // i1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i1", 1) - // testutil.Ok(t, err) - // i2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i2", 1) - // testutil.Ok(t, err) - // i3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i3", 1) - // testutil.Ok(t, err) - - // h := receive.HashringConfig{ - // Endpoints: []string{ - // i1.GRPCNetworkEndpointFor(s.NetworkName()), - // i2.GRPCNetworkEndpointFor(s.NetworkName()), - // i3.GRPCNetworkEndpointFor(s.NetworkName()), - // }, - // } - - // // Setup 1 distributor - // d1, err := e2ethanos.NewReceiverWithDistributorMode(s.SharedDir(), s.NetworkName(), "d1", 1, h) - // testutil.Ok(t, err) - // testutil.Ok(t, s.StartAndWaitReady(i1, i2, i3, d1)) - - // prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) - // testutil.Ok(t, err) - // prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) - // testutil.Ok(t, err) - // prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) - // testutil.Ok(t, err) - // testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - - // q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") - // testutil.Ok(t, err) - // testutil.Ok(t, s.StartAndWaitReady(q)) - - // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) - // t.Cleanup(cancel) - - // testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) - - // 
queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ - // Deduplicate: false, - // }, []model.Metric{ - // { - // "job": "myself", - // "prometheus": "prom1", - // "receive": "i2", - // "replica": "0", - // "tenant_id": "default-tenant", - // }, - // { - // "job": "myself", - // "prometheus": "prom2", - // "receive": "i1", - // "replica": "0", - // "tenant_id": "default-tenant", - // }, - // { - // "job": "myself", - // "prometheus": "prom3", - // "receive": "i2", - // "replica": "0", - // "tenant_id": "default-tenant", - // }, - // }) - // }) + t.Run("receive_distributor_ingestor_mode", func(t *testing.T) { + t.Parallel() + + s, err := e2e.NewScenario("receive_distributor_ingestor_mode") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + // Setup 3 ingestors. + i1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i1", 1) + testutil.Ok(t, err) + i2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i2", 1) + testutil.Ok(t, err) + i3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i3", 1) + testutil.Ok(t, err) + + h := receive.HashringConfig{ + Endpoints: []string{ + i1.GRPCNetworkEndpointFor(s.NetworkName()), + i2.GRPCNetworkEndpointFor(s.NetworkName()), + i3.GRPCNetworkEndpointFor(s.NetworkName()), + }, + } + + // Setup 1 distributor + d1, err := e2ethanos.NewReceiverWithDistributorMode(s.SharedDir(), s.NetworkName(), "d1", 1, h) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(i1, i2, i3, d1)) + + prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) + + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "prom1", + "receive": "i2", + "replica": "0", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom2", + "receive": "i1", + "replica": "0", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom3", + "receive": "i2", + "replica": "0", + "tenant_id": "default-tenant", + }, + }) + }) t.Run("hashring", func(t *testing.T) { t.Parallel() From 51d67915a2d21c549848e005c5ec98fbcb3b2581 Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Thu, 20 May 2021 23:42:32 +0530 Subject: [PATCH 3/8] corrected logic for receiver Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 76 ++++++++++++++++++++++++++++++------------- 
1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 346eb626ec..66010d78d3 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -6,8 +6,10 @@ package main import ( "context" "io/ioutil" + "net" "os" "path" + "strings" "time" "github.com/go-kit/kit/log" @@ -72,8 +74,21 @@ func registerReceive(app *extkingpin.App) { AllowOverlappingBlocks: conf.tsdbAllowOverlappingBlocks, } - // enable ingestion if local endpoint is specified, otherwise run receiver in distributor mode. - enableIngestion := conf.endpoint != "" + // enable ingestion if endpoint is specified, or both the hashrings config are empty, + // otherwise run receiver in distributor mode. + enableIngestion := conf.endpoint != "" || (conf.hashringsFileContent == "" && conf.hashringsFilePath == "") + + // If endpoint and hashrings are empty, so try to generate a local endpoint + // based on the hostname and the listening port. + if conf.endpoint == "" && (conf.hashringsFileContent == "" && conf.hashringsFilePath == "") { + hostname, err := os.Hostname() + if hostname == "" || err != nil { + return errors.New("--receive.local-endpoint is empty and host could not be determined.") + } + parts := strings.Split(*conf.grpcBindAddr, ":") + port := parts[len(parts)-1] + conf.endpoint = net.JoinHostPort(hostname, port) + } return runReceive( g, @@ -203,8 +218,9 @@ func runReceive( // Start all components while we wait for TSDB to open but only load // initial config and mark ourselves as ready after it completed. - // dbReady signals when TSDB is ready and the Store gRPC server can start. - dbReady := make(chan struct{}, 1) + // reloadGRPCServer signals when - (1)TSDB is ready and the Store gRPC server can start. + // (2) The Hashring files have changed. + reloadGRPCServer := make(chan struct{}, 1) // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. hashringChangedChan := make(chan struct{}, 1) // uploadC signals when new blocks should be uploaded. @@ -215,7 +231,7 @@ func runReceive( if enableIngestion { level.Debug(logger).Log("msg", "setting up tsdb") { - if err := startTSDBAndUpload(g, logger, reg, dbs, dbReady, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { + if err := startTSDBAndUpload(g, logger, reg, dbs, reloadGRPCServer, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { return err } } @@ -223,7 +239,7 @@ func runReceive( level.Debug(logger).Log("msg", "setting up hashring") { - if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber); err != nil { + if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, reloadGRPCServer, enableIngestion); err != nil { return err } } @@ -250,12 +266,13 @@ func runReceive( level.Debug(logger).Log("msg", "setting up grpc server") { var s *grpcserver.Server - startGRPC := make(chan struct{}) + // startGRPCListening re-starts the gRPC server once it receives a signal. 
+ startGRPCListening := make(chan struct{}) - if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPC, dbReady, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { + if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPCListening, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { return err } - if err := runGRPCServer(g, logger, s, startGRPC, conf); err != nil { + if err := runGRPCServer(g, logger, s, startGRPCListening, conf); err != nil { return err } } @@ -276,18 +293,18 @@ func runReceive( return nil } -// runGRPCServer starts the grpc server, once it receives a signal from the startGRPC channel. +// runGRPCServer starts the grpc server, once it receives a signal from the startGRPCListening channel. func runGRPCServer(g *run.Group, logger log.Logger, s *grpcserver.Server, - startGRPC chan struct{}, + startGRPCListening chan struct{}, conf *receiveConfig, ) error { // We need to be able to start and stop the gRPC server // whenever the DB changes, thus it needs its own run group. g.Add(func() error { - for range startGRPC { + for range startGRPCListening { level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) if err := s.ListenAndServe(); err != nil { return errors.Wrap(err, "serve gRPC") @@ -306,8 +323,8 @@ func setupGRPCServer(g *run.Group, reg *prometheus.Registry, tracer opentracing.Tracer, conf *receiveConfig, s *grpcserver.Server, - startGRPC chan struct{}, - dbReady chan struct{}, + startGRPCListening chan struct{}, + reloadGRPCServer chan struct{}, comp component.SourceStoreAPI, dbs *receive.MultiTSDB, webHandler *receive.Handler, @@ -317,14 +334,14 @@ func setupGRPCServer(g *run.Group, ) error { g.Add(func() error { - defer close(startGRPC) + defer close(startGRPCListening) tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - for range dbReady { + for range reloadGRPCServer { if s != nil { s.Shutdown(errors.New("reload hashrings")) } @@ -346,7 +363,7 @@ func setupGRPCServer(g *run.Group, grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), grpcserver.WithTLSConfig(tlsCfg), ) - startGRPC <- struct{}{} + startGRPCListening <- struct{}{} } if s != nil { s.Shutdown(err) @@ -367,6 +384,8 @@ func setupHashring(g *run.Group, hashringChangedChan chan struct{}, webHandler *receive.Handler, statusProber prober.Probe, + reloadGRPCServer chan struct{}, + enableIngestion bool, ) error { // Note: the hashring configuration watcher @@ -427,7 +446,11 @@ func setupHashring(g *run.Group, cancel := make(chan struct{}) g.Add(func() error { - defer close(hashringChangedChan) + + if enableIngestion { + defer close(hashringChangedChan) + } + for { select { case h, ok := <-updates: @@ -438,7 +461,16 @@ func setupHashring(g *run.Group, msg := "hashring has changed; server is not ready to receive web requests" statusProber.NotReady(errors.New(msg)) level.Info(logger).Log("msg", msg) - hashringChangedChan <- struct{}{} + + if enableIngestion { + // send a signal to tsdb to reload, and then restart the gRPC server. + hashringChangedChan <- struct{}{} + } else { + // we dont need tsdb to reload, so restart the gRPC server. 
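// Illustrative summary of the two reload paths at this point (the channel and
// function names are the ones used in receive.go):
//
//   ingestion enabled:  hashring change -> hashringChangedChan -> TSDB flush
//                       and reopen in startTSDBAndUpload -> reloadGRPCServer
//   ingestion disabled: hashring change -> reloadGRPCServer directly, since
//                       there is no local TSDB to flush; readiness is flipped
//                       back right below.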
+ level.Info(logger).Log("msg", "server has reloaded, ready to start accepting requests") + statusProber.Ready() + reloadGRPCServer <- struct{}{} + } case <-cancel: return nil } @@ -456,7 +488,7 @@ func startTSDBAndUpload(g *run.Group, logger log.Logger, reg *prometheus.Registry, dbs *receive.MultiTSDB, - dbReady chan struct{}, + reloadGRPCServer chan struct{}, uploadC chan struct{}, hashringChangedChan chan struct{}, upload bool, @@ -484,7 +516,7 @@ func startTSDBAndUpload(g *run.Group, // TSDBs reload logic, listening on hashring changes. cancel := make(chan struct{}) g.Add(func() error { - defer close(dbReady) + defer close(reloadGRPCServer) defer close(uploadC) // Before quitting, ensure the WAL is flushed and the DBs are closed. @@ -526,7 +558,7 @@ func startTSDBAndUpload(g *run.Group, statusProber.Ready() level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") dbUpdatesCompleted.Inc() - dbReady <- struct{}{} + reloadGRPCServer <- struct{}{} } } }, func(err error) { From a12006c295f4318101cd864882586bf8d0abb101 Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Thu, 20 May 2021 23:45:50 +0530 Subject: [PATCH 4/8] remove default endpoint Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 66010d78d3..bc1998bb8f 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -6,10 +6,8 @@ package main import ( "context" "io/ioutil" - "net" "os" "path" - "strings" "time" "github.com/go-kit/kit/log" @@ -78,18 +76,6 @@ func registerReceive(app *extkingpin.App) { // otherwise run receiver in distributor mode. enableIngestion := conf.endpoint != "" || (conf.hashringsFileContent == "" && conf.hashringsFilePath == "") - // If endpoint and hashrings are empty, so try to generate a local endpoint - // based on the hostname and the listening port. - if conf.endpoint == "" && (conf.hashringsFileContent == "" && conf.hashringsFilePath == "") { - hostname, err := os.Hostname() - if hostname == "" || err != nil { - return errors.New("--receive.local-endpoint is empty and host could not be determined.") - } - parts := strings.Split(*conf.grpcBindAddr, ":") - port := parts[len(parts)-1] - conf.endpoint = net.JoinHostPort(hostname, port) - } - return runReceive( g, logger, @@ -219,7 +205,7 @@ func runReceive( // initial config and mark ourselves as ready after it completed. // reloadGRPCServer signals when - (1)TSDB is ready and the Store gRPC server can start. - // (2) The Hashring files have changed. + // (2) The Hashring files have changed if tsdb ingestion is disabled. reloadGRPCServer := make(chan struct{}, 1) // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. 
hashringChangedChan := make(chan struct{}, 1) From 4be0b7bf8ae702e898d0329800afa51d6661307f Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Fri, 21 May 2021 12:06:23 +0530 Subject: [PATCH 5/8] move run grpc to setup grpc Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index bc1998bb8f..f8feaca701 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -258,9 +258,9 @@ func runReceive( if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPCListening, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { return err } - if err := runGRPCServer(g, logger, s, startGRPCListening, conf); err != nil { - return err - } + // if err := runGRPCServer(g, logger, s, startGRPCListening, conf); err != nil { + // return err + // } } level.Debug(logger).Log("msg", "setting up receive http handler") @@ -357,6 +357,18 @@ func setupGRPCServer(g *run.Group, return nil }, func(error) {}) + // We need to be able to start and stop the gRPC server + // whenever the DB changes, thus it needs its own run group. + g.Add(func() error { + for range startGRPCListening { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) + if err := s.ListenAndServe(); err != nil { + return errors.Wrap(err, "serve gRPC") + } + } + return nil + }, func(error) {}) + return nil } From b635613549e9069509b7bfc1f86f842827be378a Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Fri, 21 May 2021 12:14:46 +0530 Subject: [PATCH 6/8] merged setup and run grpc server groups Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 43 +++++++++---------------------------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index f8feaca701..f34a658d03 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -251,16 +251,10 @@ func runReceive( level.Debug(logger).Log("msg", "setting up grpc server") { - var s *grpcserver.Server - // startGRPCListening re-starts the gRPC server once it receives a signal. - startGRPCListening := make(chan struct{}) - if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPCListening, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { + if err := setupAndRunGRPCServer(g, logger, reg, tracer, conf, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { return err } - // if err := runGRPCServer(g, logger, s, startGRPCListening, conf); err != nil { - // return err - // } } level.Debug(logger).Log("msg", "setting up receive http handler") @@ -279,37 +273,13 @@ func runReceive( return nil } -// runGRPCServer starts the grpc server, once it receives a signal from the startGRPCListening channel. -func runGRPCServer(g *run.Group, - logger log.Logger, - s *grpcserver.Server, - startGRPCListening chan struct{}, - conf *receiveConfig, - -) error { - // We need to be able to start and stop the gRPC server - // whenever the DB changes, thus it needs its own run group. 
- g.Add(func() error { - for range startGRPCListening { - level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) - if err := s.ListenAndServe(); err != nil { - return errors.Wrap(err, "serve gRPC") - } - } - return nil - }, func(error) {}) - - return nil -} - -// setupGRPCServer sets up the configuration for the gRPC server. +// setupAndRunGRPCServer sets up the configuration for the gRPC server. // It also sets up a handler for reloading the server if tsdb reloads. -func setupGRPCServer(g *run.Group, +func setupAndRunGRPCServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, - conf *receiveConfig, s *grpcserver.Server, - startGRPCListening chan struct{}, + conf *receiveConfig, reloadGRPCServer chan struct{}, comp component.SourceStoreAPI, dbs *receive.MultiTSDB, @@ -319,6 +289,11 @@ func setupGRPCServer(g *run.Group, grpcProbe *prober.GRPCProbe, ) error { + + var s *grpcserver.Server + // startGRPCListening re-starts the gRPC server once it receives a signal. + startGRPCListening := make(chan struct{}) + g.Add(func() error { defer close(startGRPCListening) From 3dfc12d3f5431e8b0f55d385a101c1cdeef5a019 Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Fri, 28 May 2021 17:37:59 +0530 Subject: [PATCH 7/8] Update cmd/thanos/receive.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lucas Servén Marín Update cmd/thanos/receive.go Co-authored-by: Lucas Servén Marín Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index f34a658d03..83ac57802f 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -72,7 +72,8 @@ func registerReceive(app *extkingpin.App) { AllowOverlappingBlocks: conf.tsdbAllowOverlappingBlocks, } - // enable ingestion if endpoint is specified, or both the hashrings config are empty, + // Enable ingestion if endpoint is specified or if both the hashrings configs are empty. + // Otherwise, run the receiver exclusively as a distributor. // otherwise run receiver in distributor mode. enableIngestion := conf.endpoint != "" || (conf.hashringsFileContent == "" && conf.hashringsFilePath == "") @@ -359,7 +360,6 @@ func setupHashring(g *run.Group, statusProber prober.Probe, reloadGRPCServer chan struct{}, enableIngestion bool, - ) error { // Note: the hashring configuration watcher // is the sender and thus closes the chan. From 36866ab89efe76e0da4eb7197e1ec611bda3251e Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Fri, 28 May 2021 22:04:18 +0530 Subject: [PATCH 8/8] changed the e2e test name Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 2 -- test/e2e/e2ethanos/services.go | 2 +- test/e2e/receive_test.go | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 83ac57802f..982f477a36 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -74,7 +74,6 @@ func registerReceive(app *extkingpin.App) { // Enable ingestion if endpoint is specified or if both the hashrings configs are empty. // Otherwise, run the receiver exclusively as a distributor. - // otherwise run receiver in distributor mode. 
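// Illustrative summary of how the condition below picks the mode (endpoint is
// --receive.local-endpoint):
//
//   endpoint set,   hashrings configured -> ingestor participating in the ring
//   endpoint set,   no hashrings         -> single-node ingestor (SingleNodeHashring)
//   endpoint empty, hashrings configured -> pure distributor; no local TSDB is started
//   endpoint empty, no hashrings         -> ingestion stays enabled via the
//                                           second clause of the condition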
enableIngestion := conf.endpoint != "" || (conf.hashringsFileContent == "" && conf.hashringsFilePath == "") return runReceive( @@ -252,7 +251,6 @@ func runReceive( level.Debug(logger).Log("msg", "setting up grpc server") { - if err := setupAndRunGRPCServer(g, logger, reg, tracer, conf, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { return err } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index de201c36af..286528260b 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -243,7 +243,7 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF return receiver, nil } -func NewReceiverWithDistributorMode(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { +func NewReceiverWithoutTSDB(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { if len(hashring) == 0 { return nil, errors.New("hashring should not be empty for receive-distributor mode") diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 70c63e1b73..276cd3cf29 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -80,7 +80,7 @@ func TestReceive(t *testing.T) { } // Setup 1 distributor - d1, err := e2ethanos.NewReceiverWithDistributorMode(s.SharedDir(), s.NetworkName(), "d1", 1, h) + d1, err := e2ethanos.NewReceiverWithoutTSDB(s.SharedDir(), s.NetworkName(), "d1", 1, h) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(i1, i2, i3, d1))