Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve resource manager integration #8318

Merged
merged 2 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ var LibP2P = Options(
Override(ConnGaterKey, lp2p.ConnGaterOption),

// Services (resource management)
Override(new(network.ResourceManager), lp2p.ResourceManager),
Override(new(network.ResourceManager), lp2p.ResourceManager(200)),
Override(ResourceManagerKey, lp2p.ResourceManagerOption),
)

Expand Down Expand Up @@ -282,6 +282,7 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option {
cfg.Libp2p.ConnMgrHigh,
time.Duration(cfg.Libp2p.ConnMgrGrace),
cfg.Libp2p.ProtectedPeers)),
Override(new(network.ResourceManager), lp2p.ResourceManager(cfg.Libp2p.ConnMgrHigh)),
Override(new(*pubsub.PubSub), lp2p.GossipSub),
Override(new(*config.Pubsub), &cfg.Pubsub),

Expand Down
128 changes: 90 additions & 38 deletions node/modules/lp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/bits"
"os"
"path/filepath"

Expand All @@ -15,62 +16,113 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"

logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
)

func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
var limiter *rcmgr.BasicLimiter
var opts []rcmgr.Option

repoPath := repo.Path()
func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
envvar := os.Getenv("LOTUS_RCMGR")
if envvar == "" || envvar == "0" {
// TODO opt-in for now -- flip this to enabled by default once we are comfortable with testing
log.Info("libp2p resource manager is disabled")
return network.NullResourceManager, nil
}

// create limiter -- parse $repo/limits.json if exists
limitsFile := filepath.Join(repoPath, "limits.json")
limitsIn, err := os.Open(limitsFile)
switch {
case err == nil:
defer limitsIn.Close() //nolint:errcheck
limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn)
if err != nil {
return nil, fmt.Errorf("error parsing limit file: %w", err)
log.Info("libp2p resource manager is enabled")
// enable debug logs for rcmgr
logging.SetLogLevel("rcmgr", "debug")

// Adjust default limits
// - give it more memory, up to 4G, min of 1G
// - if maxconns are too high, adjust Conn/FD/Stream limits
defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30)
maxconns := int(connMgrHi)
if 2*maxconns > defaultLimits.SystemBaseLimit.ConnsInbound {
// adjust conns to 2x to allow for two conns per peer (TCP+QUIC)
defaultLimits.SystemBaseLimit.ConnsInbound = logScale(2 * maxconns)
defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(2 * maxconns)
defaultLimits.SystemBaseLimit.Conns = logScale(4 * maxconns)

defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns)
defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns)
defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns)

if 2*maxconns > defaultLimits.SystemBaseLimit.FD {
defaultLimits.SystemBaseLimit.FD = logScale(2 * maxconns)
}

defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns)
defaultLimits.ServiceBaseLimit.StreamsOutbound = logScale(32 * maxconns)
defaultLimits.ServiceBaseLimit.Streams = logScale(32 * maxconns)

defaultLimits.ProtocolBaseLimit.StreamsInbound = logScale(8 * maxconns)
defaultLimits.ProtocolBaseLimit.StreamsOutbound = logScale(32 * maxconns)
defaultLimits.ProtocolBaseLimit.Streams = logScale(32 * maxconns)

log.Info("adjusted default resource manager limits")
}

case errors.Is(err, os.ErrNotExist):
limiter = rcmgr.NewDefaultLimiter()
// initialize
var limiter *rcmgr.BasicLimiter
var opts []rcmgr.Option

default:
return nil, err
}
repoPath := repo.Path()

// TODO: also set appropriate default limits for lotus protocols
libp2p.SetDefaultServiceLimits(limiter)
// create limiter -- parse $repo/limits.json if exists
limitsFile := filepath.Join(repoPath, "limits.json")
limitsIn, err := os.Open(limitsFile)
switch {
case err == nil:
defer limitsIn.Close() //nolint:errcheck
limiter, err = rcmgr.NewLimiterFromJSON(limitsIn, defaultLimits)
if err != nil {
return nil, fmt.Errorf("error parsing limit file: %w", err)
}

opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
case errors.Is(err, os.ErrNotExist):
limiter = rcmgr.NewStaticLimiter(defaultLimits)

if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
debugPath := filepath.Join(repoPath, "debug")
if err := os.MkdirAll(debugPath, 0755); err != nil {
return nil, fmt.Errorf("error creating debug directory: %w", err)
default:
return nil, err
}
traceFile := filepath.Join(debugPath, "rcmgr.json.gz")
opts = append(opts, rcmgr.WithTrace(traceFile))
}

mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, fmt.Errorf("error creating resource manager: %w", err)
}
// TODO: also set appropriate default limits for lotus protocols
libp2p.SetDefaultServiceLimits(limiter)

opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))

lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mgr.Close()
}})
if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
debugPath := filepath.Join(repoPath, "debug")
if err := os.MkdirAll(debugPath, 0755); err != nil {
return nil, fmt.Errorf("error creating debug directory: %w", err)
}
traceFile := filepath.Join(debugPath, "rcmgr.json.gz")
opts = append(opts, rcmgr.WithTrace(traceFile))
}

mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
return nil, fmt.Errorf("error creating resource manager: %w", err)
}

lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mgr.Close()
}})

return mgr, nil
}
}

return mgr, nil
func logScale(val int) int {
bitlen := bits.Len(uint(val))
return 1 << bitlen
}

func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
Expand Down