Skip to content

Commit

Permalink
Merge pull request #860 from lochjin/dev2.0
Browse files Browse the repository at this point in the history
feat:compatible with peers(protover<=45) for tx propagation
  • Loading branch information
dindinw authored Dec 20, 2024
2 parents 09ae62d + f2a65d3 commit 51245c9
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 29 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ type Config struct {
NoSnapSyncPeerTimeout int `long:"nosnapsyncpeertimeout" description:"During the IBD phase, if no peer node with snap-sync service enabled is found after the timeout (seconds), the node will switch to using the normal sync method."`

PowDiffMode int `long:"powdiffmode" description:"Pow difficult mode:(0:meer,1:ghostdag,2:develop)"`

TranferTxLegacyMode bool `long:"tranfertxlegacymode" description:"Support transaction transmission mode compatible with older versions before the snapsync P2P"`
}

func (c *Config) GetMinningAddrs() []types.Address {
Expand Down
3 changes: 1 addition & 2 deletions consensus/model/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ package model

import (
"github.com/Qitmeer/qng/core/types"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/peer"
)

// Notify interface manage message announce & relay & notification between mempool, websocket, gbt long pull
// and rpc server.
type Notify interface {
AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID)
AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID)
RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID)
BroadcastMessage(data interface{})
TransactionConfirmed(tx *types.Tx)
Expand Down
8 changes: 2 additions & 6 deletions meerevm/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"math/big"
"strconv"
"strings"
"time"
)

func ReverseBytes(bs *[]byte) *[]byte {
Expand Down Expand Up @@ -84,7 +83,7 @@ func FromEVMHash(h common.Hash) *hash.Hash {
return th
}

func ToQNGTx(tx *types.Transaction, timestamp int64, newEncoding bool) *qtypes.Tx {
func ToQNGTx(tx *types.Transaction, newEncoding bool) *qtypes.Tx {
txmb, err := tx.MarshalBinary()
if err != nil {
log.Error(err.Error())
Expand All @@ -101,10 +100,7 @@ func ToQNGTx(tx *types.Transaction, timestamp int64, newEncoding bool) *qtypes.T
qtxh := hash.MustBytesToHash(qtxhb)

mtx := qtypes.NewTransaction()

if timestamp > 0 {
mtx.Timestamp = time.Unix(timestamp, 0)
}
mtx.Timestamp = tx.Time()

mtx.AddTxIn(&qtypes.TxInput{
PreviousOut: *qtypes.NewOutPoint(&qtxh, qtypes.SupperPrevOutIndex),
Expand Down
81 changes: 67 additions & 14 deletions meerevm/meer/meerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ func (m *MeerPool) handler() {
}
continue
}
if !m.prepareMeerChangeTxs(ev.Txs) {
ret := m.prepareMeerChangeTxs(ev.Txs)
if len(ret) <= 0 {
continue
}

m.qTxPool.TriggerDirty()
m.p2pSer.Notify().AnnounceNewTransactions(nil, ev.Txs, nil)
m.AnnounceNewTransactions(ev.Txs)
m.dirty.Store(true)
// System stopped
case <-m.quit:
Expand All @@ -158,32 +159,26 @@ func (m *MeerPool) handler() {
}
}

func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) bool {
remove := 0
all := 0
func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) []*types.Transaction {
remaining := []*types.Transaction{}
for _, tx := range txs {
if tx == nil {
continue
}
all++
if meerchange.IsMeerChangeTx(tx) {
vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, 0, true).Tx, nil)
vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, true).Tx, nil)
if err != nil {
log.Error(err.Error())
m.ethTxPool.RemoveTx(tx.Hash(), true)
remove++
continue
}
err = m.consensus.BlockChain().VerifyMeerTx(vmtx)
if err != nil {
log.Error(err.Error())
m.ethTxPool.RemoveTx(tx.Hash(), true)
remove++
continue
}
}
remaining = append(remaining, tx)
}
return remove != all
return remaining
}

func (m *MeerPool) handleStallSample() {
Expand Down Expand Up @@ -244,7 +239,7 @@ func (m *MeerPool) updateTemplate(force bool) error {
txsNum := len(block.Transactions())
if txsNum > 0 {
for _, tx := range block.Transactions() {
mtx := qcommon.ToQNGTx(tx, 0, true)
mtx := qcommon.ToQNGTx(tx, true)
stx := &snapshotTx{tx: mtx, eHash: tx.Hash()}
m.snapshotQTxsM[mtx.Hash().String()] = stx
m.snapshotTxsM[tx.Hash().String()] = stx
Expand Down Expand Up @@ -295,6 +290,32 @@ func (m *MeerPool) GetSize() int64 {
return 0
}

func (m *MeerPool) AddTx(tx *qtypes.Tx) (int64, error) {
h := qcommon.ToEVMHash(&tx.Tx.TxIn[0].PreviousOut.Hash)
if m.eth.TxPool().Has(h) {
return 0, fmt.Errorf("already exists:%s (evm:%s)", tx.Hash().String(), h.String())
}
txb := qcommon.ToTxHex(tx.Tx.TxIn[0].SignScript)
var txmb = &types.Transaction{}
err := txmb.UnmarshalBinary(txb)
if err != nil {
return 0, err
}

errs := m.ethTxPool.AddRemotesSync(types.Transactions{txmb})
if len(errs) > 0 && errs[0] != nil {
return 0, errs[0]
}

log.Debug("Meer pool:add", "hash", tx.Hash(), "eHash", txmb.Hash())

//
cost := txmb.Cost()
cost = cost.Sub(cost, txmb.Value())
cost = cost.Div(cost, qcommon.Precision)
return cost.Int64(), nil
}

func (m *MeerPool) RemoveTx(tx *qtypes.Tx) error {
if !m.isRunning() {
return fmt.Errorf("meer pool is not running")
Expand Down Expand Up @@ -368,6 +389,38 @@ func (m *MeerPool) subscribe() {
}()
}

func (m *MeerPool) AnnounceNewTransactions(txs []*types.Transaction) error {
txds := []*qtypes.TxDesc{}

for _, tx := range txs {
m.snapshotMu.RLock()
qtx, ok := m.snapshotTxsM[tx.Hash().String()]
m.snapshotMu.RUnlock()
if !ok || qtx == nil {
qtx = &snapshotTx{tx: qcommon.ToQNGTx(tx, true), eHash: tx.Hash()}
if qtx.tx == nil {
continue
}
}
cost := tx.Cost()
cost = cost.Sub(cost, tx.Value())
cost = cost.Div(cost, qcommon.Precision)
fee := cost.Int64()

td := &qtypes.TxDesc{
Tx: qtx.tx,
Added: time.Now(),
Height: m.qTxPool.GetMainHeight(),
Fee: fee,
FeePerKB: fee * 1000 / int64(qtx.tx.Tx.SerializeSize()),
}

txds = append(txds, td)
}
m.p2pSer.Notify().AnnounceNewTransactions(nil, txds, nil)
return nil
}

func newMeerPool(consensus model.Consensus, eth *eth.Ethereum) *MeerPool {
log.Info(fmt.Sprintf("New Meer pool"))
m := &MeerPool{}
Expand Down
10 changes: 10 additions & 0 deletions p2p/peers/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,16 @@ func (p *Peer) IsSupportMeerP2PBridging() bool {
return p.chainState.ProtocolVersion >= protocol.MeerP2PBridgingProtocolVersion
}

func (p *Peer) IsSupportMeerpoolTransmission() bool {
p.lock.RLock()
defer p.lock.RUnlock()

if p.chainState == nil {
return false
}
return p.chainState.ProtocolVersion >= protocol.MeerPoolProtocolVersion
}

func (p *Peer) GetMeerConn() bool {
p.lock.RLock()
defer p.lock.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion p2p/synch/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (ps *PeerSync) OnMemPool(sp *peers.Peer, msg *MsgMemPool) error {
// per message. The NewMsgInvSizeHint function automatically limits
// the passed hint to the maximum allowed, so it's safe to pass it
// without double checking it here.
txDescs := ps.sy.p2p.TxMemPool().TxDescs(false)
txDescs := ps.sy.p2p.TxMemPool().TxDescs(ps.sy.p2p.Consensus().Config().TranferTxLegacyMode && !sp.IsSupportMeerpoolTransmission())

invs := []*pb.InvVect{}
for _, txDesc := range txDescs {
Expand Down
6 changes: 6 additions & 0 deletions p2p/synch/peersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,12 @@ func (ps *PeerSync) RelayInventory(nds []*notify.NotifyData) {

switch value := nd.Data.(type) {
case *types.TxDesc:
if types.IsCrossChainVMTx(value.Tx.Tx) {
if !ps.sy.p2p.Consensus().Config().TranferTxLegacyMode ||
pe.IsSupportMeerpoolTransmission() {
continue
}
}
if pe.HasBroadcast(value.Tx.Hash().String()) {
continue
}
Expand Down
7 changes: 7 additions & 0 deletions services/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ var (
Value: defaultPowDiffMode,
Destination: &cfg.PowDiffMode,
},
&cli.BoolFlag{
Name: "tranfertxlegacymode",
Usage: "Support transaction transmission mode compatible with older versions before the snapsync P2P",
Value: true,
Destination: &cfg.TranferTxLegacyMode,
},
}
)

Expand Down Expand Up @@ -702,6 +708,7 @@ func DefaultConfig(homeDir string) *config.Config {
GBTTimeOut: defaultGBTTimeout,
NoSnapSyncPeerTimeout: defaultSnapTimeout,
PowDiffMode: defaultPowDiffMode,
TranferTxLegacyMode: true,
}
if len(homeDir) > 0 {
hd, err := filepath.Abs(homeDir)
Expand Down
18 changes: 16 additions & 2 deletions services/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,21 @@ func (mp *TxPool) maybeAcceptTransaction(tx *types.Tx, isNew, rateLimit, allowHi
log.Debug(fmt.Sprintf("Accepted import transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee))
return nil, txD, nil
} else if opreturn.IsMeerEVMTx(tx.Tx) {
return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash)
if !mp.cfg.BC.Consensus().Config().TranferTxLegacyMode {
return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash)
}
if mp.cfg.BC.HasTx(txHash) {
return nil, nil, fmt.Errorf("Already have transaction %v", txHash)
}
fee, err := mp.cfg.BC.MeerChain().(*meer.MeerChain).MeerPool().AddTx(tx)
if err != nil {
return nil, nil, err
}

txD := mp.addTransaction(nil, tx, nextBlockHeight, fee)

log.Debug(fmt.Sprintf("Accepted meerevm transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee))
return nil, txD, nil
}
// Fetch all of the unspent transaction outputs referenced by the inputs
// to this transaction. This function also attempts to fetch the
Expand Down Expand Up @@ -1170,7 +1184,7 @@ func (mp *TxPool) HaveTransaction(hash *hash.Hash) bool {

func (mp *TxPool) haveTransaction(hash *hash.Hash) bool {
// Protect concurrent access.
haveTx := mp.isTransactionInPool(hash, false) || mp.isOrphanInPool(hash)
haveTx := mp.isTransactionInPool(hash, true) || mp.isOrphanInPool(hash)
return haveTx
}

Expand Down
10 changes: 6 additions & 4 deletions services/notifymgr/notifymgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/Qitmeer/qng/rpc"
"github.com/Qitmeer/qng/services/notifymgr/notify"
"github.com/Qitmeer/qng/services/zmq"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/peer"
"sync"
"time"
Expand Down Expand Up @@ -47,7 +46,7 @@ type NotifyMgr struct {
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
// are added to the mempool.
func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) {
func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) {
if ntmgr.IsShutdown() {
return
}
Expand Down Expand Up @@ -181,17 +180,20 @@ func (ntmgr *NotifyMgr) handleStallSample() {
case *types.TxDesc:
log.Trace(fmt.Sprintf("Announce new transaction :hash=%s height=%d add=%s", value.Tx.Hash().String(), value.Height, value.Added.String()))
txds = append(txds, value.Tx)
if types.IsCrossChainVMTx(value.Tx.Tx) &&
!ntmgr.Server.Consensus().Config().TranferTxLegacyMode {
continue
}
nds = append(nds, nd)
case types.BlockHeader:
nds = append(nds, nd)
case *etypes.Transaction:
txds = append(txds, value)
}

}
ntmgr.Server.RelayInventory(nds)

if len(txds) > 0 {

if ntmgr.RpcServer != nil && ntmgr.RpcServer.IsStarted() && !ntmgr.RpcServer.IsShutdown() {
ntmgr.RpcServer.NotifyNewTransactions(txds)
}
Expand Down

0 comments on commit 51245c9

Please sign in to comment.