diff --git a/node/builder.go b/node/builder.go index acda5a07a80..6386f78a608 100644 --- a/node/builder.go +++ b/node/builder.go @@ -222,7 +222,7 @@ var LibP2P = Options( Override(ConnGaterKey, lp2p.ConnGaterOption), // Services (resource management) - Override(new(network.ResourceManager), lp2p.ResourceManager), + Override(new(network.ResourceManager), lp2p.ResourceManager(200)), Override(ResourceManagerKey, lp2p.ResourceManagerOption), ) @@ -282,6 +282,7 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option { cfg.Libp2p.ConnMgrHigh, time.Duration(cfg.Libp2p.ConnMgrGrace), cfg.Libp2p.ProtectedPeers)), + Override(new(network.ResourceManager), lp2p.ResourceManager(cfg.Libp2p.ConnMgrHigh)), Override(new(*pubsub.PubSub), lp2p.GossipSub), Override(new(*config.Pubsub), &cfg.Pubsub), diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 0bc4dd6b215..d0906fd8fc3 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/bits" "os" "path/filepath" @@ -15,6 +16,8 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" + logging "github.com/ipfs/go-log/v2" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" @@ -22,55 +25,104 @@ import ( "go.opencensus.io/tag" ) -func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { - var limiter *rcmgr.BasicLimiter - var opts []rcmgr.Option - - repoPath := repo.Path() +func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { + return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { + envvar := os.Getenv("LOTUS_RCMGR") + if envvar == "" || envvar == "0" { + // TODO opt-in for now -- flip this to enabled by default once we are comfortable with testing + log.Info("libp2p resource manager is disabled") + return network.NullResourceManager, nil + } - // create limiter -- parse $repo/limits.json if exists - limitsFile := filepath.Join(repoPath, "limits.json") - limitsIn, err := os.Open(limitsFile) - switch { - case err == nil: - defer limitsIn.Close() //nolint:errcheck - limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn) - if err != nil { - return nil, fmt.Errorf("error parsing limit file: %w", err) + log.Info("libp2p resource manager is enabled") + // enable debug logs for rcmgr + logging.SetLogLevel("rcmgr", "debug") + + // Adjust default limits + // - give it more memory, up to 4G, min of 1G + // - if maxconns are too high, adjust Conn/FD/Stream limits + defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30) + maxconns := int(connMgrHi) + if 2*maxconns > defaultLimits.SystemBaseLimit.ConnsInbound { + // adjust conns to 2x to allow for two conns per peer (TCP+QUIC) + defaultLimits.SystemBaseLimit.ConnsInbound = logScale(2 * maxconns) + defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(2 * maxconns) + defaultLimits.SystemBaseLimit.Conns = logScale(4 * maxconns) + + defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns) + defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns) + defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns) + + if 2*maxconns > defaultLimits.SystemBaseLimit.FD { + defaultLimits.SystemBaseLimit.FD = logScale(2 * maxconns) + } + + defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns) + defaultLimits.ServiceBaseLimit.StreamsOutbound = logScale(32 * maxconns) + defaultLimits.ServiceBaseLimit.Streams = logScale(32 * maxconns) + + defaultLimits.ProtocolBaseLimit.StreamsInbound = logScale(8 * maxconns) + defaultLimits.ProtocolBaseLimit.StreamsOutbound = logScale(32 * maxconns) + defaultLimits.ProtocolBaseLimit.Streams = logScale(32 * maxconns) + + log.Info("adjusted default resource manager limits") } - case errors.Is(err, os.ErrNotExist): - limiter = rcmgr.NewDefaultLimiter() + // initialize + var limiter *rcmgr.BasicLimiter + var opts []rcmgr.Option - default: - return nil, err - } + repoPath := repo.Path() - // TODO: also set appropriate default limits for lotus protocols - libp2p.SetDefaultServiceLimits(limiter) + // create limiter -- parse $repo/limits.json if exists + limitsFile := filepath.Join(repoPath, "limits.json") + limitsIn, err := os.Open(limitsFile) + switch { + case err == nil: + defer limitsIn.Close() //nolint:errcheck + limiter, err = rcmgr.NewLimiterFromJSON(limitsIn, defaultLimits) + if err != nil { + return nil, fmt.Errorf("error parsing limit file: %w", err) + } - opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) + case errors.Is(err, os.ErrNotExist): + limiter = rcmgr.NewStaticLimiter(defaultLimits) - if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { - debugPath := filepath.Join(repoPath, "debug") - if err := os.MkdirAll(debugPath, 0755); err != nil { - return nil, fmt.Errorf("error creating debug directory: %w", err) + default: + return nil, err } - traceFile := filepath.Join(debugPath, "rcmgr.json.gz") - opts = append(opts, rcmgr.WithTrace(traceFile)) - } - mgr, err := rcmgr.NewResourceManager(limiter, opts...) - if err != nil { - return nil, fmt.Errorf("error creating resource manager: %w", err) - } + // TODO: also set appropriate default limits for lotus protocols + libp2p.SetDefaultServiceLimits(limiter) + + opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return mgr.Close() - }}) + if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { + debugPath := filepath.Join(repoPath, "debug") + if err := os.MkdirAll(debugPath, 0755); err != nil { + return nil, fmt.Errorf("error creating debug directory: %w", err) + } + traceFile := filepath.Join(debugPath, "rcmgr.json.gz") + opts = append(opts, rcmgr.WithTrace(traceFile)) + } + + mgr, err := rcmgr.NewResourceManager(limiter, opts...) + if err != nil { + return nil, fmt.Errorf("error creating resource manager: %w", err) + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return mgr.Close() + }}) + + return mgr, nil + } +} - return mgr, nil +func logScale(val int) int { + bitlen := bits.Len(uint(val)) + return 1 << bitlen } func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {