Skip to content

Commit

Permalink
Merge pull request #865 from lochjin/dev2.0
Browse files Browse the repository at this point in the history
fix:meerevm connection instability
  • Loading branch information
dindinw authored Dec 23, 2024
2 parents 257077a + 5b97800 commit 947a8de
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 18 deletions.
14 changes: 8 additions & 6 deletions core/blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,15 @@ func (b *BlockChain) Start() error {
b.wg.Add(1)
go b.handler()

// prepare evm env
mainTip := b.bd.GetMainChainTip()
evmHead, err := b.meerChain.PrepareEnvironment(mainTip.GetState())
if err != nil {
return err
if !b.IsSnapSyncing() {
// prepare evm env
mainTip := b.bd.GetMainChainTip()
evmHead, err := b.meerChain.PrepareEnvironment(mainTip.GetState())
if err != nil {
return err
}
log.Info("prepare evm environment", "mainTipOrder", mainTip.GetOrder(), "mainTipHash", mainTip.GetHash().String(), "hash", evmHead.Hash().String(), "number", evmHead.Number.Uint64(), "root", evmHead.Root.String())
}
log.Info("prepare evm environment", "mainTipOrder", mainTip.GetOrder(), "mainTipHash", mainTip.GetHash().String(), "hash", evmHead.Hash().String(), "number", evmHead.Number.Uint64(), "root", evmHead.Root.String())

return b.DB().Snapshot()
}
Expand Down
6 changes: 6 additions & 0 deletions meerevm/meer/meerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ func (b *MeerChain) prepareEnvironment(state model.BlockState) (*types.Header, e
list := []model.BlockState{state}
startState := b.consensus.BlockChain().GetBlockState(state.GetOrder() - 1)
for startState != nil && startState.GetEVMNumber() >= curBlockHeader.Number.Uint64() {
if system.InterruptRequested(b.consensus.Interrupt()) {
return nil, getError("shutdown interrupt")
}
if startState.GetEVMNumber() == curBlockHeader.Number.Uint64() &&
startState.GetEVMHash() == curBlockHeader.Hash() {
curBlockState = startState
Expand All @@ -440,6 +443,9 @@ func (b *MeerChain) prepareEnvironment(state model.BlockState) (*types.Header, e
}
log.Info("Find cur block state", "state.order", curBlockState.GetOrder(), "evm.Number", curBlockState.GetEVMNumber())
for i := len(list) - 1; i >= 0; i-- {
if system.InterruptRequested(b.consensus.Interrupt()) {
return nil, getError("shutdown interrupt")
}
if list[i].GetStatus().KnownInvalid() {
continue
}
Expand Down
15 changes: 11 additions & 4 deletions p2p/synch/meerevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,25 @@ func (s *Sync) establishMeerConnection(pe *peers.Peer) error {
if err != nil {
return common.NewErrorStr(common.ErrStreamBase, fmt.Sprintf("open stream on topic %v failed", topic)).ToError()
}
defer stream.Close()

common.EgressConnectMeter.Mark(1)
qc, err := NewMeerConn(stream)
if err != nil {
return err
}
pe.SetMeerConn(true)
defer pe.SetMeerConn(false)

_, err = s.p2p.MeerServer().Connect(qc, dest)

go func() {
_, err := s.p2p.MeerServer().Connect(qc, dest)
if err != nil {
log.Error(err.Error())
}
pe.SetMeerConn(false)
err = stream.Close()
if err != nil {
log.Error("Close meer conn", "err", err.Error())
}
}()
return err
}

Expand Down
23 changes: 17 additions & 6 deletions p2p/synch/peersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (ps *PeerSync) Stop() error {
ps.wg.Wait()

log.Info("P2P PeerSync Stoped")
ps.saveSnapSync()
return nil
}

Expand Down Expand Up @@ -399,12 +400,6 @@ func (ps *PeerSync) getBestPeer(snap bool, exclude map[peer.ID]struct{}) *peers.
var bestPeer *peers.Peer
equalPeers := []*peers.Peer{}
for _, sp := range ps.sy.peers.CanSyncPeers() {
if len(exclude) > 0 {
_, ok := exclude[sp.GetID()]
if ok {
continue
}
}
if snap {
if !isValidSnapPeer(sp) {
continue
Expand Down Expand Up @@ -445,6 +440,21 @@ func (ps *PeerSync) getBestPeer(snap bool, exclude map[peer.ID]struct{}) *peers.
if len(equalPeers) == 1 {
return equalPeers[0]
}
if len(exclude) > 0 {
filter := []*peers.Peer{}
for _, sp := range equalPeers {
_, ok := exclude[sp.GetID()]
if ok {
continue
}
filter = append(filter, sp)
}
if len(filter) == 1 {
return filter[0]
} else if len(filter) > 1 {
equalPeers = filter
}
}

index := int(rand.Int63n(int64(len(equalPeers))))
if index >= len(equalPeers) {
Expand Down Expand Up @@ -756,5 +766,6 @@ func NewPeerSync(sy *Sync) *PeerSync {
interrupt: make(chan struct{}),
lastBlockID: meerdag.MaxId,
}
peerSync.loadSnapSync()
return peerSync
}
2 changes: 1 addition & 1 deletion p2p/synch/snapstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (ss *SnapStatus) Decode(data []byte) error {
if err != nil {
return err
}
ss.peid, err = peer.Decode(string(peid))
ss.peid, err = peer.IDFromBytes(peid)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/synch/snapsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (ps *PeerSync) loadSnapSync() {
}
snapStatus.syncPoint = ps.Chain().BlockDAG().GetBlockById(uint(snapStatus.PointID))
if snapStatus.syncPoint == nil {
log.Error("Can't find snap status point", "id", snapStatus.PointID)
return
}
ps.snapStatus = snapStatus
Expand All @@ -51,7 +52,6 @@ func (ps *PeerSync) loadSnapSync() {
log.Info("End snap-sync", "err", err.Error())
return
}

}

func (ps *PeerSync) saveSnapSync() {
Expand Down

0 comments on commit 947a8de

Please sign in to comment.