From 15cc6f850cae471945286e0aaa399024f00f7076 Mon Sep 17 00:00:00 2001 From: Yash Sharma Date: Fri, 14 May 2021 17:50:36 +0530 Subject: [PATCH] used a simple disableTSDB flag to disable ingestion Signed-off-by: Yash Sharma remove flag for ingestion mode, and decide internally Signed-off-by: Yash Sharma moved grpc setup for both ingestion and distribution Signed-off-by: Yash Sharma added an e2e test Signed-off-by: Yash Sharma move dbs and writer outside scope Signed-off-by: Yash Sharma test remove stray at /handler 467 Signed-off-by: Yash Sharma fixed a duplicate code in receiver.go Signed-off-by: Yash Sharma removed stray line in handler Signed-off-by: Yash Sharma added a e2e test for ingestion and distributor scenario Signed-off-by: Yash Sharma uncommented other tests Signed-off-by: Yash Sharma restored receiver to main branch implementation Signed-off-by: Yash Sharma break down multiple logical parts of receiver into functions Signed-off-by: Yash Sharma --- cmd/thanos/receive.go | 454 +++++++++++++++++++-------------- test/e2e/e2ethanos/services.go | 45 ++++ test/e2e/receive_test.go | 71 ++++++ 3 files changed, 376 insertions(+), 194 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 818b0ca1a9f..40b29a57677 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -214,235 +214,302 @@ func runReceive( level.Debug(logger).Log("msg", "setting up tsdb") { - log.With(logger, "component", "storage") - dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_receive_multi_db_updates_attempted_total", - Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes", - }) - dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_receive_multi_db_updates_completed_total", - Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes", - }) + if err := startTSDB(g, logger, reg, dbs, dbReady, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { + return err + } + } - level.Debug(logger).Log("msg", "removing storage lock files if any") - if err := dbs.RemoveLockFilesIfAny(); err != nil { - return errors.Wrap(err, "remove storage lock files") + level.Debug(logger).Log("msg", "setting up hashring") + { + if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber); err != nil { + return err } + } - // TSDBs reload logic, listening on hashring changes. - cancel := make(chan struct{}) + level.Debug(logger).Log("msg", "setting up http server") + { + srv := httpserver.New(logger, reg, comp, httpProbe, + httpserver.WithListen(*conf.httpBindAddr), + httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), + httpserver.WithTLSConfig(*conf.httpTLSConfig), + ) g.Add(func() error { - defer close(dbReady) - defer close(uploadC) - - // Before quitting, ensure the WAL is flushed and the DBs are closed. - defer func() { - level.Info(logger).Log("msg", "shutting down storage") - if err := dbs.Flush(); err != nil { - level.Error(logger).Log("err", err, "msg", "failed to flush storage") - } else { - level.Info(logger).Log("msg", "storage is flushed successfully") - } - if err := dbs.Close(); err != nil { - level.Error(logger).Log("err", err, "msg", "failed to close storage") - return - } - level.Info(logger).Log("msg", "storage is closed") - }() + statusProber.Healthy() - for { - select { - case <-cancel: - return nil - case _, ok := <-hashringChangedChan: - if !ok { - return nil - } - dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating storage") - - if err := dbs.Flush(); err != nil { - return errors.Wrap(err, "flushing storage") - } - if err := dbs.Open(); err != nil { - return errors.Wrap(err, "opening storage") - } - if upload { - uploadC <- struct{}{} - <-uploadDone - } - statusProber.Ready() - level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") - dbUpdatesCompleted.Inc() - dbReady <- struct{}{} - } - } + return srv.ListenAndServe() }, func(err error) { - close(cancel) + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + srv.Shutdown(err) }) } - level.Debug(logger).Log("msg", "setting up hashring") + level.Debug(logger).Log("msg", "setting up grpc server") { - // Note: the hashring configuration watcher - // is the sender and thus closes the chan. - // In the single-node case, which has no configuration - // watcher, we close the chan ourselves. - updates := make(chan receive.Hashring, 1) - - // The Hashrings config file path is given initializing config watcher. - if conf.hashringsFilePath != "" { - cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) - if err != nil { - return errors.Wrap(err, "failed to initialize config watcher") + var s *grpcserver.Server + startGRPC := make(chan struct{}) + + if err := setupGRPCServer(g, logger, reg, tracer, conf, s, startGRPC, dbReady, comp, dbs, webHandler, grpcLogOpts, tagOpts, grpcProbe); err != nil { + return err + } + } + + level.Debug(logger).Log("msg", "setting up receive http handler") + { + g.Add( + func() error { + return errors.Wrap(webHandler.Run(), "error starting web server") + }, + func(err error) { + webHandler.Close() + }, + ) + } + + level.Info(logger).Log("msg", "starting receiver") + return nil +} + +func setupGRPCServer(g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + tracer opentracing.Tracer, + conf *receiveConfig, s *grpcserver.Server, + startGRPC chan struct{}, + dbReady chan struct{}, + comp component.SourceStoreAPI, + dbs *receive.MultiTSDB, + webHandler *receive.Handler, + grpcLogOpts []grpc_logging.Option, + tagOpts []tags.Option, + grpcProbe *prober.GRPCProbe, + +) error { + g.Add(func() error { + defer close(startGRPC) + + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + + for range dbReady { + if s != nil { + s.Shutdown(errors.New("reload hashrings")) } - // Check the hashring configuration on before running the watcher. - if err := cw.ValidateConfig(); err != nil { - cw.Stop() - close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") + rw := store.ReadWriteTSDBStore{ + StoreServer: store.NewMultiTSDBStore( + logger, + reg, + comp, + dbs.TSDBStores, + ), + WriteableStoreServer: webHandler, } - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - level.Info(logger).Log("msg", "the hashring initialized with config watcher.") - return receive.HashringFromConfigWatcher(ctx, updates, cw) - }, func(error) { - cancel() - }) - } else { - var ring receive.Hashring - // The Hashrings config file content given initialize configuration from content. - if len(conf.hashringsFileContent) > 0 { - ring, err = receive.HashringFromConfig(conf.hashringsFileContent) - if err != nil { - close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") - } - level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") - } else { - level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") - ring = receive.SingleNodeHashring(conf.endpoint) + s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(rw)), + grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), + grpcserver.WithListen(*conf.grpcBindAddr), + grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), + grpcserver.WithTLSConfig(tlsCfg), + ) + startGRPC <- struct{}{} + } + if s != nil { + s.Shutdown(err) + } + return nil + }, func(error) {}) + // We need to be able to start and stop the gRPC server + // whenever the DB changes, thus it needs its own run group. + g.Add(func() error { + for range startGRPC { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) + if err := s.ListenAndServe(); err != nil { + return errors.Wrap(err, "serve gRPC") } + } + return nil + }, func(error) {}) - cancel := make(chan struct{}) - g.Add(func() error { - defer close(updates) - updates <- ring - <-cancel - return nil - }, func(error) { - close(cancel) - }) + return nil + +} + +func setupHashring(g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + conf *receiveConfig, + hashringChangedChan chan struct{}, + webHandler *receive.Handler, + statusProber prober.Probe, + +) error { + // Note: the hashring configuration watcher + // is the sender and thus closes the chan. + // In the single-node case, which has no configuration + // watcher, we close the chan ourselves. + updates := make(chan receive.Hashring, 1) + + // The Hashrings config file path is given initializing config watcher. + if conf.hashringsFilePath != "" { + cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) + if err != nil { + return errors.Wrap(err, "failed to initialize config watcher") } - cancel := make(chan struct{}) + // Check the hashring configuration on before running the watcher. + if err := cw.ValidateConfig(); err != nil { + cw.Stop() + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") + } + + ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - defer close(hashringChangedChan) - for { - select { - case h, ok := <-updates: - if !ok { - return nil - } - webHandler.Hashring(h) - msg := "hashring has changed; server is not ready to receive web requests" - statusProber.NotReady(errors.New(msg)) - level.Info(logger).Log("msg", msg) - hashringChangedChan <- struct{}{} - case <-cancel: - return nil - } + level.Info(logger).Log("msg", "the hashring initialized with config watcher.") + return receive.HashringFromConfigWatcher(ctx, updates, cw) + }, func(error) { + cancel() + }) + } else { + var ( + ring receive.Hashring + err error + ) + // The Hashrings config file content given initialize configuration from content. + if len(conf.hashringsFileContent) > 0 { + ring, err = receive.HashringFromConfig(conf.hashringsFileContent) + if err != nil { + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") } - }, func(err error) { + level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") + } else { + level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") + ring = receive.SingleNodeHashring(conf.endpoint) + } + + cancel := make(chan struct{}) + g.Add(func() error { + defer close(updates) + updates <- ring + <-cancel + return nil + }, func(error) { close(cancel) - }, - ) + }) } - level.Debug(logger).Log("msg", "setting up http server") - srv := httpserver.New(logger, reg, comp, httpProbe, - httpserver.WithListen(*conf.httpBindAddr), - httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), - httpserver.WithTLSConfig(*conf.httpTLSConfig), - ) + cancel := make(chan struct{}) g.Add(func() error { - statusProber.Healthy() - - return srv.ListenAndServe() + defer close(hashringChangedChan) + for { + select { + case h, ok := <-updates: + if !ok { + return nil + } + webHandler.Hashring(h) + msg := "hashring has changed; server is not ready to receive web requests" + statusProber.NotReady(errors.New(msg)) + level.Info(logger).Log("msg", msg) + hashringChangedChan <- struct{}{} + case <-cancel: + return nil + } + } }, func(err error) { - statusProber.NotReady(err) - defer statusProber.NotHealthy(err) + close(cancel) + }, + ) + return nil +} - srv.Shutdown(err) +// startTSDB starts up the multi-tsdb and sets up the rungroup to flush the tsdb on hashring change. +func startTSDB(g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + dbs *receive.MultiTSDB, + dbReady chan struct{}, + uploadC chan struct{}, + hashringChangedChan chan struct{}, + upload bool, + uploadDone chan struct{}, + statusProber prober.Probe, + bkt objstore.Bucket, + +) error { + + log.With(logger, "component", "storage") + dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_receive_multi_db_updates_attempted_total", + Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes", + }) + dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_receive_multi_db_updates_completed_total", + Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes", }) - level.Debug(logger).Log("msg", "setting up grpc server") - { - var s *grpcserver.Server - startGRPC := make(chan struct{}) - g.Add(func() error { - defer close(startGRPC) + level.Debug(logger).Log("msg", "removing storage lock files if any") + if err := dbs.RemoveLockFilesIfAny(); err != nil { + return errors.Wrap(err, "remove storage lock files") + } - tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) - if err != nil { - return errors.Wrap(err, "setup gRPC server") + // TSDBs reload logic, listening on hashring changes. + cancel := make(chan struct{}) + g.Add(func() error { + defer close(dbReady) + defer close(uploadC) + + // Before quitting, ensure the WAL is flushed and the DBs are closed. + defer func() { + level.Info(logger).Log("msg", "shutting down storage") + if err := dbs.Flush(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") } + if err := dbs.Close(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to close storage") + return + } + level.Info(logger).Log("msg", "storage is closed") + }() - for range dbReady { - if s != nil { - s.Shutdown(errors.New("reload hashrings")) + for { + select { + case <-cancel: + return nil + case _, ok := <-hashringChangedChan: + if !ok { + return nil } + dbUpdatesStarted.Inc() + level.Info(logger).Log("msg", "updating storage") - rw := store.ReadWriteTSDBStore{ - StoreServer: store.NewMultiTSDBStore( - logger, - reg, - comp, - dbs.TSDBStores, - ), - WriteableStoreServer: webHandler, + if err := dbs.Flush(); err != nil { + return errors.Wrap(err, "flushing storage") } - - s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, - grpcserver.WithServer(store.RegisterStoreServer(rw)), - grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), - grpcserver.WithListen(*conf.grpcBindAddr), - grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), - grpcserver.WithTLSConfig(tlsCfg), - ) - startGRPC <- struct{}{} - } - if s != nil { - s.Shutdown(err) - } - return nil - }, func(error) {}) - // We need to be able to start and stop the gRPC server - // whenever the DB changes, thus it needs its own run group. - g.Add(func() error { - for range startGRPC { - level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) - if err := s.ListenAndServe(); err != nil { - return errors.Wrap(err, "serve gRPC") + if err := dbs.Open(); err != nil { + return errors.Wrap(err, "opening storage") + } + if upload { + uploadC <- struct{}{} + <-uploadDone } + statusProber.Ready() + level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") + dbUpdatesCompleted.Inc() + dbReady <- struct{}{} } - return nil - }, func(error) {}) - } - - level.Debug(logger).Log("msg", "setting up receive http handler") - { - g.Add( - func() error { - return errors.Wrap(webHandler.Run(), "error starting web server") - }, - func(err error) { - webHandler.Close() - }, - ) - } + } + }, func(err error) { + close(cancel) + }) if upload { logger := log.With(logger, "component", "uploader") @@ -516,7 +583,6 @@ func runReceive( } } - level.Info(logger).Log("msg", "starting receiver") return nil } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c59ba69b63a..de201c36af5 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -198,6 +198,7 @@ func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ru func RemoteWriteEndpoint(addr string) string { return fmt.Sprintf("http://%s/api/v1/receive", addr) } func NewReceiver(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { + localEndpoint := NewService(fmt.Sprintf("receive-%v", name), "", e2e.NewCommand("", ""), nil, 8080, 9091, 8081).GRPCNetworkEndpointFor(networkName) if len(hashring) == 0 { hashring = []receive.HashringConfig{{Endpoints: []string{localEndpoint}}} @@ -242,6 +243,50 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF return receiver, nil } +func NewReceiverWithDistributorMode(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { + + if len(hashring) == 0 { + return nil, errors.New("hashring should not be empty for receive-distributor mode") + } + + dir := filepath.Join(sharedDir, "data", "receive", name) + dataDir := filepath.Join(dir, "data") + container := filepath.Join(e2e.ContainerSharedDir, "data", "receive", name) + if err := os.MkdirAll(dataDir, 0750); err != nil { + return nil, errors.Wrap(err, "create receive dir") + } + b, err := json.Marshal(hashring) + if err != nil { + return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) + } + + receiver := NewService( + fmt.Sprintf("receive-%v", name), + DefaultImage(), + // TODO(bwplotka): BuildArgs should be interface. + e2e.NewCommand("receive", e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("receive-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--remote-write.address": ":8081", + "--label": fmt.Sprintf(`receive="%s"`, name), + "--tsdb.path": filepath.Join(container, "data"), + "--log.level": infoLogLevel, + "--receive.replication-factor": strconv.Itoa(replicationFactor), + "--receive.hashrings": string(b), + })...), + e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), + 8080, + 9091, + 8081, + ) + receiver.SetUser(strconv.Itoa(os.Getuid())) + receiver.SetBackoff(defaultBackoffConfig) + + return receiver, nil +} + func NewReceiverWithConfigWatcher(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { localEndpoint := NewService(fmt.Sprintf("receive-%v", name), "", e2e.NewCommand("", ""), nil, 8080, 9091, 8081).GRPCNetworkEndpointFor(networkName) if len(hashring) == 0 { diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 5974d1a150d..cb2d7c2de67 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -56,6 +56,77 @@ func ErrorHandler(_ http.ResponseWriter, _ *http.Request, err error) { func TestReceive(t *testing.T) { t.Parallel() + // t.Run("receive_distributor_ingestor_mode", func(t *testing.T) { + // t.Parallel() + + // s, err := e2e.NewScenario("receive_distributor_ingestor_mode") + // testutil.Ok(t, err) + // t.Cleanup(e2ethanos.CleanScenario(t, s)) + + // // Setup 3 ingestors. + // i1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i1", 1) + // testutil.Ok(t, err) + // i2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i2", 1) + // testutil.Ok(t, err) + // i3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "i3", 1) + // testutil.Ok(t, err) + + // h := receive.HashringConfig{ + // Endpoints: []string{ + // i1.GRPCNetworkEndpointFor(s.NetworkName()), + // i2.GRPCNetworkEndpointFor(s.NetworkName()), + // i3.GRPCNetworkEndpointFor(s.NetworkName()), + // }, + // } + + // // Setup 1 distributor + // d1, err := e2ethanos.NewReceiverWithDistributorMode(s.SharedDir(), s.NetworkName(), "d1", 1, h) + // testutil.Ok(t, err) + // testutil.Ok(t, s.StartAndWaitReady(i1, i2, i3, d1)) + + // prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + // testutil.Ok(t, err) + // prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + // testutil.Ok(t, err) + // prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(d1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + // testutil.Ok(t, err) + // testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) + + // q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, nil, "", "") + // testutil.Ok(t, err) + // testutil.Ok(t, s.StartAndWaitReady(q)) + + // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + // t.Cleanup(cancel) + + // testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + // queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + // Deduplicate: false, + // }, []model.Metric{ + // { + // "job": "myself", + // "prometheus": "prom1", + // "receive": "i2", + // "replica": "0", + // "tenant_id": "default-tenant", + // }, + // { + // "job": "myself", + // "prometheus": "prom2", + // "receive": "i1", + // "replica": "0", + // "tenant_id": "default-tenant", + // }, + // { + // "job": "myself", + // "prometheus": "prom3", + // "receive": "i2", + // "replica": "0", + // "tenant_id": "default-tenant", + // }, + // }) + // }) t.Run("hashring", func(t *testing.T) { t.Parallel()