Skip to content

Commit

Permalink
feat:compatible with peers(protover<=45) for tx propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
lochjin committed Dec 19, 2024
1 parent be7c0cf commit 4883ee1
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 21 deletions.
3 changes: 1 addition & 2 deletions consensus/model/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ package model

import (
"github.com/Qitmeer/qng/core/types"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/peer"
)

// Notify interface manage message announce & relay & notification between mempool, websocket, gbt long pull
// and rpc server.
type Notify interface {
AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID)
AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID)
RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID)
BroadcastMessage(data interface{})
TransactionConfirmed(tx *types.Tx)
Expand Down
77 changes: 65 additions & 12 deletions meerevm/meer/meerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ func (m *MeerPool) handler() {
}
continue
}
if !m.prepareMeerChangeTxs(ev.Txs) {
ret := m.prepareMeerChangeTxs(ev.Txs)
if len(ret) <= 0 {
continue
}

m.qTxPool.TriggerDirty()
m.p2pSer.Notify().AnnounceNewTransactions(nil, ev.Txs, nil)
m.AnnounceNewTransactions(ev.Txs)
m.dirty.Store(true)
// System stopped
case <-m.quit:
Expand All @@ -158,32 +159,26 @@ func (m *MeerPool) handler() {
}
}

func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) bool {
remove := 0
all := 0
func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) []*types.Transaction {
remaining := []*types.Transaction{}
for _, tx := range txs {
if tx == nil {
continue
}
all++
if meerchange.IsMeerChangeTx(tx) {
vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, true).Tx, nil)
if err != nil {
log.Error(err.Error())
m.ethTxPool.RemoveTx(tx.Hash(), true)
remove++
continue
}
err = m.consensus.BlockChain().VerifyMeerTx(vmtx)
if err != nil {
log.Error(err.Error())
m.ethTxPool.RemoveTx(tx.Hash(), true)
remove++
continue
}
}
remaining = append(remaining, tx)
}
return remove != all
return remaining
}

func (m *MeerPool) handleStallSample() {
Expand Down Expand Up @@ -295,6 +290,32 @@ func (m *MeerPool) GetSize() int64 {
return 0
}

func (m *MeerPool) AddTx(tx *qtypes.Tx) (int64, error) {
h := qcommon.ToEVMHash(&tx.Tx.TxIn[0].PreviousOut.Hash)
if m.eth.TxPool().Has(h) {
return 0, fmt.Errorf("already exists:%s (evm:%s)", tx.Hash().String(), h.String())
}
txb := qcommon.ToTxHex(tx.Tx.TxIn[0].SignScript)
var txmb = &types.Transaction{}
err := txmb.UnmarshalBinary(txb)
if err != nil {
return 0, err
}

errs := m.ethTxPool.AddRemotesSync(types.Transactions{txmb})
if len(errs) > 0 && errs[0] != nil {
return 0, errs[0]
}

log.Debug("Meer pool:add", "hash", tx.Hash(), "eHash", txmb.Hash())

//
cost := txmb.Cost()
cost = cost.Sub(cost, txmb.Value())
cost = cost.Div(cost, qcommon.Precision)
return cost.Int64(), nil
}

func (m *MeerPool) RemoveTx(tx *qtypes.Tx) error {
if !m.isRunning() {
return fmt.Errorf("meer pool is not running")
Expand Down Expand Up @@ -368,6 +389,38 @@ func (m *MeerPool) subscribe() {
}()
}

func (m *MeerPool) AnnounceNewTransactions(txs []*types.Transaction) error {
txds := []*qtypes.TxDesc{}

for _, tx := range txs {
m.snapshotMu.RLock()
qtx, ok := m.snapshotTxsM[tx.Hash().String()]
m.snapshotMu.RUnlock()
if !ok || qtx == nil {
qtx = &snapshotTx{tx: qcommon.ToQNGTx(tx, true), eHash: tx.Hash()}
if qtx.tx == nil {
continue
}
}
cost := tx.Cost()
cost = cost.Sub(cost, tx.Value())
cost = cost.Div(cost, qcommon.Precision)
fee := cost.Int64()

td := &qtypes.TxDesc{
Tx: qtx.tx,
Added: time.Now(),
Height: m.qTxPool.GetMainHeight(),
Fee: fee,
FeePerKB: fee * 1000 / int64(qtx.tx.Tx.SerializeSize()),
}

txds = append(txds, td)
}
m.p2pSer.Notify().AnnounceNewTransactions(nil, txds, nil)
return nil
}

func newMeerPool(consensus model.Consensus, eth *eth.Ethereum) *MeerPool {
log.Info(fmt.Sprintf("New Meer pool"))
m := &MeerPool{}
Expand Down
10 changes: 10 additions & 0 deletions p2p/peers/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,16 @@ func (p *Peer) IsSupportMeerP2PBridging() bool {
return p.chainState.ProtocolVersion >= protocol.MeerP2PBridgingProtocolVersion
}

func (p *Peer) IsSupportMeerpoolTransmission() bool {
p.lock.RLock()
defer p.lock.RUnlock()

if p.chainState == nil {
return false
}
return p.chainState.ProtocolVersion >= protocol.MeerPoolProtocolVersion
}

func (p *Peer) GetMeerConn() bool {
p.lock.RLock()
defer p.lock.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion p2p/synch/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (ps *PeerSync) OnMemPool(sp *peers.Peer, msg *MsgMemPool) error {
// per message. The NewMsgInvSizeHint function automatically limits
// the passed hint to the maximum allowed, so it's safe to pass it
// without double checking it here.
txDescs := ps.sy.p2p.TxMemPool().TxDescs(false)
txDescs := ps.sy.p2p.TxMemPool().TxDescs(!sp.IsSupportMeerpoolTransmission())

invs := []*pb.InvVect{}
for _, txDesc := range txDescs {
Expand Down
3 changes: 3 additions & 0 deletions p2p/synch/peersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ func (ps *PeerSync) RelayInventory(nds []*notify.NotifyData) {

switch value := nd.Data.(type) {
case *types.TxDesc:
if types.IsCrossChainVMTx(value.Tx.Tx) && pe.IsSupportMeerpoolTransmission() {
continue
}
if pe.HasBroadcast(value.Tx.Hash().String()) {
continue
}
Expand Down
15 changes: 13 additions & 2 deletions services/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,18 @@ func (mp *TxPool) maybeAcceptTransaction(tx *types.Tx, isNew, rateLimit, allowHi
log.Debug(fmt.Sprintf("Accepted import transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee))
return nil, txD, nil
} else if opreturn.IsMeerEVMTx(tx.Tx) {
return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash)
if mp.cfg.BC.HasTx(txHash) {
return nil, nil, fmt.Errorf("Already have transaction %v", txHash)
}
fee, err := mp.cfg.BC.MeerChain().(*meer.MeerChain).MeerPool().AddTx(tx)
if err != nil {
return nil, nil, err
}

txD := mp.addTransaction(nil, tx, nextBlockHeight, fee)

log.Debug(fmt.Sprintf("Accepted meerevm transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee))
return nil, txD, nil
}
// Fetch all of the unspent transaction outputs referenced by the inputs
// to this transaction. This function also attempts to fetch the
Expand Down Expand Up @@ -1170,7 +1181,7 @@ func (mp *TxPool) HaveTransaction(hash *hash.Hash) bool {

func (mp *TxPool) haveTransaction(hash *hash.Hash) bool {
// Protect concurrent access.
haveTx := mp.isTransactionInPool(hash, false) || mp.isOrphanInPool(hash)
haveTx := mp.isTransactionInPool(hash, true) || mp.isOrphanInPool(hash)
return haveTx
}

Expand Down
6 changes: 2 additions & 4 deletions services/notifymgr/notifymgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/Qitmeer/qng/rpc"
"github.com/Qitmeer/qng/services/notifymgr/notify"
"github.com/Qitmeer/qng/services/zmq"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/peer"
"sync"
"time"
Expand Down Expand Up @@ -47,7 +46,7 @@ type NotifyMgr struct {
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
// are added to the mempool.
func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) {
func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) {
if ntmgr.IsShutdown() {
return
}
Expand Down Expand Up @@ -184,14 +183,13 @@ func (ntmgr *NotifyMgr) handleStallSample() {
nds = append(nds, nd)
case types.BlockHeader:
nds = append(nds, nd)
case *etypes.Transaction:
txds = append(txds, value)
}

}
ntmgr.Server.RelayInventory(nds)

if len(txds) > 0 {

if ntmgr.RpcServer != nil && ntmgr.RpcServer.IsStarted() && !ntmgr.RpcServer.IsShutdown() {
ntmgr.RpcServer.NotifyNewTransactions(txds)
}
Expand Down

0 comments on commit 4883ee1

Please sign in to comment.