Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:compatible with peers(protover<=45) for tx propagation #860

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ type Config struct {
NoSnapSyncPeerTimeout int `long:"nosnapsyncpeertimeout" description:"During the IBD phase, if no peer node with snap-sync service enabled is found after the timeout (seconds), the node will switch to using the normal sync method."`

PowDiffMode int `long:"powdiffmode" description:"Pow difficult mode:(0:meer,1:ghostdag,2:develop)"`

TransferVer1Txs bool `long:"transferver1txs" description:"Transmission of txs sent by first generation peers"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be better described like:TranferTxLegacyMode
"Support transaction transmission mode compatible with older versions before the snapsync P2P"

}

func (c *Config) GetMinningAddrs() []types.Address {
Expand Down
3 changes: 1 addition & 2 deletions consensus/model/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ package model

import (
"github.com/Qitmeer/qng/core/types"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/peer"
)

// Notify interface manage message announce & relay & notification between mempool, websocket, gbt long pull
// and rpc server.
type Notify interface {
AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID)
AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID)
RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID)
BroadcastMessage(data interface{})
TransactionConfirmed(tx *types.Tx)
Expand Down
8 changes: 2 additions & 6 deletions meerevm/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"math/big"
"strconv"
"strings"
"time"
)

func ReverseBytes(bs *[]byte) *[]byte {
Expand Down Expand Up @@ -84,7 +83,7 @@ func FromEVMHash(h common.Hash) *hash.Hash {
return th
}

func ToQNGTx(tx *types.Transaction, timestamp int64, newEncoding bool) *qtypes.Tx {
func ToQNGTx(tx *types.Transaction, newEncoding bool) *qtypes.Tx {
txmb, err := tx.MarshalBinary()
if err != nil {
log.Error(err.Error())
Expand All @@ -101,10 +100,7 @@ func ToQNGTx(tx *types.Transaction, timestamp int64, newEncoding bool) *qtypes.T
qtxh := hash.MustBytesToHash(qtxhb)

mtx := qtypes.NewTransaction()

if timestamp > 0 {
mtx.Timestamp = time.Unix(timestamp, 0)
}
mtx.Timestamp = tx.Time()

mtx.AddTxIn(&qtypes.TxInput{
PreviousOut: *qtypes.NewOutPoint(&qtxh, qtypes.SupperPrevOutIndex),
Expand Down
81 changes: 67 additions & 14 deletions meerevm/meer/meerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ func (m *MeerPool) handler() {
}
continue
}
if !m.prepareMeerChangeTxs(ev.Txs) {
ret := m.prepareMeerChangeTxs(ev.Txs)
if len(ret) <= 0 {
continue
}

m.qTxPool.TriggerDirty()
m.p2pSer.Notify().AnnounceNewTransactions(nil, ev.Txs, nil)
m.AnnounceNewTransactions(ev.Txs)
m.dirty.Store(true)
// System stopped
case <-m.quit:
Expand All @@ -158,32 +159,26 @@ func (m *MeerPool) handler() {
}
}

func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) bool {
remove := 0
all := 0
func (m *MeerPool) prepareMeerChangeTxs(txs []*types.Transaction) []*types.Transaction {
remaining := []*types.Transaction{}
for _, tx := range txs {
if tx == nil {
continue
}
all++
if meerchange.IsMeerChangeTx(tx) {
vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, 0, true).Tx, nil)
vmtx, err := meer.NewVMTx(qcommon.ToQNGTx(tx, true).Tx, nil)
if err != nil {
log.Error(err.Error())
m.ethTxPool.RemoveTx(tx.Hash(), true)
remove++
continue
}
err = m.consensus.BlockChain().VerifyMeerTx(vmtx)
if err != nil {
log.Error(err.Error())
m.ethTxPool.RemoveTx(tx.Hash(), true)
remove++
continue
}
}
remaining = append(remaining, tx)
}
return remove != all
return remaining
}

func (m *MeerPool) handleStallSample() {
Expand Down Expand Up @@ -244,7 +239,7 @@ func (m *MeerPool) updateTemplate(force bool) error {
txsNum := len(block.Transactions())
if txsNum > 0 {
for _, tx := range block.Transactions() {
mtx := qcommon.ToQNGTx(tx, 0, true)
mtx := qcommon.ToQNGTx(tx, true)
stx := &snapshotTx{tx: mtx, eHash: tx.Hash()}
m.snapshotQTxsM[mtx.Hash().String()] = stx
m.snapshotTxsM[tx.Hash().String()] = stx
Expand Down Expand Up @@ -295,6 +290,32 @@ func (m *MeerPool) GetSize() int64 {
return 0
}

func (m *MeerPool) AddTx(tx *qtypes.Tx) (int64, error) {
h := qcommon.ToEVMHash(&tx.Tx.TxIn[0].PreviousOut.Hash)
if m.eth.TxPool().Has(h) {
return 0, fmt.Errorf("already exists:%s (evm:%s)", tx.Hash().String(), h.String())
}
txb := qcommon.ToTxHex(tx.Tx.TxIn[0].SignScript)
var txmb = &types.Transaction{}
err := txmb.UnmarshalBinary(txb)
if err != nil {
return 0, err
}

errs := m.ethTxPool.AddRemotesSync(types.Transactions{txmb})
if len(errs) > 0 && errs[0] != nil {
return 0, errs[0]
}

log.Debug("Meer pool:add", "hash", tx.Hash(), "eHash", txmb.Hash())

//
cost := txmb.Cost()
cost = cost.Sub(cost, txmb.Value())
cost = cost.Div(cost, qcommon.Precision)
return cost.Int64(), nil
}

func (m *MeerPool) RemoveTx(tx *qtypes.Tx) error {
if !m.isRunning() {
return fmt.Errorf("meer pool is not running")
Expand Down Expand Up @@ -368,6 +389,38 @@ func (m *MeerPool) subscribe() {
}()
}

func (m *MeerPool) AnnounceNewTransactions(txs []*types.Transaction) error {
txds := []*qtypes.TxDesc{}

for _, tx := range txs {
m.snapshotMu.RLock()
qtx, ok := m.snapshotTxsM[tx.Hash().String()]
m.snapshotMu.RUnlock()
if !ok || qtx == nil {
qtx = &snapshotTx{tx: qcommon.ToQNGTx(tx, true), eHash: tx.Hash()}
if qtx.tx == nil {
continue
}
}
cost := tx.Cost()
cost = cost.Sub(cost, tx.Value())
cost = cost.Div(cost, qcommon.Precision)
fee := cost.Int64()

td := &qtypes.TxDesc{
Tx: qtx.tx,
Added: time.Now(),
Height: m.qTxPool.GetMainHeight(),
Fee: fee,
FeePerKB: fee * 1000 / int64(qtx.tx.Tx.SerializeSize()),
}

txds = append(txds, td)
}
m.p2pSer.Notify().AnnounceNewTransactions(nil, txds, nil)
return nil
}

func newMeerPool(consensus model.Consensus, eth *eth.Ethereum) *MeerPool {
log.Info(fmt.Sprintf("New Meer pool"))
m := &MeerPool{}
Expand Down
10 changes: 10 additions & 0 deletions p2p/peers/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,16 @@ func (p *Peer) IsSupportMeerP2PBridging() bool {
return p.chainState.ProtocolVersion >= protocol.MeerP2PBridgingProtocolVersion
}

func (p *Peer) IsSupportMeerpoolTransmission() bool {
p.lock.RLock()
defer p.lock.RUnlock()

if p.chainState == nil {
return false
}
return p.chainState.ProtocolVersion >= protocol.MeerPoolProtocolVersion
}

func (p *Peer) GetMeerConn() bool {
p.lock.RLock()
defer p.lock.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion p2p/synch/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (ps *PeerSync) OnMemPool(sp *peers.Peer, msg *MsgMemPool) error {
// per message. The NewMsgInvSizeHint function automatically limits
// the passed hint to the maximum allowed, so it's safe to pass it
// without double checking it here.
txDescs := ps.sy.p2p.TxMemPool().TxDescs(false)
txDescs := ps.sy.p2p.TxMemPool().TxDescs(ps.sy.p2p.Consensus().Config().TransferVer1Txs && !sp.IsSupportMeerpoolTransmission())

invs := []*pb.InvVect{}
for _, txDesc := range txDescs {
Expand Down
6 changes: 6 additions & 0 deletions p2p/synch/peersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,12 @@ func (ps *PeerSync) RelayInventory(nds []*notify.NotifyData) {

switch value := nd.Data.(type) {
case *types.TxDesc:
if types.IsCrossChainVMTx(value.Tx.Tx) {
if !ps.sy.p2p.Consensus().Config().TransferVer1Txs ||
pe.IsSupportMeerpoolTransmission() {
continue
}
}
if pe.HasBroadcast(value.Tx.Hash().String()) {
continue
}
Expand Down
7 changes: 7 additions & 0 deletions services/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ var (
Value: defaultPowDiffMode,
Destination: &cfg.PowDiffMode,
},
&cli.BoolFlag{
Name: "transferver1txs",
Usage: "Transmission of txs sent by first generation peers",
Value: false,
Destination: &cfg.TransferVer1Txs,
},
}
)

Expand Down Expand Up @@ -702,6 +708,7 @@ func DefaultConfig(homeDir string) *config.Config {
GBTTimeOut: defaultGBTTimeout,
NoSnapSyncPeerTimeout: defaultSnapTimeout,
PowDiffMode: defaultPowDiffMode,
TransferVer1Txs: false,
}
if len(homeDir) > 0 {
hd, err := filepath.Abs(homeDir)
Expand Down
18 changes: 16 additions & 2 deletions services/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,21 @@ func (mp *TxPool) maybeAcceptTransaction(tx *types.Tx, isNew, rateLimit, allowHi
log.Debug(fmt.Sprintf("Accepted import transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee))
return nil, txD, nil
} else if opreturn.IsMeerEVMTx(tx.Tx) {
return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash)
if !mp.cfg.BC.Consensus().Config().TransferVer1Txs {
return nil, nil, fmt.Errorf("Unsupported this MeerEVMTx %v", txHash)
}
if mp.cfg.BC.HasTx(txHash) {
return nil, nil, fmt.Errorf("Already have transaction %v", txHash)
}
fee, err := mp.cfg.BC.MeerChain().(*meer.MeerChain).MeerPool().AddTx(tx)
if err != nil {
return nil, nil, err
}

txD := mp.addTransaction(nil, tx, nextBlockHeight, fee)

log.Debug(fmt.Sprintf("Accepted meerevm transaction ,txHash(qng):%s ,pool size:%d , fee:%d", txHash, len(mp.pool), fee))
return nil, txD, nil
}
// Fetch all of the unspent transaction outputs referenced by the inputs
// to this transaction. This function also attempts to fetch the
Expand Down Expand Up @@ -1170,7 +1184,7 @@ func (mp *TxPool) HaveTransaction(hash *hash.Hash) bool {

func (mp *TxPool) haveTransaction(hash *hash.Hash) bool {
// Protect concurrent access.
haveTx := mp.isTransactionInPool(hash, false) || mp.isOrphanInPool(hash)
haveTx := mp.isTransactionInPool(hash, true) || mp.isOrphanInPool(hash)
return haveTx
}

Expand Down
10 changes: 6 additions & 4 deletions services/notifymgr/notifymgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/Qitmeer/qng/rpc"
"github.com/Qitmeer/qng/services/notifymgr/notify"
"github.com/Qitmeer/qng/services/zmq"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/libp2p/go-libp2p/core/peer"
"sync"
"time"
Expand Down Expand Up @@ -47,7 +46,7 @@ type NotifyMgr struct {
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
// are added to the mempool.
func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*etypes.Transaction, filters []peer.ID) {
func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, meerTxs []*types.TxDesc, filters []peer.ID) {
if ntmgr.IsShutdown() {
return
}
Expand Down Expand Up @@ -181,17 +180,20 @@ func (ntmgr *NotifyMgr) handleStallSample() {
case *types.TxDesc:
log.Trace(fmt.Sprintf("Announce new transaction :hash=%s height=%d add=%s", value.Tx.Hash().String(), value.Height, value.Added.String()))
txds = append(txds, value.Tx)
if types.IsCrossChainVMTx(value.Tx.Tx) &&
!ntmgr.Server.Consensus().Config().TransferVer1Txs {
continue
}
nds = append(nds, nd)
case types.BlockHeader:
nds = append(nds, nd)
case *etypes.Transaction:
txds = append(txds, value)
}

}
ntmgr.Server.RelayInventory(nds)

if len(txds) > 0 {

if ntmgr.RpcServer != nil && ntmgr.RpcServer.IsStarted() && !ntmgr.RpcServer.IsShutdown() {
ntmgr.RpcServer.NotifyNewTransactions(txds)
}
Expand Down
Loading