diff --git a/config/config.go b/config/config.go index 9a2f31ac..4cab9ff5 100644 --- a/config/config.go +++ b/config/config.go @@ -148,6 +148,8 @@ type Config struct { NoSnapSyncPeerTimeout int `long:"nosnapsyncpeertimeout" description:"During the IBD phase, if no peer node with snap-sync service enabled is found after the timeout (seconds), the node will switch to using the normal sync method."` PowDiffMode int `long:"powdiffmode" description:"Pow difficult mode:(0:meer,1:ghostdag,2:develop)"` + + TranferTxLegacyMode bool `long:"tranfertxlegacymode" description:"Support transaction transmission mode compatible with older versions before the snapsync P2P"` } func (c *Config) GetMinningAddrs() []types.Address { diff --git a/consensus/model/notify.go b/consensus/model/notify.go index a73aa63c..d80b8d99 100644 --- a/consensus/model/notify.go +++ b/consensus/model/notify.go @@ -6,14 +6,13 @@ package model import ( "github.com/Qitmeer/qng/core/types" - etypes "github.com/ethereum/go-ethereum/core/types" "github.com/libp2p/go-libp2p/core/peer" ) // Notify interface manage message announce & relay & notification between mempool, websocket, gbt long pull // and rpc server. type Notify interface { - AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) + AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID) BroadcastMessage(data interface{}) TransactionConfirmed(tx *types.Tx) diff --git a/meerevm/common/utils.go b/meerevm/common/utils.go index 71fe8cdc..02485188 100644 --- a/meerevm/common/utils.go +++ b/meerevm/common/utils.go @@ -24,7 +24,6 @@ import ( "math/big" "strconv" "strings" - "time" ) func ReverseBytes(bs *[]byte) *[]byte { @@ -84,7 +83,7 @@ func FromEVMHash(h common.Hash) *hash.Hash { return th } -func ToQNGTx(tx *types.Transaction, timestamp int64, newEncoding bool) *qtypes.Tx { +func ToQNGTx(tx *types.Transaction, newEncoding bool) *qtypes.Tx { txmb, err := tx.MarshalBinary() if err != nil { log.Error(err.Error()) @@ -101,10 +100,7 @@ func ToQNGTx(tx *types.Transaction, timestamp int64, newEncoding bool) *qtypes.T qtxh := hash.MustBytesToHash(qtxhb) mtx := qtypes.NewTransaction() - - if timestamp > 0 { - mtx.Timestamp = time.Unix(timestamp, 0) - } + mtx.Timestamp = tx.Time() mtx.AddTxIn(&qtypes.TxInput{ PreviousOut: *qtypes.NewOutPoint(&qtxh, qtypes.SupperPrevOutIndex), diff --git a/meerevm/meer/meerpool.go b/meerevm/meer/meerpool.go index 9d731a77..df259683 100644 --- a/meerevm/meer/meerpool.go +++ b/meerevm/meer/meerpool.go @@ -128,12 +128,13 @@ func (m *MeerPool) handler() { } continue } - if !m.prepareMeerChangeTxs(ev.Txs) { + ret := m.prepareMeerChangeTxs(ev.Txs) + if len(ret) <= 0 { continue } m.qTxPool.TriggerDirty() - m.p2pSer.Notify().AnnounceNewTransactions(nil, ev.Txs, nil) + m.AnnounceNewTransactions(ev.Txs) m.dirty.Store(true) // System stopped case <-m.quit: @@ -158,32 +159,26 @@ func (m *MeerPool) handler() { } } -func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) bool { - remove := 0 - all := 0 +func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) []*types.Transaction { + remaining := []*types.Transaction{} for _, tx := range txs { - if tx == nil { - continue - } - all++ if meerchange.IsMeerChangeTx(tx) { - vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, 0, true).Tx, nil) + vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, true).Tx, nil) if err != nil { log.Error(err.Error()) m.ethTxPool.RemoveTx(tx.Hash(), true) - remove++ continue } err = m.consensus.BlockChain().VerifyMeerTx(vmtx) if err != nil { log.Error(err.Error()) m.ethTxPool.RemoveTx(tx.Hash(), true) - remove++ continue } } + remaining = append(remaining, tx) } - return remove != all + return remaining } func (m *MeerPool) handleStallSample() { @@ -244,7 +239,7 @@ func (m *MeerPool) updateTemplate(force bool) error { txsNum := len(block.Transactions()) if txsNum > 0 { for _, tx := range block.Transactions() { - mtx := qcommon.ToQNGTx(tx, 0, true) + mtx := qcommon.ToQNGTx(tx, true) stx := &snapshotTx{tx: mtx, eHash: tx.Hash()} m.snapshotQTxsM[mtx.Hash().String()] = stx m.snapshotTxsM[tx.Hash().String()] = stx @@ -295,6 +290,32 @@ func (m *MeerPool) GetSize() int64 { return 0 } +func (m *MeerPool) AddTx(tx *qtypes.Tx) (int64, error) { + h := qcommon.ToEVMHash(&tx.Tx.TxIn[0].PreviousOut.Hash) + if m.eth.TxPool().Has(h) { + return 0, fmt.Errorf("already exists:%s (evm:%s)", tx.Hash().String(), h.String()) + } + txb := qcommon.ToTxHex(tx.Tx.TxIn[0].SignScript) + var txmb = &types.Transaction{} + err := txmb.UnmarshalBinary(txb) + if err != nil { + return 0, err + } + + errs := m.ethTxPool.AddRemotesSync(types.Transactions{txmb}) + if len(errs) > 0 && errs[0] != nil { + return 0, errs[0] + } + + log.Debug("Meer pool:add", "hash", tx.Hash(), "eHash", txmb.Hash()) + + // + cost := txmb.Cost() + cost = cost.Sub(cost, txmb.Value()) + cost = cost.Div(cost, qcommon.Precision) + return cost.Int64(), nil +} + func (m *MeerPool) RemoveTx(tx *qtypes.Tx) error { if !m.isRunning() { return fmt.Errorf("meer pool is not running") @@ -368,6 +389,38 @@ func (m *MeerPool) subscribe() { }() } +func (m *MeerPool) AnnounceNewTransactions(txs []*types.Transaction) error { + txds := []*qtypes.TxDesc{} + + for _, tx := range txs { + m.snapshotMu.RLock() + qtx, ok := m.snapshotTxsM[tx.Hash().String()] + m.snapshotMu.RUnlock() + if !ok || qtx == nil { + qtx = &snapshotTx{tx: qcommon.ToQNGTx(tx, true), eHash: tx.Hash()} + if qtx.tx == nil { + continue + } + } + cost := tx.Cost() + cost = cost.Sub(cost, tx.Value()) + cost = cost.Div(cost, qcommon.Precision) + fee := cost.Int64() + + td := &qtypes.TxDesc{ + Tx: qtx.tx, + Added: time.Now(), + Height: m.qTxPool.GetMainHeight(), + Fee: fee, + FeePerKB: fee * 1000 / int64(qtx.tx.Tx.SerializeSize()), + } + + txds = append(txds, td) + } + m.p2pSer.Notify().AnnounceNewTransactions(nil, txds, nil) + return nil +} + func newMeerPool(consensus model.Consensus, eth *eth.Ethereum) *MeerPool { log.Info(fmt.Sprintf("New Meer pool")) m := &MeerPool{} diff --git a/p2p/peers/peer.go b/p2p/peers/peer.go index 318d3af0..17d388d5 100644 --- a/p2p/peers/peer.go +++ b/p2p/peers/peer.go @@ -812,6 +812,16 @@ func (p *Peer) IsSupportMeerP2PBridging() bool { return p.chainState.ProtocolVersion >= protocol.MeerP2PBridgingProtocolVersion } +func (p *Peer) IsSupportMeerpoolTransmission() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + if p.chainState == nil { + return false + } + return p.chainState.ProtocolVersion >= protocol.MeerPoolProtocolVersion +} + func (p *Peer) GetMeerConn() bool { p.lock.RLock() defer p.lock.RUnlock() diff --git a/p2p/synch/mempool.go b/p2p/synch/mempool.go index d2a0db6b..5763560f 100644 --- a/p2p/synch/mempool.go +++ b/p2p/synch/mempool.go @@ -60,7 +60,7 @@ func (ps *PeerSync) OnMemPool(sp *peers.Peer, msg *MsgMemPool) error { // per message. The NewMsgInvSizeHint function automatically limits // the passed hint to the maximum allowed, so it's safe to pass it // without double checking it here. - txDescs := ps.sy.p2p.TxMemPool().TxDescs(false) + txDescs := ps.sy.p2p.TxMemPool().TxDescs(ps.sy.p2p.Consensus().Config().TranferTxLegacyMode && !sp.IsSupportMeerpoolTransmission()) invs := []*pb.InvVect{} for _, txDesc := range txDescs { diff --git a/p2p/synch/peersync.go b/p2p/synch/peersync.go index 459c14c4..a25e18fd 100644 --- a/p2p/synch/peersync.go +++ b/p2p/synch/peersync.go @@ -597,6 +597,12 @@ func (ps *PeerSync) RelayInventory(nds []*notify.NotifyData) { switch value := nd.Data.(type) { case *types.TxDesc: + if types.IsCrossChainVMTx(value.Tx.Tx) { + if !ps.sy.p2p.Consensus().Config().TranferTxLegacyMode || + pe.IsSupportMeerpoolTransmission() { + continue + } + } if pe.HasBroadcast(value.Tx.Hash().String()) { continue } diff --git a/services/common/flags.go b/services/common/flags.go index 82053f57..8294c59d 100644 --- a/services/common/flags.go +++ b/services/common/flags.go @@ -664,6 +664,12 @@ var ( Value: defaultPowDiffMode, Destination: &cfg.PowDiffMode, }, + &cli.BoolFlag{ + Name: "tranfertxlegacymode", + Usage: "Support transaction transmission mode compatible with older versions before the snapsync P2P", + Value: true, + Destination: &cfg.TranferTxLegacyMode, + }, } ) @@ -702,6 +708,7 @@ func DefaultConfig(homeDir string) *config.Config { GBTTimeOut: defaultGBTTimeout, NoSnapSyncPeerTimeout: defaultSnapTimeout, PowDiffMode: defaultPowDiffMode, + TranferTxLegacyMode: true, } if len(homeDir) > 0 { hd, err := filepath.Abs(homeDir) diff --git a/services/mempool/mempool.go b/services/mempool/mempool.go index 32186918..ed8d3234 100644 --- a/services/mempool/mempool.go +++ b/services/mempool/mempool.go @@ -508,7 +508,21 @@ func (mp *TxPool) maybeAcceptTransaction(tx *types.Tx, isNew, rateLimit, allowHi log.Debug(fmt.Sprintf("Accepted import transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee)) return nil, txD, nil } else if opreturn.IsMeerEVMTx(tx.Tx) { - return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash) + if !mp.cfg.BC.Consensus().Config().TranferTxLegacyMode { + return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash) + } + if mp.cfg.BC.HasTx(txHash) { + return nil, nil, fmt.Errorf("Already have transaction %v", txHash) + } + fee, err := mp.cfg.BC.MeerChain().(*meer.MeerChain).MeerPool().AddTx(tx) + if err != nil { + return nil, nil, err + } + + txD := mp.addTransaction(nil, tx, nextBlockHeight, fee) + + log.Debug(fmt.Sprintf("Accepted meerevm transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee)) + return nil, txD, nil } // Fetch all of the unspent transaction outputs referenced by the inputs // to this transaction. This function also attempts to fetch the @@ -1170,7 +1184,7 @@ func (mp *TxPool) HaveTransaction(hash *hash.Hash) bool { func (mp *TxPool) haveTransaction(hash *hash.Hash) bool { // Protect concurrent access. - haveTx := mp.isTransactionInPool(hash, false) || mp.isOrphanInPool(hash) + haveTx := mp.isTransactionInPool(hash, true) || mp.isOrphanInPool(hash) return haveTx } diff --git a/services/notifymgr/notifymgr.go b/services/notifymgr/notifymgr.go index bf8a67bb..788c7bd4 100644 --- a/services/notifymgr/notifymgr.go +++ b/services/notifymgr/notifymgr.go @@ -12,7 +12,6 @@ import ( "github.com/Qitmeer/qng/rpc" "github.com/Qitmeer/qng/services/notifymgr/notify" "github.com/Qitmeer/qng/services/zmq" - etypes "github.com/ethereum/go-ethereum/core/types" "github.com/libp2p/go-libp2p/core/peer" "sync" "time" @@ -47,7 +46,7 @@ type NotifyMgr struct { // both websocket and getblocktemplate long poll clients of the passed // transactions. This function should be called whenever new transactions // are added to the mempool. -func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) { +func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) { if ntmgr.IsShutdown() { return } @@ -181,17 +180,20 @@ func (ntmgr *NotifyMgr) handleStallSample() { case *types.TxDesc: log.Trace(fmt.Sprintf("Announce new transaction :hash=%s height=%d add=%s", value.Tx.Hash().String(), value.Height, value.Added.String())) txds = append(txds, value.Tx) + if types.IsCrossChainVMTx(value.Tx.Tx) && + !ntmgr.Server.Consensus().Config().TranferTxLegacyMode { + continue + } nds = append(nds, nd) case types.BlockHeader: nds = append(nds, nd) - case *etypes.Transaction: - txds = append(txds, value) } } ntmgr.Server.RelayInventory(nds) if len(txds) > 0 { + if ntmgr.RpcServer != nil && ntmgr.RpcServer.IsStarted() && !ntmgr.RpcServer.IsShutdown() { ntmgr.RpcServer.NotifyNewTransactions(txds) }