From 2a573f01daa7f80212beb1acb79fabcaf4f8166d Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 25 Apr 2022 10:41:34 -0400 Subject: [PATCH] fix: data race issues with api.Server (backport #11724) (#11748) --- CHANGELOG.md | 4 +++ server/api/server.go | 75 +++++++++++++++++--------------------------- 2 files changed, 33 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e309d113bbab..e042a9cc56d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,10 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Bug Fixes + +* [\#11724](https://github.com/cosmos/cosmos-sdk/pull/11724) Fix data race issues with `api.Server`. + ### Improvements * [\#11693](https://github.com/cosmos/cosmos-sdk/pull/11693) Add validation for gentx cmd. diff --git a/server/api/server.go b/server/api/server.go index c48745edd308..be92dfcf2312 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -31,10 +31,9 @@ type Server struct { Router *mux.Router GRPCGatewayRouter *runtime.ServeMux ClientCtx client.Context - GRPCSrv *grpc.Server - logger log.Logger - metrics *telemetry.Metrics + logger log.Logger + metrics *telemetry.Metrics // Start() is blocking and generally called from a separate goroutine. // Close() can be called asynchronously and access shared memory // via the listener. Therefore, we sync access to Start and Close with @@ -90,21 +89,28 @@ func New(clientCtx client.Context, logger log.Logger, grpcSrv *grpc.Server) *Ser // Start starts the API server. Internally, the API server leverages CometBFT's // JSON RPC server. Configuration options are provided via config.APIConfig -// and are delegated to the CometBFT JSON RPC server. -// -// Note, this creates a blocking process if the server is started successfully. -// Otherwise, an error is returned. The caller is expected to provide a Context -// that is properly canceled or closed to indicate the server should be stopped. -func (s *Server) Start(ctx context.Context, cfg config.Config) error { +// and are delegated to the Tendermint JSON RPC server. The process is +// non-blocking, so an external signal handler must be used. +func (s *Server) Start(cfg config.Config) error { s.mtx.Lock() + if cfg.Telemetry.Enabled { + m, err := telemetry.New(cfg.Telemetry) + if err != nil { + s.mtx.Unlock() + return err + } + + s.metrics = m + s.registerMetrics() + } - cmtCfg := tmrpcserver.DefaultConfig() - cmtCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections) - cmtCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second - cmtCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second - cmtCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes) + tmCfg := tmrpcserver.DefaultConfig() + tmCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections) + tmCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second + tmCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second + tmCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes) - listener, err := tmrpcserver.Listen(cfg.API.Address, cmtCfg.MaxOpenConnections) + listener, err := tmrpcserver.Listen(cfg.API.Address, tmCfg) if err != nil { s.mtx.Unlock() return err @@ -113,38 +119,15 @@ func (s *Server) Start(ctx context.Context, cfg config.Config) error { s.listener = listener s.mtx.Unlock() - // register grpc-gateway routes - s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter) - - errCh := make(chan error) - - // Start the API in an external goroutine as Serve is blocking and will return - // an error upon failure, which we'll send on the error channel that will be - // consumed by the for block below. - go func(enableUnsafeCORS bool) { - s.logger.Info("starting API server...", "address", cfg.API.Address) - - if enableUnsafeCORS { - allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) - errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), servercmtlog.CometLoggerWrapper{Logger: s.logger}, cmtCfg) - } else { - errCh <- tmrpcserver.Serve(s.listener, s.Router, servercmtlog.CometLoggerWrapper{Logger: s.logger}, cmtCfg) - } - }(cfg.API.EnableUnsafeCORS) - - // Start a blocking select to wait for an indication to stop the server or that - // the server failed to start properly. - select { - case <-ctx.Done(): - // The calling process canceled or closed the provided context, so we must - // gracefully stop the API server. - s.logger.Info("stopping API server...", "address", cfg.API.Address) - return s.Close() - - case err := <-errCh: - s.logger.Error("failed to start API server", "err", err) - return err + if cfg.API.EnableUnsafeCORS { + allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) + s.mtx.Unlock() + return tmrpcserver.Serve(s.listener, allowAllCORS(h), s.logger, tmCfg) } + + s.logger.Info("starting API server...") + s.mtx.Unlock() + return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) } // Close closes the API server.