From d377fffa6ca8e4cbe6049faaaf0d8f8579e2b601 Mon Sep 17 00:00:00 2001
From: ironbeer <7997273+ironbeer@users.noreply.github.com>
Date: Thu, 15 Feb 2024 04:07:36 +0900
Subject: [PATCH] Improved throttling

---
 config/config.go      |  57 +++++---
 config/config_test.go |  39 +++--
 p2p/node.go           | 329 +++++++++++++++++++++++++++---------------
 p2p/node_test.go      | 161 +++++++++++++--------
 util/sync.go          |  49 +++++++
 util/sync_test.go     |  36 +++++
 6 files changed, 456 insertions(+), 215 deletions(-)
 create mode 100644 util/sync.go
 create mode 100644 util/sync_test.go

diff --git a/config/config.go b/config/config.go
index 5dc78d2..cd79f86 100644
--- a/config/config.go
+++ b/config/config.go
@@ -18,8 +18,6 @@ var (
 	defaults = map[string]interface{}{
 		"verse_layer.discovery.refresh_interval": time.Hour,
 
-		"p2p.publish_interval": 5 * time.Minute,
-		"p2p.stream_timeout":   10 * time.Second,
 		"p2p.no_announce": []string{
 			"/ip4/127.0.0.1/ipcidr/8",
 			"/ip4/10.0.0.0/ipcidr/8",
@@ -32,14 +30,19 @@
 			"/ip4/172.16.0.0/ipcidr/12",
 			"/ip4/192.168.0.0/ipcidr/16",
 		},
-		"p2p.transports.tcp":                   true,
-		"p2p.transports.quic":                  true,
-		"p2p.nat.upnp":                         true,
-		"p2p.nat.autonat":                      true,
-		"p2p.nat.holepunch":                    true,
-		"p2p.relay_client.enable":              true,
-		"p2p.experimental.concurrency":         40,
-		"p2p.experimental.sig_send_throttling": 2000,
+		"p2p.transports.tcp":               true,
+		"p2p.transports.quic":              true,
+		"p2p.nat.upnp":                     true,
+		"p2p.nat.autonat":                  true,
+		"p2p.nat.holepunch":                true,
+		"p2p.relay_client.enable":          true,
+		"p2p.publish_interval":             5 * time.Minute,
+		"p2p.stream_timeout":               10 * time.Second,
+		"p2p.outbound_limits.concurrency":  10,
+		"p2p.outbound_limits.throttling":   500,
+		"p2p.inbound_limits.concurrency":   10,
+		"p2p.inbound_limits.throttling":    500,
+		"p2p.inbound_limits.max_send_time": 30 * time.Second,
 
 		"ipc.sockname": "oasvlfy",
 
@@ -240,12 +243,6 @@ type P2P struct {
 	// Deprecated: Address and port to listen.
 	Listen string `json:"listen" validate:"omitempty,hostname_port"`
 
-	// Interval to publish own signature status.
-	PublishInterval time.Duration `json:"publish_interval" mapstructure:"publish_interval"`
-
-	// Timeout for P2P stream communication.
-	StreamTimeout time.Duration `json:"stream_timeout" mapstructure:"stream_timeout"`
-
 	// Initial node list.
 	Bootnodes []string `json:"bootnodes"`
 
@@ -290,14 +287,30 @@ type P2P struct {
 		RelayNodes []string `json:"relay_nodes" mapstructure:"relay_nodes"`
 	} `json:"relay_client" mapstructure:"relay_client"`
 
-	// Configuration for experimental features.
-	Experimental struct {
-		// Number of concurrent executions.
+	// Interval to publish own signature status.
+	PublishInterval time.Duration `json:"publish_interval" mapstructure:"publish_interval"`
+
+	// Timeout for P2P stream communication.
+	StreamTimeout time.Duration `json:"stream_timeout" mapstructure:"stream_timeout"`
+
+	OutboundLimits struct {
+		// Maximum number of concurrent signature requests from oneself to peers.
 		Concurrency int `json:"concurrency"`
 
-		// Number of signatures that can be sent per second.
-		SigSendThrottling int `json:"sig_send_throttling" mapstructure:"sig_send_throttling"`
-	} `json:"experimental"`
+		// The number of signatures that can be sent to peers per second.
+		Throttling int `json:"throttling"`
+	} `json:"outbound_limits" mapstructure:"outbound_limits"`
+
+	InboundLimits struct {
+		// Maximum number of concurrent signature requests from peers to oneself.
+		Concurrency int `json:"concurrency"`
+
+		// The number of signatures that can be sent to peers per second.
+		Throttling int `json:"throttling"`
+
+		// Maximum time to send signatures to a peer.
+		MaxSendTime time.Duration `json:"max_send_time" mapstructure:"max_send_time"`
+	} `json:"inbound_limits" mapstructure:"inbound_limits"`
 }
 
 type IPC struct {
diff --git a/config/config_test.go b/config/config_test.go
index 131dddb..34fb67c 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -52,8 +52,6 @@ func (s *ConfigTestSuite) TestParseConfig() {
         - noann0
       connection_filter:
         - connfil0
-      publish_interval: 5s
-      stream_timeout: 5s
       bootnodes:
         - /ip4/127.0.0.1/tcp/20002/p2p/12D3KooWCNqRgVdwAhGrurCc8XE4RsWB8S2T83yMZR9R7Gdtf899
       relay_service:
@@ -69,7 +67,6 @@ func (s *ConfigTestSuite) TestParseConfig() {
         max_reservations_per_asn: 9
       relay_client:
         relay_nodes: ["relay-0", "relay-1"]
-        enable_hole_punching: true
 
     ipc:
       sockname: testsock
@@ -177,9 +174,7 @@ func (s *ConfigTestSuite) TestParseConfig() {
 			TCP:  true,
 			QUIC: true,
 		},
-		Listen:          "",
-		PublishInterval: 5 * time.Second,
-		StreamTimeout:   5 * time.Second,
+		Listen: "",
 		Bootnodes: []string{
 			"/ip4/127.0.0.1/tcp/20002/p2p/12D3KooWCNqRgVdwAhGrurCc8XE4RsWB8S2T83yMZR9R7Gdtf899",
 		},
@@ -222,12 +217,23 @@ func (s *ConfigTestSuite) TestParseConfig() {
 			Enable:     true,
 			RelayNodes: []string{"relay-0", "relay-1"},
 		},
-		Experimental: struct {
-			Concurrency       int "json:\"concurrency\""
-			SigSendThrottling int "json:\"sig_send_throttling\" mapstructure:\"sig_send_throttling\""
+		PublishInterval: 5 * time.Minute,
+		StreamTimeout:   10 * time.Second,
+		OutboundLimits: struct {
+			Concurrency int "json:\"concurrency\""
+			Throttling  int "json:\"throttling\""
 		}{
-			Concurrency:       40,
-			SigSendThrottling: 2000,
+			Concurrency: 10,
+			Throttling:  500,
+		},
+		InboundLimits: struct {
+			Concurrency int "json:\"concurrency\""
+			Throttling  int "json:\"throttling\""
+			MaxSendTime time.Duration "json:\"max_send_time\" mapstructure:\"max_send_time\""
+		}{
+			Concurrency: 10,
+			Throttling:  500,
+			MaxSendTime: 30 * time.Second,
 		},
 	}, got.P2P)
 
@@ -399,13 +405,16 @@ func (s *ConfigTestSuite) TestDefaultValues() {
 	}, got.P2P.ConnectionFilter)
 	s.Equal(true, got.P2P.Transports.TCP)
 	s.Equal(true, got.P2P.Transports.QUIC)
-	s.Equal(5*time.Minute, got.P2P.PublishInterval)
-	s.Equal(10*time.Second, got.P2P.StreamTimeout)
 	s.Equal(true, got.P2P.NAT.UPnP)
 	s.Equal(true, got.P2P.NAT.AutoNAT)
 	s.Equal(true, got.P2P.NAT.HolePunch)
-	s.Equal(40, got.P2P.Experimental.Concurrency)
-	s.Equal(2000, got.P2P.Experimental.SigSendThrottling)
+	s.Equal(5*time.Minute, got.P2P.PublishInterval)
+	s.Equal(10*time.Second, got.P2P.StreamTimeout)
+	s.Equal(10, got.P2P.OutboundLimits.Concurrency)
+	s.Equal(500, got.P2P.OutboundLimits.Throttling)
+	s.Equal(10, got.P2P.InboundLimits.Concurrency)
+	s.Equal(500, got.P2P.InboundLimits.Throttling)
+	s.Equal(30*time.Second, got.P2P.InboundLimits.MaxSendTime)
 
 	s.Equal("oasvlfy", got.IPC.Sockname)
 
diff --git a/p2p/node.go b/p2p/node.go
index f00f3ca..e0bac37 100644
--- a/p2p/node.go
+++ b/p2p/node.go
@@ -1,6 +1,7 @@
 package p2p
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -28,6 +29,7 @@ import (
 	"github.com/oasysgames/oasys-optimism-verifier/util"
 	"github.com/oasysgames/oasys-optimism-verifier/verselayer"
 	"github.com/oklog/ulid/v2"
+	"golang.org/x/sync/semaphore"
 	"golang.org/x/time/rate"
 	"google.golang.org/protobuf/proto"
 )
@@ -40,6 +42,9 @@ const (
 var (
 	eom             = &pb.Stream{Body: &pb.Stream_Eom{Eom: nil}}
 	tenMillionEther = new(big.Int).Mul(big.NewInt(params.Ether), big.NewInt(10_000_000))
+
+	// miscellaneous messages
+	misc_SIGRECEIVED = []byte("SIGNATURES_RECEIVED")
 )
 
 type Node struct {
@@ -53,10 +58,12 @@ type Node struct {
 	ignoreSigners map[common.Address]int
 	stakemanager  *stakemanager.Cache
 
-	throttling *rate.Limiter
-	topic      *ps.Topic
-	sub        *ps.Subscription
-	log        log.Logger
+	topic *ps.Topic
+	sub   *ps.Subscription
+	log   log.Logger
+
+	outboundSem, inboundSem     *semaphore.Weighted
+	outboundThrot, inboundThrot *rate.Limiter
 
 	meterPubsubSubscribed,
 	meterPubsubUnknownMsg,
@@ -109,11 +116,16 @@ func NewNode(
 		hubLayerChainID: new(big.Int).SetUint64(hubLayerChainID),
 		ignoreSigners:   map[common.Address]int{},
 		stakemanager:    stakemanager,
-		throttling: rate.NewLimiter(rate.Limit(
-			cfg.Experimental.SigSendThrottling), cfg.Experimental.SigSendThrottling),
-		topic: topic,
-		sub:   sub,
-		log:   log.New("worker", "p2p"),
+		topic: topic,
+		sub:   sub,
+		log:   log.New("worker", "p2p"),
+
+		outboundSem: semaphore.NewWeighted(int64(cfg.OutboundLimits.Concurrency)),
+		inboundSem:  semaphore.NewWeighted(int64(cfg.InboundLimits.Concurrency)),
+		outboundThrot: rate.NewLimiter(
+			rate.Limit(cfg.OutboundLimits.Throttling), cfg.OutboundLimits.Throttling),
+		inboundThrot: rate.NewLimiter(
+			rate.Limit(cfg.InboundLimits.Throttling), cfg.InboundLimits.Throttling),
 
 		meterPubsubSubscribed: meter.GetOrRegisterCounter([]string{"p2p", "pubsub", "subscribed"}, ""),
 		meterPubsubUnknownMsg: meter.GetOrRegisterCounter([]string{"p2p", "pubsub", "unknown", "messages"}, ""),
@@ -138,7 +150,6 @@ func NewNode(
 		meterRelayStopStreams: meter.GetOrRegisterGauge([]string{"p2p", "relaystop", "streams"}, ""),
 		meterVerifierStreams:  meter.GetOrRegisterGauge([]string{"p2p", "verifier", "streams"}, ""),
 	}
-	worker.h.SetStreamHandler(streamProtocol, worker.handleStream)
 
 	for _, addr := range ignoreSigners {
 		worker.ignoreSigners[addr] = 1
@@ -150,6 +161,7 @@ func NewNode(
 func (w *Node) Start(ctx context.Context) {
 	defer w.topic.Close()
 	defer w.sub.Cancel()
+	w.h.SetStreamHandler(streamProtocol, w.newStreamHandler(ctx))
 
 	wg := &sync.WaitGroup{}
 
@@ -226,8 +238,7 @@ func (w *Node) subscribeLoop(ctx context.Context) {
 	}
 
 	// Storing workers and jobs.
-	// The maximum number of elements is equal to the number of signers.
-	workers := util.NewWorkerGroup(w.cfg.Experimental.Concurrency)
+	workers := util.NewWorkerGroup(100)
 	procs := &sync.Map{}
 
 	for {
@@ -276,6 +287,9 @@ func (w *Node) subscribeLoop(ctx context.Context) {
 
 			if data, ok := procs.Load(wname); ok {
 				proc := data.(*job)
+				if peer == proc.peer {
+					continue
+				}
 				if strings.Compare(remote.Id, proc.remote.Id) < 1 {
 					w.log.Debug("Skipped old signature",
 						append(proc.logctx, "skipped-peer", peer, "skipped-id", remote.Id)...)
@@ -299,40 +313,43 @@ func (w *Node) subscribeLoop(ctx context.Context) {
 	}
 }
 
-func (w *Node) handleStream(s network.Stream) {
-	defer w.closeStream(s)
-	w.meterStreamHandled.Incr()
+func (w *Node) newStreamHandler(ctx context.Context) network.StreamHandler {
+	return func(s network.Stream) {
+		defer w.closeStream(s)
 
-	peer := s.Conn().RemotePeer()
-	for {
-		m, err := w.readStream(s)
-		if t, ok := err.(*ReadWriteError); ok {
-			w.log.Error("Failed to read stream message", "peer", peer, "err", t)
-			break
-		} else if err != nil {
-			w.log.Error(err.Error(), "peer", peer)
-			continue
-		}
+		w.meterStreamHandled.Incr()
 
-		var disconnect bool
-		switch t := m.Body.(type) {
-		case *pb.Stream_OptimismSignatureExchange:
-			// received signature exchange request or response
-			disconnect = w.handleOptimismSignatureExchangeFromStream(s, t.OptimismSignatureExchange)
-		case *pb.Stream_FindCommonOptimismSignature:
-			// received FindCommonOptimismSignature request
-			w.handleFindCommonOptimismSignature(s, t.FindCommonOptimismSignature)
-		case *pb.Stream_Eom:
-			// received last message
-			return
-		default:
-			w.log.Warn("Received an unknown message", "peer", peer)
-			w.meterStreamUnknownMsg.Incr()
-			return
-		}
+		peer := s.Conn().RemotePeer()
+		for {
+			m, err := w.readStream(s)
+			if t, ok := err.(*ReadWriteError); ok {
+				w.log.Debug("Failed to read stream message", "peer", peer, "err", t)
+				return
+			} else if err != nil {
+				w.log.Debug(err.Error(), "peer", peer)
+				continue
+			}
 
-		if disconnect {
-			break
+			var disconnect bool
+			switch t := m.Body.(type) {
+			case *pb.Stream_FindCommonOptimismSignature:
+				// received FindCommonOptimismSignature request
+				disconnect = w.handleFindCommonOptimismSignature(s, t.FindCommonOptimismSignature)
+			case *pb.Stream_OptimismSignatureExchange:
+				// received signature exchange request
+				disconnect = w.handleOptimismSignatureExchangeRequest(ctx, s, t)
+			case *pb.Stream_Eom:
+				// received last message
+				return
+			default:
+				w.log.Warn("Received an unknown message", "peer", peer)
+				w.meterStreamUnknownMsg.Incr()
+				return
+			}
+
+			if disconnect {
+				return
+			}
 		}
 	}
 }
@@ -360,6 +377,9 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 	if err != nil {
 		w.log.Error("Failed to find the latest signature", append(logctx, "err", err)...)
 		return
+	} else if len(local) > 0 && strings.Compare(local[0].ID, remote.Id) == 1 {
+		// fully synchronized or less than local
+		return
 	}
 
 	// open stream to peer
@@ -372,8 +392,14 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 			return nil
 		}
 	}
+	returned := make(chan any)
+	defer func() { close(returned) }()
 	go func() {
-		<-ctx.Done()
+		select {
+		case <-ctx.Done():
+			// canceled because a newer signature was received
+		case <-returned:
+		}
 		if s != nil {
 			w.closeStream(s)
 		}
@@ -382,14 +408,11 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 	var idAfter string
 	if len(local) == 0 {
 		w.log.Info("Request all signatures", logctx...)
-	} else if strings.Compare(local[0].ID, remote.Id) == 1 {
-		// fully synchronized or less than local
-		return
 	} else {
 		if openStream() != nil {
 			return
 		}
-		if found, err := w.findCommonLatestSignature(s, signer); err == nil {
+		if found, err := w.findCommonLatestSignature(ctx, s, signer); err == nil {
 			fsigner := common.BytesToAddress(found.Signer)
 			if fsigner != signer {
 				w.log.Error("Signer does not match", append(logctx, "found-signer", fsigner)...)
@@ -399,7 +422,7 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 			idAfter = found.Id
 			w.log.Info("Found common signature from peer",
 				"signer", signer, "id", found.Id, "previous-id", found.PreviousId)
-		} else {
+		} else if errors.Is(err, database.ErrNotFound) {
 			if localID, err := ulid.ParseStrict(local[0].ID); err == nil {
 				// Prevent out-of-sync by specifying the ID of 1 second ago
 				ms := localID.Time() - 1000
@@ -409,6 +432,8 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 				w.log.Error("Failed to parse ULID", "local-id", local[0].ID, "err", err)
 				return
 			}
+		} else {
+			return
 		}
 
 		w.log.Info("Request signatures", append(logctx, "id-after", idAfter)...)
@@ -427,7 +452,6 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 			},
 		},
 	}
-
 	if s == nil && openStream() != nil {
 		return
 	}
@@ -435,72 +459,132 @@ func (w *Node) handleOptimismSignatureExchangeFromPubSub(
 		w.log.Error("Failed to send signature request", "err", err)
 		return
 	}
-	if err := w.writeStream(s, eom); err != nil {
-		w.log.Error("Failed to send end-of-message", "err", err)
-		return
-	}
-
-	// wait for signature exchange response
-	w.handleStream(s)
+	w.handleOptimismSignatureExchangeResponses(ctx, s)
 }
 
-func (w *Node) handleOptimismSignatureExchangeFromStream(
+func (w *Node) handleOptimismSignatureExchangeRequest(
+	ctx context.Context,
 	s network.Stream,
-	recv *pb.OptimismSignatureExchange,
+	request *pb.Stream_OptimismSignatureExchange,
 ) (disconnect bool) {
 	peerID := s.Conn().RemotePeer()
 	logctx := []interface{}{"peer", peerID}
 
-	if len(recv.Requests) > 0 {
-		// received signature exchange request
-		limit := w.cfg.Experimental.SigSendThrottling / w.cfg.Experimental.Concurrency
-		for _, req := range recv.Requests {
-			signer := common.BytesToAddress(req.Signer)
-			if w.stakemanager.StakeBySigner(signer).Cmp(tenMillionEther) == -1 {
-				continue
+	requests := request.OptimismSignatureExchange.GetRequests()
+	if len(requests) == 0 {
+		w.log.Warn("No requests", logctx...)
+		return false
+	}
+
+	// number of signatures obtained from the database
+	queryLimit := w.cfg.InboundLimits.Throttling / w.cfg.InboundLimits.Concurrency
+
+	// sending time limit
+	isTimeup, timePenalty := func() (func() bool, func()) {
+		limit := time.Now().Add(w.cfg.InboundLimits.MaxSendTime)
+		return func() bool {
+			return time.Now().After(limit)
+		}, func() {
+			limit = limit.Add(-(w.cfg.InboundLimits.MaxSendTime / 3))
 		}
+	}()
 
-			logctx := append(logctx, "signer", signer, "id-after", req.IdAfter)
-			w.log.Info("Received signature request", logctx...)
-
-			for offset := 0; offset < limit*10; offset += limit {
-				// get latest signatures for each requested signer
-				sigs, err := w.db.Optimism.FindSignatures(
-					&req.IdAfter, &signer, nil, nil, limit, offset)
-				if err != nil {
-					w.log.Error("Failed to find requested signatures",
-						append(logctx, "err", err)...)
-					break
-				}
+	// Acquire the semaphore in small units so that other peers
+	// are not blocked for a long time.
+	sem := util.NewReleaseGuardSemaphore(w.inboundSem)
+	defer sem.ReleaseALL()
 
-			sigLen := len(sigs)
-			if sigLen == 0 {
-				break // reached the last
-			}
-			w.sigSendThrottling(sigLen, "in", "handleOptimismSignatureExchange", "peerID", peerID)
+	for _, req := range requests {
+		signer := common.BytesToAddress(req.Signer)
+		if w.stakemanager.StakeBySigner(signer).Cmp(tenMillionEther) == -1 {
+			continue
+		}
 
-			responses := make([]*pb.OptimismSignature, sigLen)
-			for i, sig := range sigs {
-				responses[i] = toProtoBufSig(sig)
-			}
-			m := &pb.Stream{Body: &pb.Stream_OptimismSignatureExchange{
-				OptimismSignatureExchange: &pb.OptimismSignatureExchange{
-					Responses: responses,
-				},
-			}}
+		logctx := append(logctx, "signer", signer, "id-after", req.IdAfter)
+		w.log.Info("Received signature request", logctx...)
 
-			// send response to peer
-			if err := w.writeStream(s, m); err != nil {
-				w.log.Error("Failed to send signatures", append(logctx, "err", err)...)
-				return true
-			}
+		for offset := 0; ; offset += queryLimit {
+			if isTimeup() {
+				w.log.Warn("Time up", logctx...)
+				return true
+			} else if err := sem.Acquire(ctx, 1); err != nil {
+				w.log.Error("Failed to acquire inbound semaphore", append(logctx, "err", err)...)
+				return true
+			}
+
+			// get latest signatures for each requested signer
+			sigs, err := w.db.Optimism.FindSignatures(
+				&req.IdAfter, &signer, nil, nil, queryLimit, offset)
+			sem.ReleaseALL()
+			if err != nil {
+				w.log.Error("Failed to find requested signatures",
+					append(logctx, "err", err)...)
+				break
+			}
+
+			sigLen := len(sigs)
+			if sigLen == 0 {
+				break // reached the last
+			}
+			w.throttling(w.inboundThrot, sigLen,
+				"in", "handleOptimismSignatureExchangeRequest", "peer", peerID)
+
+			responses := make([]*pb.OptimismSignature, sigLen)
+			for i, sig := range sigs {
+				responses[i] = toProtoBufSig(sig)
+			}
+			m := &pb.Stream{Body: &pb.Stream_OptimismSignatureExchange{
+				OptimismSignatureExchange: &pb.OptimismSignatureExchange{
+					Responses: responses,
+				},
+			}}
+
+			// send response to peer
+			if err := w.writeStream(s, m); err != nil {
+				w.log.Error("Failed to send signatures", append(logctx, "err", err)...)
+				return true
+			}
+			w.log.Info("Sent signatures", append(logctx, "sents", sigLen)...)
+
+			// wait for received notify
+			if m, err = w.readStream(s); err == nil && bytes.Equal(m.GetMisc(), misc_SIGRECEIVED) {
+				w.log.Info("Received notification of receipt", logctx...)
+			} else {
+				timePenalty()
+			}
+		}
+	}
+
+	return false
+}
+
+func (w *Node) handleOptimismSignatureExchangeResponses(ctx context.Context, s network.Stream) {
+	peerID := s.Conn().RemotePeer()
+	logctx := []interface{}{"peer", peerID}
+
+	for {
+		m, err := w.readStream(s)
+		if err != nil {
+			w.log.Debug("Failed to read stream message", append(logctx, "err", err)...)
+			return
+		}
 
-			w.log.Info("Sent signatures", append(logctx, "sents", sigLen)...)
+		body := m.GetOptimismSignatureExchange()
+		if body == nil {
+			if m.GetEom() == nil {
+				w.log.Warn("Received an unknown message", logctx...)
+				w.meterStreamUnknownMsg.Incr()
 			}
+			return
+		}
+
+		responses := body.GetResponses()
+		if len(responses) == 0 {
+			return
 		}
-	} else if len(recv.Responses) > 0 {
-		// save received signatures
-		for _, res := range recv.Responses {
+
+		for _, res := range responses {
 			signer := common.BytesToAddress(res.Signer)
 			scc := common.BytesToAddress(res.Scc)
 			logctx := append(logctx,
@@ -559,18 +643,19 @@
 			}
 			w.log.Info("Received new signature", logctx...)
 		}
-	}
-
-	return false
+
+		// send received notify
+		w.writeStream(s, &pb.Stream{Body: &pb.Stream_Misc{Misc: misc_SIGRECEIVED}})
+	}
 }
 
 func (w *Node) handleFindCommonOptimismSignature(
 	s network.Stream,
 	recv *pb.FindCommonOptimismSignature,
-) {
+) (disconnect bool) {
 	remotes := recv.Locals
 	if len(remotes) == 0 {
-		return
+		return false
 	}
 
 	w.log.Info("Received FindCommonOptimismSignature request",
@@ -584,7 +669,7 @@
 		}
 		if err != nil {
 			w.log.Error("Failed to find signature", "remote-id", remote.Id, "err", err)
-			return
+			return true
 		}
 		if local.PreviousID == remote.PreviousId {
 			found = toProtoBufSig(local)
@@ -597,30 +682,42 @@
 			FindCommonOptimismSignature: &pb.FindCommonOptimismSignature{Found: found},
 		},
 	}
-	if err := w.writeStream(s, m); err == nil {
+	if err := w.writeStream(s, m); err != nil {
+		w.log.Error("Failed to send FindCommonOptimismSignature response", "err", err)
+		return true
+	} else {
 		if found == nil {
 			w.log.Info("Sent FindCommonOptimismSignature response", "found", found != nil)
 		} else {
 			w.log.Info("Sent FindCommonOptimismSignature response",
 				"found", found != nil, "id", found.Id, "previous-id", found.PreviousId)
 		}
-	} else {
-		w.log.Error("Failed to send FindCommonOptimismSignature response", "err", err)
 	}
+	return false
 }
 
 // Find the latest signature of the same ID and PreviousID from peer
 func (w *Node) findCommonLatestSignature(
+	ctx context.Context,
 	s network.Stream,
 	signer common.Address,
 ) (*pb.OptimismSignature, error) {
 	peerID := s.Conn().RemotePeer()
 	logctx := []interface{}{"peer", peerID, "signer", signer}
-	limit := w.cfg.Experimental.SigSendThrottling / w.cfg.Experimental.Concurrency
+	limit := w.cfg.OutboundLimits.Throttling / w.cfg.OutboundLimits.Concurrency
+
+	sem := util.NewReleaseGuardSemaphore(w.outboundSem)
+	defer sem.ReleaseALL()
 
 	for offset := 0; ; offset += limit {
+		if err := sem.Acquire(ctx, 1); err != nil {
+			w.log.Error("Failed to acquire outbound semaphore", append(logctx, "err", err)...)
+			return nil, err
+		}
+
 		// find local latest signatures (order by: id desc)
 		sigs, err := w.db.Optimism.FindLatestSignaturesBySigner(signer, limit, offset)
+		sem.ReleaseALL()
 		if err != nil {
 			w.log.Error("Failed to find latest signatures", append(logctx, "err", err)...)
 			return nil, err
@@ -630,7 +727,7 @@ func (w *Node) findCommonLatestSignature(
 		if sigLen == 0 {
 			break // reached the last
 		}
-		w.sigSendThrottling(sigLen, "in", "findCommonLatestSignature", "peerID", peerID)
+		w.throttling(w.outboundThrot, sigLen, "in", "findCommonLatestSignature", "peer", peerID)
 
 		logctx = append(logctx, "from", sigs[0].ID, "to", sigs[sigLen-1].ID)
 
@@ -674,7 +771,7 @@
 	}
 
 	w.log.Warn("Common signature not found", "signer", signer)
-	return nil, errors.New("not found")
+	return nil, database.ErrNotFound
 }
 
 func (w *Node) publishLatestSignatures(ctx context.Context) {
@@ -802,12 +899,16 @@ func (w *Node) showBootstrapLog() {
 	w.log.Info("Worker started",
 		"id", w.h.ID(),
 		"publish-interval", w.cfg.PublishInterval,
 		"stream-timeout", w.cfg.StreamTimeout,
-		"sig-send-throttling", w.cfg.Experimental.SigSendThrottling,
+		"outbound-limits-concurrency", w.cfg.OutboundLimits.Concurrency,
+		"outbound-limits-throttling", w.cfg.OutboundLimits.Throttling,
+		"inbound-limits-concurrency", w.cfg.InboundLimits.Concurrency,
+		"inbound-limits-maxsendtime", w.cfg.InboundLimits.MaxSendTime,
+		"inbound-limits-throttling", w.cfg.InboundLimits.Throttling,
 	)
 }
 
-func (w *Node) sigSendThrottling(num int, logCtx ...any) {
-	rsv := w.throttling.ReserveN(time.Now(), num)
+func (w *Node) throttling(limiter *rate.Limiter, num int, logCtx ...any) {
+	rsv := limiter.ReserveN(time.Now(), num)
 	if !rsv.OK() {
 		w.log.Error("num is greater than burst", logCtx...)
 		return
diff --git a/p2p/node_test.go b/p2p/node_test.go
index d58b57b..5ff0b9e 100644
--- a/p2p/node_test.go
+++ b/p2p/node_test.go
@@ -191,14 +191,12 @@ func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeFromPubSub() {
 
 	// set assertion func to subscriber
 	var (
-		mu          = &sync.Mutex{}
-		reads       = []*pb.Stream{}
-		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+		reads = []*pb.Stream{}
+		wg    sync.WaitGroup
 	)
-	defer cancel()
+	wg.Add(len(cases))
 	s.node2.h.SetStreamHandler(streamProtocol, func(st network.Stream) {
-		mu.Lock()
-		defer mu.Unlock()
+		defer wg.Done()
 		defer closeStream(st)
 
 		for {
@@ -206,91 +204,113 @@
 			reads = append(reads, m)
 
 			switch m.Body.(type) {
-			case *pb.Stream_Eom:
-				return
 			case *pb.Stream_FindCommonOptimismSignature:
 				writeStream(st, &pb.Stream{Body: &pb.Stream_FindCommonOptimismSignature{
 					FindCommonOptimismSignature: &pb.FindCommonOptimismSignature{
 						Found: nil,
 					},
 				}})
+			case *pb.Stream_OptimismSignatureExchange:
+				writeStream(st, eom)
+			case *pb.Stream_Eom:
+				return
 			}
 		}
 	})
 
 	// publish message
 	for _, tt := range cases {
-		go s.node1.handleOptimismSignatureExchangeFromPubSub(ctx, s.node2.h.ID(), tt.msg)
+		go s.node1.handleOptimismSignatureExchangeFromPubSub(context.Background(), s.node2.h.ID(), tt.msg)
 		time.Sleep(time.Millisecond * 50)
 	}
-	<-ctx.Done()
+	wg.Wait()
 
-	s.Len(reads, 12)
+	s.Len(reads, 16)
 
 	// signer0
 	gots0 := reads[0].GetFindCommonOptimismSignature().Locals
-	s.Len(gots0, 100)
+	s.Len(gots0, 50)
 	s.Equal(gots0[0].Id, s.sigs[s.signer0][s.scc1][99].ID)
-	s.Equal(gots0[99].Id, s.sigs[s.signer0][s.scc1][0].ID)
+	s.Equal(gots0[49].Id, s.sigs[s.signer0][s.scc1][50].ID)
 
 	gots1 := reads[1].GetFindCommonOptimismSignature().Locals
 	s.Len(gots1, 50)
-	s.Equal(gots1[0].Id, s.sigs[s.signer0][s.scc0][49].ID)
-	s.Equal(gots1[49].Id, s.sigs[s.signer0][s.scc0][0].ID)
+	s.Equal(gots1[0].Id, s.sigs[s.signer0][s.scc1][49].ID)
+	s.Equal(gots1[49].Id, s.sigs[s.signer0][s.scc1][0].ID)
+
+	gots2 := reads[2].GetFindCommonOptimismSignature().Locals
+	s.Len(gots2, 50)
+	s.Equal(gots2[0].Id, s.sigs[s.signer0][s.scc0][49].ID)
+	s.Equal(gots2[49].Id, s.sigs[s.signer0][s.scc0][0].ID)
 
-	gots2 := reads[2].GetOptimismSignatureExchange().Requests
-	s.Len(gots2, 1)
-	s.Equal(cases[0].want.signer[:], gots2[0].Signer)
+	gots3 := reads[3].GetOptimismSignatureExchange().Requests
+	s.Len(gots3, 1)
+	s.Equal(cases[0].want.signer[:], gots3[0].Signer)
 	func() {
-		gt := ulid.MustParse(gots2[0].IdAfter)
+		gt := ulid.MustParse(gots3[0].IdAfter)
 		wt := ulid.MustParse(cases[0].want.idAfter)
 		s.Equal(wt.Time()-1000, gt.Time())
 	}()
 
-	gots3 := reads[3].GetEom()
-	s.NotNil(gots3)
+	gots4 := reads[4].GetEom()
+	s.NotNil(gots4)
 
 	// signer1
-	gots4 := reads[4].GetFindCommonOptimismSignature().Locals
-	s.Len(gots4, 100)
-	s.Equal(gots4[0].Id, s.sigs[s.signer1][s.scc1][199].ID)
-	s.Equal(gots4[99].Id, s.sigs[s.signer1][s.scc1][100].ID)
-
 	gots5 := reads[5].GetFindCommonOptimismSignature().Locals
-	s.Len(gots5, 100)
-	s.Equal(gots5[0].Id, s.sigs[s.signer1][s.scc1][99].ID)
-	s.Equal(gots5[99].Id, s.sigs[s.signer1][s.scc1][0].ID)
+	s.Len(gots5, 50)
+	s.Equal(gots5[0].Id, s.sigs[s.signer1][s.scc1][199].ID)
+	s.Equal(gots5[49].Id, s.sigs[s.signer1][s.scc1][150].ID)
 
 	gots6 := reads[6].GetFindCommonOptimismSignature().Locals
-	s.Len(gots6, 100)
-	s.Equal(gots6[0].Id, s.sigs[s.signer1][s.scc0][149].ID)
-	s.Equal(gots6[99].Id, s.sigs[s.signer1][s.scc0][50].ID)
+	s.Len(gots6, 50)
+	s.Equal(gots6[0].Id, s.sigs[s.signer1][s.scc1][149].ID)
+	s.Equal(gots6[49].Id, s.sigs[s.signer1][s.scc1][100].ID)
 
 	gots7 := reads[7].GetFindCommonOptimismSignature().Locals
 	s.Len(gots7, 50)
-	s.Equal(gots7[0].Id, s.sigs[s.signer1][s.scc0][49].ID)
-	s.Equal(gots7[49].Id, s.sigs[s.signer1][s.scc0][0].ID)
-
-	gots8 := reads[8].GetOptimismSignatureExchange().Requests
-	s.Len(gots8, 1)
-	s.Equal(cases[1].want.signer[:], gots8[0].Signer)
+	s.Equal(gots7[0].Id, s.sigs[s.signer1][s.scc1][99].ID)
+	s.Equal(gots7[49].Id, s.sigs[s.signer1][s.scc1][50].ID)
+
+	gots8 := reads[8].GetFindCommonOptimismSignature().Locals
+	s.Len(gots8, 50)
+	s.Equal(gots8[0].Id, s.sigs[s.signer1][s.scc1][49].ID)
+	s.Equal(gots8[49].Id, s.sigs[s.signer1][s.scc1][0].ID)
+
+	gots9 := reads[9].GetFindCommonOptimismSignature().Locals
+	s.Len(gots9, 50)
+	s.Equal(gots9[0].Id, s.sigs[s.signer1][s.scc0][149].ID)
+	s.Equal(gots9[49].Id, s.sigs[s.signer1][s.scc0][100].ID)
+
+	gots10 := reads[10].GetFindCommonOptimismSignature().Locals
+	s.Len(gots10, 50)
+	s.Equal(gots10[0].Id, s.sigs[s.signer1][s.scc0][99].ID)
+	s.Equal(gots10[49].Id, s.sigs[s.signer1][s.scc0][50].ID)
+
+	gots11 := reads[11].GetFindCommonOptimismSignature().Locals
+	s.Len(gots11, 50)
+	s.Equal(gots11[0].Id, s.sigs[s.signer1][s.scc0][49].ID)
+	s.Equal(gots11[49].Id, s.sigs[s.signer1][s.scc0][0].ID)
+
+	gots12 := reads[12].GetOptimismSignatureExchange().Requests
+	s.Len(gots12, 1)
+	s.Equal(cases[1].want.signer[:], gots12[0].Signer)
 	func() {
-		gt := ulid.MustParse(gots8[0].IdAfter)
+		gt := ulid.MustParse(gots12[0].IdAfter)
 		wt := ulid.MustParse(cases[1].want.idAfter)
 		s.Equal(wt.Time()-1000, gt.Time())
 	}()
 
-	gots9 := reads[9].GetEom()
-	s.NotNil(gots9)
+	gots13 := reads[13].GetEom()
+	s.NotNil(gots13)
 
 	// signer2
-	gots10 := reads[10].GetOptimismSignatureExchange().Requests
-	s.Len(gots10, 1)
-	s.Equal(cases[2].want.signer[:], gots10[0].Signer)
-	s.Equal("", gots10[0].IdAfter)
+	gots14 := reads[14].GetOptimismSignatureExchange().Requests
+	s.Len(gots14, 1)
+	s.Equal(cases[2].want.signer[:], gots14[0].Signer)
+	s.Equal("", gots14[0].IdAfter)
 
-	gots11 := reads[11].GetEom()
-	s.NotNil(gots11)
+	gots15 := reads[15].GetEom()
+	s.NotNil(gots15)
 }
 
 func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeRequests() {
@@ -322,10 +342,18 @@ func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeRequests() {
 			},
 		},
 	}})
-	writeStream(st, eom)
-
-	// read messages
-	reads := s.readsStream(st)
+	var reads []*pb.Stream
+	for _, wants := range wantss {
+		for range wants {
+			// read message
+			m, _ := readStream(st)
+			reads = append(reads, m)
+
+			// send receipt notify
+			writeStream(st, &pb.Stream{Body: &pb.Stream_Misc{Misc: misc_SIGRECEIVED}})
+		}
+	}
 	closeStream(st)
 
 	// assert
@@ -436,7 +464,15 @@ func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeResponses() {
 		},
 	}
 
-	// send message
+	// set stream handler to node1
+	var wg sync.WaitGroup
+	wg.Add(1)
+	s.node1.h.SetStreamHandler(streamProtocol, func(st network.Stream) {
+		s.node1.handleOptimismSignatureExchangeResponses(context.Background(), st)
+		wg.Done()
+	})
+
+	// send message from node2 to node1
 	responses := []*pb.OptimismSignature{}
 	for _, tt := range cases {
 		m := verselayer.NewSccMessage(
@@ -456,16 +492,11 @@ func (s *NodeTestSuite) TestHandleOptimismSignatureExchangeResponses() {
 	writeStream(st, &pb.Stream{Body: &pb.Stream_OptimismSignatureExchange{
 		OptimismSignatureExchange: &pb.OptimismSignatureExchange{Responses: responses},
 	}})
-	writeStream(st, eom)
-
-	// read messages
-	reads := s.readsStream(st)
 	closeStream(st)
 
-	// assert
-	s.Len(reads, 1)
-	s.NotNil(reads[0].GetEom())
+	wg.Wait()
 
+	// assert
 	for _, tt := range cases {
 		signer := common.BytesToAddress(tt.want.Signer)
 		scc := common.BytesToAddress(tt.want.Scc)
@@ -575,15 +606,17 @@ func (s *NodeTestSuite) newWorker(bootnodes []string) *Node {
 		PublishInterval: 0,
 		StreamTimeout:   3 * time.Second,
 	}
-	cfg.Experimental.Concurrency = 10
-	cfg.Experimental.SigSendThrottling = 1000
+	cfg.OutboundLimits.Concurrency = 10
+	cfg.OutboundLimits.Throttling = 500
+	cfg.InboundLimits.Concurrency = 10
+	cfg.InboundLimits.Throttling = 1000
+	cfg.InboundLimits.MaxSendTime = time.Second * 5
 
 	host, dht, bwm, hpHelper, _ := NewHost(context.Background(), cfg, priv)
-	worker, err := NewNode(cfg, db, host, dht, bwm, hpHelper,
+	worker, _ := NewNode(cfg, db, host, dht, bwm, hpHelper,
 		s.b0.ChainID().Uint64(), []common.Address{}, s.stakemanager)
-	if err != nil {
-		s.Fail(err.Error())
-	}
+	host.SetStreamHandler(streamProtocol,
+		worker.newStreamHandler(context.Background()))
 
 	return worker
 }
diff --git a/util/sync.go b/util/sync.go
new file mode 100644
index 0000000..c6ae0e2
--- /dev/null
+++ b/util/sync.go
@@ -0,0 +1,49 @@
+package util
+
+import (
+	"context"
+	"sync"
+
+	"golang.org/x/sync/semaphore"
+)
+
+type ReleaseGuardSemaphore struct {
+	sem *semaphore.Weighted
+	mu  sync.Mutex
+	cnt int64
+}
+
+func NewReleaseGuardSemaphore(sem *semaphore.Weighted) *ReleaseGuardSemaphore {
+	return &ReleaseGuardSemaphore{sem: sem}
+}
+
+func (s *ReleaseGuardSemaphore) Acquire(ctx context.Context, n int64) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if err := s.sem.Acquire(ctx, n); err != nil {
+		return err
+	} else {
+		s.cnt += n
+		return nil
+	}
+}
+
+func (s *ReleaseGuardSemaphore) Release(n int64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if n > s.cnt {
+		n = s.cnt
+	}
+	s.sem.Release(n)
+	s.cnt -= n
+}
+
+func (s *ReleaseGuardSemaphore) ReleaseALL() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.sem.Release(s.cnt)
+	s.cnt = 0
+}
diff --git a/util/sync_test.go b/util/sync_test.go
new file mode 100644
index 0000000..8af72b4
--- /dev/null
+++ b/util/sync_test.go
@@ -0,0 +1,36 @@
+package util
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"golang.org/x/sync/semaphore"
+)
+
+func TestReleaseGuardSemaphore(t *testing.T) {
+	parent := context.Background()
+	gsem := NewReleaseGuardSemaphore(semaphore.NewWeighted(2))
+
+	shouldErr := func() {
+		ctx, cancel := context.WithTimeout(parent, time.Millisecond*5)
+		defer cancel()
+		if !errors.Is(gsem.Acquire(ctx, 1), context.DeadlineExceeded) {
+			t.Error("Context.DeadlineExceeded should be returned")
+		}
+	}
+
+	gsem.Acquire(parent, 1)
+	gsem.Acquire(parent, 1)
+	shouldErr()
+
+	gsem.Release(1)
+	gsem.Release(1)
+	gsem.Acquire(parent, 2)
+	shouldErr()
+
+	gsem.ReleaseALL()
+	gsem.Acquire(parent, 2)
+	shouldErr()
+}
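
Reviewer note (not part of the diff): a minimal usage sketch of the ReleaseGuardSemaphore helper added in util/sync.go, mirroring how p2p/node.go acquires one slot per database query and relies on the deferred ReleaseALL to clean up on early returns. The processBatches function, the batch count of 3, and the weight of 10 are illustrative assumptions; only the util API and the module path come from the patch.

package main

import (
	"context"
	"fmt"

	"github.com/oasysgames/oasys-optimism-verifier/util"
	"golang.org/x/sync/semaphore"
)

// processBatches is a hypothetical caller: it acquires one slot from the
// shared semaphore before each (simulated) query and releases it right
// after, while the deferred ReleaseALL guarantees nothing stays held if
// the loop exits early with an error.
func processBatches(ctx context.Context, shared *semaphore.Weighted) error {
	sem := util.NewReleaseGuardSemaphore(shared)
	defer sem.ReleaseALL()

	for offset := 0; offset < 3; offset++ {
		if err := sem.Acquire(ctx, 1); err != nil {
			return err // context canceled or deadline exceeded
		}
		fmt.Println("querying batch at offset", offset)
		sem.ReleaseALL()
	}
	return nil
}

func main() {
	// A weight of 10 matches the default p2p.outbound_limits.concurrency above.
	if err := processBatches(context.Background(), semaphore.NewWeighted(10)); err != nil {
		fmt.Println("aborted:", err)
	}
}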