diff --git a/consensus/model/notify.go b/consensus/model/notify.go index a73aa63c..d80b8d99 100644 --- a/consensus/model/notify.go +++ b/consensus/model/notify.go @@ -6,14 +6,13 @@ package model import ( "github.com/Qitmeer/qng/core/types" - etypes "github.com/ethereum/go-ethereum/core/types" "github.com/libp2p/go-libp2p/core/peer" ) // Notify interface manage message announce & relay & notification between mempool, websocket, gbt long pull // and rpc server. type Notify interface { - AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) + AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID) BroadcastMessage(data interface{}) TransactionConfirmed(tx *types.Tx) diff --git a/meerevm/meer/meerpool.go b/meerevm/meer/meerpool.go index 608b6820..df259683 100644 --- a/meerevm/meer/meerpool.go +++ b/meerevm/meer/meerpool.go @@ -128,12 +128,13 @@ func (m *MeerPool) handler() { } continue } - if !m.prepareMeerChangeTxs(ev.Txs) { + ret := m.prepareMeerChangeTxs(ev.Txs) + if len(ret) <= 0 { continue } m.qTxPool.TriggerDirty() - m.p2pSer.Notify().AnnounceNewTransactions(nil, ev.Txs, nil) + m.AnnounceNewTransactions(ev.Txs) m.dirty.Store(true) // System stopped case <-m.quit: @@ -158,32 +159,26 @@ func (m *MeerPool) handler() { } } -func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) bool { - remove := 0 - all := 0 +func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) []*types.Transaction { + remaining := []*types.Transaction{} for _, tx := range txs { - if tx == nil { - continue - } - all++ if meerchange.IsMeerChangeTx(tx) { vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, true).Tx, nil) if err != nil { log.Error(err.Error()) m.ethTxPool.RemoveTx(tx.Hash(), true) - remove++ continue } err = m.consensus.BlockChain().VerifyMeerTx(vmtx) if err != nil { log.Error(err.Error()) m.ethTxPool.RemoveTx(tx.Hash(), true) - remove++ continue } } + remaining = append(remaining, tx) } - return remove != all + return remaining } func (m *MeerPool) handleStallSample() { @@ -295,6 +290,32 @@ func (m *MeerPool) GetSize() int64 { return 0 } +func (m *MeerPool) AddTx(tx *qtypes.Tx) (int64, error) { + h := qcommon.ToEVMHash(&tx.Tx.TxIn[0].PreviousOut.Hash) + if m.eth.TxPool().Has(h) { + return 0, fmt.Errorf("already exists:%s (evm:%s)", tx.Hash().String(), h.String()) + } + txb := qcommon.ToTxHex(tx.Tx.TxIn[0].SignScript) + var txmb = &types.Transaction{} + err := txmb.UnmarshalBinary(txb) + if err != nil { + return 0, err + } + + errs := m.ethTxPool.AddRemotesSync(types.Transactions{txmb}) + if len(errs) > 0 && errs[0] != nil { + return 0, errs[0] + } + + log.Debug("Meer pool:add", "hash", tx.Hash(), "eHash", txmb.Hash()) + + // + cost := txmb.Cost() + cost = cost.Sub(cost, txmb.Value()) + cost = cost.Div(cost, qcommon.Precision) + return cost.Int64(), nil +} + func (m *MeerPool) RemoveTx(tx *qtypes.Tx) error { if !m.isRunning() { return fmt.Errorf("meer pool is not running") @@ -368,6 +389,38 @@ func (m *MeerPool) subscribe() { }() } +func (m *MeerPool) AnnounceNewTransactions(txs []*types.Transaction) error { + txds := []*qtypes.TxDesc{} + + for _, tx := range txs { + m.snapshotMu.RLock() + qtx, ok := m.snapshotTxsM[tx.Hash().String()] + m.snapshotMu.RUnlock() + if !ok || qtx == nil { + qtx = &snapshotTx{tx: qcommon.ToQNGTx(tx, true), eHash: tx.Hash()} + if qtx.tx == nil { + continue + } + } + cost := tx.Cost() + cost = cost.Sub(cost, tx.Value()) + cost = cost.Div(cost, qcommon.Precision) + fee := cost.Int64() + + td := &qtypes.TxDesc{ + Tx: qtx.tx, + Added: time.Now(), + Height: m.qTxPool.GetMainHeight(), + Fee: fee, + FeePerKB: fee * 1000 / int64(qtx.tx.Tx.SerializeSize()), + } + + txds = append(txds, td) + } + m.p2pSer.Notify().AnnounceNewTransactions(nil, txds, nil) + return nil +} + func newMeerPool(consensus model.Consensus, eth *eth.Ethereum) *MeerPool { log.Info(fmt.Sprintf("New Meer pool")) m := &MeerPool{} diff --git a/p2p/peers/peer.go b/p2p/peers/peer.go index 318d3af0..17d388d5 100644 --- a/p2p/peers/peer.go +++ b/p2p/peers/peer.go @@ -812,6 +812,16 @@ func (p *Peer) IsSupportMeerP2PBridging() bool { return p.chainState.ProtocolVersion >= protocol.MeerP2PBridgingProtocolVersion } +func (p *Peer) IsSupportMeerpoolTransmission() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + if p.chainState == nil { + return false + } + return p.chainState.ProtocolVersion >= protocol.MeerPoolProtocolVersion +} + func (p *Peer) GetMeerConn() bool { p.lock.RLock() defer p.lock.RUnlock() diff --git a/p2p/synch/mempool.go b/p2p/synch/mempool.go index d2a0db6b..4397a64b 100644 --- a/p2p/synch/mempool.go +++ b/p2p/synch/mempool.go @@ -60,7 +60,7 @@ func (ps *PeerSync) OnMemPool(sp *peers.Peer, msg *MsgMemPool) error { // per message. The NewMsgInvSizeHint function automatically limits // the passed hint to the maximum allowed, so it's safe to pass it // without double checking it here. - txDescs := ps.sy.p2p.TxMemPool().TxDescs(false) + txDescs := ps.sy.p2p.TxMemPool().TxDescs(!sp.IsSupportMeerpoolTransmission()) invs := []*pb.InvVect{} for _, txDesc := range txDescs { diff --git a/p2p/synch/peersync.go b/p2p/synch/peersync.go index 459c14c4..9364f577 100644 --- a/p2p/synch/peersync.go +++ b/p2p/synch/peersync.go @@ -597,6 +597,9 @@ func (ps *PeerSync) RelayInventory(nds []*notify.NotifyData) { switch value := nd.Data.(type) { case *types.TxDesc: + if types.IsCrossChainVMTx(value.Tx.Tx) && pe.IsSupportMeerpoolTransmission() { + continue + } if pe.HasBroadcast(value.Tx.Hash().String()) { continue } diff --git a/services/mempool/mempool.go b/services/mempool/mempool.go index 32186918..fc8bc03e 100644 --- a/services/mempool/mempool.go +++ b/services/mempool/mempool.go @@ -508,7 +508,18 @@ func (mp *TxPool) maybeAcceptTransaction(tx *types.Tx, isNew, rateLimit, allowHi log.Debug(fmt.Sprintf("Accepted import transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee)) return nil, txD, nil } else if opreturn.IsMeerEVMTx(tx.Tx) { - return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash) + if mp.cfg.BC.HasTx(txHash) { + return nil, nil, fmt.Errorf("Already have transaction %v", txHash) + } + fee, err := mp.cfg.BC.MeerChain().(*meer.MeerChain).MeerPool().AddTx(tx) + if err != nil { + return nil, nil, err + } + + txD := mp.addTransaction(nil, tx, nextBlockHeight, fee) + + log.Debug(fmt.Sprintf("Accepted meerevm transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee)) + return nil, txD, nil } // Fetch all of the unspent transaction outputs referenced by the inputs // to this transaction. This function also attempts to fetch the @@ -1170,7 +1181,7 @@ func (mp *TxPool) HaveTransaction(hash *hash.Hash) bool { func (mp *TxPool) haveTransaction(hash *hash.Hash) bool { // Protect concurrent access. - haveTx := mp.isTransactionInPool(hash, false) || mp.isOrphanInPool(hash) + haveTx := mp.isTransactionInPool(hash, true) || mp.isOrphanInPool(hash) return haveTx } diff --git a/services/notifymgr/notifymgr.go b/services/notifymgr/notifymgr.go index bf8a67bb..3d91c54f 100644 --- a/services/notifymgr/notifymgr.go +++ b/services/notifymgr/notifymgr.go @@ -12,7 +12,6 @@ import ( "github.com/Qitmeer/qng/rpc" "github.com/Qitmeer/qng/services/notifymgr/notify" "github.com/Qitmeer/qng/services/zmq" - etypes "github.com/ethereum/go-ethereum/core/types" "github.com/libp2p/go-libp2p/core/peer" "sync" "time" @@ -47,7 +46,7 @@ type NotifyMgr struct { // both websocket and getblocktemplate long poll clients of the passed // transactions. This function should be called whenever new transactions // are added to the mempool. -func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) { +func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) { if ntmgr.IsShutdown() { return } @@ -184,14 +183,13 @@ func (ntmgr *NotifyMgr) handleStallSample() { nds = append(nds, nd) case types.BlockHeader: nds = append(nds, nd) - case *etypes.Transaction: - txds = append(txds, value) } } ntmgr.Server.RelayInventory(nds) if len(txds) > 0 { + if ntmgr.RpcServer != nil && ntmgr.RpcServer.IsStarted() && !ntmgr.RpcServer.IsShutdown() { ntmgr.RpcServer.NotifyNewTransactions(txds) }