Skip to content

Commit

Permalink
Merge pull request #618 from lochjin/dev1.2
Browse files Browse the repository at this point in the history
Performance optimization of parallel block generation between multiple nodes
  • Loading branch information
dindinw authored Mar 1, 2024
2 parents 1c3fd2a + b654902 commit 1b2cdc5
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 15 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Config struct {
BlockPrioritySize uint32 `long:"blockprioritysize" description:"Size in bytes for high-priority/low-fee transactions when creating a block"`
miningAddrs []types.Address
GBTNotify []string `long:"gbtnotify" description:"HTTP URL list to be notified of new block template"`
ObsoleteHeight int `long:"obsoleteheight" description:"What is the maximum allowable height of block obsolescence for submission"`
SubmitNoSynced bool `long:"allowsubmitwhennotsynced" description:"Allow the node to accept blocks from RPC while not synced (this flag is mainly used for testing)"`

//WebSocket support
RPCMaxWebsockets int `long:"rpcmaxwebsockets" description:"Max number of RPC websocket connections"`
Expand Down
20 changes: 20 additions & 0 deletions consensus/forks/emptyblockfork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package forks

import (
"github.com/Qitmeer/qng/common/math"
"github.com/Qitmeer/qng/core/protocol"
"github.com/Qitmeer/qng/params"
)

const (
// TODO:Future decision on whether to start
// Should we abolish the consensus restriction strategy when empty blocks appear
EmptyBlockForkHeight = math.MaxInt64
)

func IsEmptyBlockForkHeight(mainHeight int64) bool {
if params.ActiveNetParams.Net != protocol.MainNet {
return true
}
return mainHeight >= EmptyBlockForkHeight
}
28 changes: 28 additions & 0 deletions core/blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type BlockChain struct {

meerChain *meer.MeerChain
difficultyManager model.DifficultyManager

processQueueMap sync.Map
}

func (b *BlockChain) Init() error {
Expand Down Expand Up @@ -430,6 +432,23 @@ func (b *BlockChain) isCurrent() bool {
return lastNode.GetTimestamp() >= minus24Hours
}

func (b *BlockChain) IsNearlySynced() bool {
lastBlock := b.bd.GetMainChainTip()
if lastBlock.GetID() == 0 {
return true
}
checkpoint := b.LatestCheckpoint()
if checkpoint != nil && uint64(lastBlock.GetLayer()) < checkpoint.Layer {
return false
}
lastNode := b.GetBlockNode(lastBlock)
if lastNode == nil {
return false
}
startTargetTime := b.timeSource.AdjustedTime().Add(-b.params.TargetTimespan).Unix()
return lastNode.GetTimestamp() >= startTargetTime
}

// TipGeneration returns the entire generation of blocks stemming from the
// parent of the current tip.
//
Expand Down Expand Up @@ -1016,6 +1035,15 @@ func (b *BlockChain) Consensus() model.Consensus {
return b.consensus
}

func (b *BlockChain) ProcessQueueSize() int {
size := 0
b.processQueueMap.Range(func(key, value any) bool {
size++
return true
})
return size
}

// New returns a BlockChain instance using the provided configuration details.
func New(consensus model.Consensus) (*BlockChain, error) {
// Enforce required config fields.
Expand Down
6 changes: 6 additions & 0 deletions core/blockchain/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ func (b *BlockChain) ProcessBlock(block *types.SerializedBlock, flags BehaviorFl
return nil, false, fmt.Errorf("block chain is shutdown")
}
block.Reset()
bh := *block.Hash()
if _, ok := b.processQueueMap.Load(bh); ok {
return nil, false, fmt.Errorf("Already exists in the queue:%s", block.Hash().String())
}
b.processQueueMap.Store(bh, nil)
msg := processMsg{block: block, flags: flags, result: make(chan *processResult), source: source}
b.msgChan <- &msg
result := <-msg.result
b.processQueueMap.Delete(bh)
return result.block, result.isOrphan, result.err
}

Expand Down
16 changes: 9 additions & 7 deletions core/json/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ type MeerDAGInfoResult struct {
}

type ChainInfoResult struct {
Count uint64 `json:"count"`
Start string `json:"start"`
End string `json:"end"`
BlocksPerSecond float64 `json:"blockspersecond"`
TxsPerSecond float64 `json:"txspersecond"`
SecondPerHeight string `json:"secondperheight"`
Concurrency float64 `json:"concurrency"`
Count uint64 `json:"count"`
Start string `json:"start"`
End string `json:"end"`
BlocksPerSecond float64 `json:"blockspersecond"`
TxsPerSecond float64 `json:"txspersecond"`
SecondPerHeight string `json:"secondperheight"`
Concurrency float64 `json:"concurrency"`
EmptyBlockRate string `json:"emptyblockrate"`
ProcessQueueSize int32 `json:"processqueuesize"`
}
4 changes: 4 additions & 0 deletions meerdag/meerdag.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/Qitmeer/qng/common/hash"
"github.com/Qitmeer/qng/common/roughtime"
"github.com/Qitmeer/qng/consensus/forks"
"github.com/Qitmeer/qng/consensus/model"
l "github.com/Qitmeer/qng/log"
"github.com/Qitmeer/qng/meerdag/anticone"
Expand Down Expand Up @@ -924,6 +925,9 @@ func (bd *MeerDAG) checkLegality(parentsNode []IBlock) bool {

// Checking the priority of block legitimacy
func (bd *MeerDAG) checkPriority(parents []IBlock, b IBlockData) bool {
if forks.IsEmptyBlockForkHeight(int64(parents[0].GetHeight()) + 1) {
return true
}
if b.GetPriority() <= 0 {
return false
}
Expand Down
4 changes: 4 additions & 0 deletions meerdag/tips.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package meerdag
import (
"fmt"
"github.com/Qitmeer/qng/common/hash"
"github.com/Qitmeer/qng/consensus/forks"
"github.com/Qitmeer/qng/core/merkle"
"math"
)
Expand Down Expand Up @@ -47,6 +48,9 @@ func (bd *MeerDAG) GetValidTips(expectPriority int) []*hash.Hash {

result := []*hash.Hash{tips[0].GetHash()}
epNum := expectPriority
if forks.IsEmptyBlockForkHeight(int64(tips[0].GetHeight()) + 1) {
epNum = MaxPriority
}
for k, v := range tips {
if k == 0 {
if bd.GetBlockData(v).GetPriority() <= 1 {
Expand Down
13 changes: 12 additions & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func (api *PublicBlockChainAPI) GetChainInfo(lastCount int) (interface{}, error)
var blockNode *blockchain.BlockNode
info := json.ChainInfoResult{Count: 0, End: fmt.Sprintf("%s (order:%d)", mainTip.GetHash().String(), mainTip.GetOrder())}
totalTxs := 0
emptyBlocks := 0
err := md.Foreach(mainTip, uint(count), meerdag.All, func(block meerdag.IBlock) (bool, error) {
if block.GetID() <= 0 {
return true, nil
Expand All @@ -261,6 +262,9 @@ func (api *PublicBlockChainAPI) GetChainInfo(lastCount int) (interface{}, error)
}
info.Count++
totalTxs += blockNode.GetPriority()
if blockNode.GetPriority() <= 1 {
emptyBlocks++
}
start = block
return true, nil
})
Expand All @@ -274,6 +278,9 @@ func (api *PublicBlockChainAPI) GetChainInfo(lastCount int) (interface{}, error)
if totalTxs < 0 {
totalTxs = 0
}
if blockNode.GetPriority() <= 1 {
emptyBlocks--
}
info.Start = fmt.Sprintf("%s (order:%d)", start.GetHash().String(), start.GetOrder())
startNode := api.node.GetBlockChain().GetBlockHeader(start)
if startNode == nil {
Expand All @@ -284,6 +291,9 @@ func (api *PublicBlockChainAPI) GetChainInfo(lastCount int) (interface{}, error)
return nil, fmt.Errorf("No block:%s", mainTip.GetHash().String())
}
totalTxs += endNode.GetPriority()
if endNode.GetPriority() <= 1 {
emptyBlocks++
}
totalTime := endNode.GetTimestamp() - startNode.Timestamp.Unix()
if totalTime < 0 {
totalTime = 0
Expand All @@ -303,7 +313,8 @@ func (api *PublicBlockChainAPI) GetChainInfo(lastCount int) (interface{}, error)
info.SecondPerHeight = secondPerHeight.String()
info.Concurrency = float64(info.Count) / float64(totalHeight)
}

info.EmptyBlockRate = fmt.Sprintf("%d%%", uint64(emptyBlocks*100)/info.Count)
info.ProcessQueueSize = int32(api.node.GetBlockChain().ProcessQueueSize())
return info, nil
}

Expand Down
5 changes: 4 additions & 1 deletion p2p/synch/getblockdatas.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,12 @@ func (s *Sync) broadcastBlockHandler(ctx context.Context, msg interface{}, strea
}
peid := pe.GetID()
go func() {
if s.p2p.BlockChain().BlockDAG().HasBlock(block.Hash()) {
return
}
_, _, err = s.p2p.BlockChain().ProcessBlock(block, blockchain.BFBroadcast, &peid)
if err != nil {
log.Error("Failed to process block", "hash", block.Hash(), "error", err)
log.Trace("Failed to process block", "hash", block.Hash(), "error", err)
}
}()
ret = 1
Expand Down
4 changes: 3 additions & 1 deletion p2p/synch/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func (s *Sync) tryToSendInventoryRequest(pe *peers.Peer, invs []*pb.InvVect) err

if len(invMsg.Invs) >= MaxInvPerMsg ||
(i == (len(invs)-1) && len(invMsg.Invs) > 0) {
go s.Send(pe, RPCInventory, invMsg)
go func(msg *pb.Inventory) {
s.Send(pe, RPCInventory, msg)
}(invMsg)
invMsg = nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion params/params_privnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ var PrivNetParams = Params{
WorkDiffWindowSize: 160,
WorkDiffWindows: 20,
TargetTimePerBlock: time.Second * privTargetTimePerBlock,
TargetTimespan: time.Second * privTargetTimePerBlock * 16, // TimePerBlock * WindowSize
TargetTimespan: time.Second * privTargetTimePerBlock * 160, // TimePerBlock * WindowSize
RetargetAdjustmentFactor: 2,

// Subsidy parameters.
Expand Down
15 changes: 15 additions & 0 deletions services/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
defaultMinBlockPruneSize = 2000
defaultMinBlockDataCache = 2000
defaultMinRelayTxFee = int64(1e4)
defaultObsoleteHeight = 5
)
const (
defaultSigCacheMaxSize = 100000
Expand Down Expand Up @@ -619,6 +620,18 @@ var (
Usage: "Scheme to use for storing ethereum state ('hash' or 'path')",
Destination: &cfg.StateScheme,
},
&cli.IntFlag{
Name: "obsoleteheight",
Usage: "What is the maximum allowable height of block obsolescence for submission",
Value: defaultObsoleteHeight,
Destination: &cfg.ObsoleteHeight,
},
&cli.BoolFlag{
Name: "allowsubmitwhennotsynced",
Usage: "Allow the node to accept blocks from RPC while not synced (this flag is mainly used for testing)",
Value: false,
Destination: &cfg.SubmitNoSynced,
},
}
)

Expand Down Expand Up @@ -653,6 +666,8 @@ func DefaultConfig(homeDir string) *config.Config {
AcceptNonStd: true,
RPCUser: defaultRPCUser,
RPCPass: defaultRPCPass,
ObsoleteHeight: defaultObsoleteHeight,
SubmitNoSynced: false,
}
if len(homeDir) > 0 {
hd, err := filepath.Abs(homeDir)
Expand Down
40 changes: 36 additions & 4 deletions services/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (m *Miner) submitBlock(block *types.SerializedBlock) (interface{}, error) {
defer m.submitLocker.Unlock()
m.totalSubmit++

err := m.consensus.BlockChain().(*blockchain.BlockChain).BlockDAG().CheckSubMainChainTip(block.Block().Parents)
err := m.CheckSubMainChainTip(block.Block().Parents)
if err != nil {
go m.BlockChainChange()
return nil, fmt.Errorf("The tips of block is expired:%s (error:%s)\n", block.Hash().String(), err.Error())
Expand Down Expand Up @@ -685,9 +685,11 @@ func (m *Miner) submitBlockHeader(header *types.BlockHeader, extraNonce uint64)
}

func (m *Miner) CanMining() error {
currentOrder := m.BlockChain().BestSnapshot().GraphState.GetTotal() - 1
if currentOrder != 0 && !m.p2pSer.IsCurrent() {
log.Trace("Client in initial download, qitmeer is downloading blocks...")
if m.cfg.SubmitNoSynced {
return nil
}
if !m.BlockChain().IsNearlySynced() {
log.Warn("Client in initial download, qitmeer is downloading blocks...")
return rpc.RPCClientInInitialDownloadError("Client in initial download ",
"qitmeer is downloading blocks...")
}
Expand Down Expand Up @@ -902,6 +904,36 @@ func (m *Miner) BlockChain() *blockchain.BlockChain {
return m.consensus.BlockChain().(*blockchain.BlockChain)
}

// Checking the sub main chain for the parents of tip
func (m *Miner) CheckSubMainChainTip(parents []*hash.Hash) error {
if len(parents) == 0 {
return fmt.Errorf("Parents is empty")
}
var mt meerdag.IBlock
for k, pa := range parents {
ib := m.BlockChain().BlockDAG().GetBlock(pa)
if ib == nil {
return fmt.Errorf("Parent(%s) is overdue\n", pa.String())
}
if k == 0 {
mt = ib
}
}
if mt == nil {
return fmt.Errorf("No main tip:%v", parents)
}
mainTip := m.BlockChain().BlockDAG().GetMainChainTip()
if mt.GetHeight() >= mainTip.GetHeight() {
return nil
}
distance := mainTip.GetHeight() - mt.GetHeight()
if distance > uint(m.cfg.ObsoleteHeight) {
return fmt.Errorf("main chain tip is overdue,submit main parent:%v (%d), but main tip is :%v (%d). ObsoleteHeight:%d\n",
mt.GetHash().String(), mt.GetHeight(), mainTip.GetHash().String(), mainTip.GetHeight(), m.cfg.ObsoleteHeight)
}
return nil
}

func NewMiner(consensus model.Consensus, policy *mining.Policy, txpool *mempool.TxPool, p2pSer model.P2PService) *Miner {
m := Miner{
msgChan: make(chan interface{}),
Expand Down

0 comments on commit 1b2cdc5

Please sign in to comment.