Skip to content

Commit

Permalink
Revert improvements. (#4520)
Browse files Browse the repository at this point in the history
  • Loading branch information
Frozen authored Oct 17, 2023
1 parent 370d122 commit cf5dd8b
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 89 deletions.
27 changes: 9 additions & 18 deletions api/service/stagedsync/sync_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ const (
downloadBlocksRetryLimit = 3 // downloadBlocks service retry limit
RegistrationNumber = 3
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
verifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
LastMileBlocksSize = 50

// after cutting off a number of connected peers, the result number of peers
Expand Down Expand Up @@ -53,14 +52,6 @@ type SyncPeerConfig struct {
failedTimes uint64
}

// CreateTestSyncPeerConfig used for testing.
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig {
return &SyncPeerConfig{
client: client,
blockHashes: blockHashes,
}
}

// GetClient returns client pointer of downloader.Client
func (peerConfig *SyncPeerConfig) GetClient() *downloader.Client {
return peerConfig.client
Expand Down Expand Up @@ -303,21 +294,21 @@ func (sc *SyncConfig) FindPeerByHash(peerHash []byte) *SyncPeerConfig {
// getHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
// Caller shall ensure mtx is locked for reading.
func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
func getHowManyMaxConsensus(peers []*SyncPeerConfig) (int, int) {
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
if len(sc.peers) == 0 {
if len(peers) == 0 {
return -1, 0
} else if len(sc.peers) == 1 {
} else if len(peers) == 1 {
return 0, 1
}
maxFirstID := len(sc.peers) - 1
maxFirstID := len(peers) - 1
for i := maxFirstID - 1; i >= 0; i-- {
if CompareSyncPeerConfigByblockHashes(sc.peers[maxFirstID], sc.peers[i]) != 0 {
if CompareSyncPeerConfigByblockHashes(peers[maxFirstID], peers[i]) != 0 {
break
}
maxFirstID = i
}
maxCount := len(sc.peers) - maxFirstID
maxCount := len(peers) - maxFirstID
return maxFirstID, maxCount
}

Expand Down Expand Up @@ -386,7 +377,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp(bgMode bool) error {
sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
})
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
maxFirstID, maxCount := getHowManyMaxConsensus(sc.peers)
if maxFirstID == -1 {
return errors.New("invalid peer index -1 for block hashes query")
}
Expand Down
109 changes: 61 additions & 48 deletions cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,29 @@ func setupNodeLog(config harmonyconfig.HarmonyConfig) {
}
}

func revert(chain core.BlockChain, hc harmonyconfig.HarmonyConfig) {
curNum := chain.CurrentBlock().NumberU64()
if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) {
// Remove invalid blocks
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)
}
fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64())
utils.Logger().Warn().
Uint64("Current Block", chain.CurrentBlock().NumberU64()).
Msg("Revert finished.")
os.Exit(1)
}
}

func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
var err error

Expand Down Expand Up @@ -353,26 +376,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
if hc.Revert.RevertBeacon {
chain = currentNode.Beaconchain()
}
curNum := chain.CurrentBlock().NumberU64()
if curNum < uint64(hc.Revert.RevertBefore) && curNum >= uint64(hc.Revert.RevertTo) {
// Remove invalid blocks
for chain.CurrentBlock().NumberU64() >= uint64(hc.Revert.RevertTo) {
curBlock := chain.CurrentBlock()
rollbacks := []ethCommon.Hash{curBlock.Hash()}
if err := chain.Rollback(rollbacks); err != nil {
fmt.Printf("Revert failed: %v\n", err)
os.Exit(1)
}
lastSig := curBlock.Header().LastCommitSignature()
sigAndBitMap := append(lastSig[:], curBlock.Header().LastCommitBitmap()...)
chain.WriteCommitSig(curBlock.NumberU64()-1, sigAndBitMap)
}
fmt.Printf("Revert finished. Current block: %v\n", chain.CurrentBlock().NumberU64())
utils.Logger().Warn().
Uint64("Current Block", chain.CurrentBlock().NumberU64()).
Msg("Revert finished.")
os.Exit(1)
}
revert(chain, hc)
}

//// code to handle pre-image export, import and generation
Expand Down Expand Up @@ -727,31 +731,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
return nodeConfig, nil
}

func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
if hc.Consensus != nil {
minPeers = hc.Consensus.MinPeers
aggregateSig = hc.Consensus.AggregateSig
} else {
minPeers = defaultConsensusConfig.MinPeers
aggregateSig = defaultConsensusConfig.AggregateSig
}

blacklist, err := setupBlacklist(hc)
if err != nil {
utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error())
}
allowedTxs, err := setupAllowedTxs(hc)
if err != nil {
utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error())
}

localAccounts, err := setupLocalAccounts(hc, blacklist)
if err != nil {
utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error())
}
func setupChain(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *registry.Registry {

// Current node.
var chainDBFactory shardchain.DBFactory
Expand All @@ -770,6 +750,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
}

engine := chain.NewEngine()
registry.SetEngine(engine)

chainConfig := nodeConfig.GetNetworkType().ChainConfig()
collection := shardchain.NewCollection(
Expand All @@ -780,6 +761,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
collection.DisableCache(shardID)
}
}
registry.SetShardChainCollection(collection)

var blockchain core.BlockChain

Expand All @@ -793,17 +775,48 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
registry.SetBeaconchain(beacon)
}

blockchain, err = collection.ShardChain(nodeConfig.ShardID)
blockchain, err := collection.ShardChain(nodeConfig.ShardID)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
registry.SetBlockchain(blockchain)
registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
if registry.GetBeaconchain() == nil {
registry.SetBeaconchain(registry.GetBlockchain())
}
return registry
}

func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfig.ConfigType, registry *registry.Registry) *node.Node {
// Parse minPeers from harmonyconfig.HarmonyConfig
var minPeers int
var aggregateSig bool
if hc.Consensus != nil {
minPeers = hc.Consensus.MinPeers
aggregateSig = hc.Consensus.AggregateSig
} else {
minPeers = defaultConsensusConfig.MinPeers
aggregateSig = defaultConsensusConfig.AggregateSig
}

blacklist, err := setupBlacklist(hc)
if err != nil {
utils.Logger().Warn().Msgf("Blacklist setup error: %s", err.Error())
}
allowedTxs, err := setupAllowedTxs(hc)
if err != nil {
utils.Logger().Warn().Msgf("AllowedTxs setup error: %s", err.Error())
}
localAccounts, err := setupLocalAccounts(hc, blacklist)
if err != nil {
utils.Logger().Warn().Msgf("local accounts setup error: %s", err.Error())
}

registry = setupChain(hc, nodeConfig, registry)
if registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil1111111")
}
registry.SetWebHooks(nodeConfig.WebHooks.Hooks)
cxPool := core.NewCxPool(core.CxPoolSize)
registry.SetCxPool(cxPool)

Expand All @@ -818,7 +831,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
os.Exit(1)
}

currentNode := node.New(myHost, currentConsensus, engine, collection, blacklist, allowedTxs, localAccounts, nodeConfig.ArchiveModes(), &hc, registry)
currentNode := node.New(myHost, currentConsensus, blacklist, allowedTxs, localAccounts, &hc, registry)

if hc.Legacy != nil && hc.Legacy.TPBroadcastInvalidTxn != nil {
currentNode.BroadcastInvalidTx = *hc.Legacy.TPBroadcastInvalidTxn
Expand Down
7 changes: 5 additions & 2 deletions core_test/shardchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func TestAddNewBlock(t *testing.T) {
if err != nil {
t.Fatal("cannot get blockchain")
}
reg := registry.New().SetBlockchain(blockchain)
reg := registry.New().
SetBlockchain(blockchain).
SetEngine(engine).
SetShardChainCollection(collection)
consensus, err := consensus.New(
host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false,
)
Expand All @@ -57,7 +60,7 @@ func TestAddNewBlock(t *testing.T) {
}
nodeconfig.SetNetworkType(nodeconfig.Testnet)
var block *types.Block
node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg)
node := node.New(host, consensus, nil, nil, nil, nil, reg)
commitSigs := make(chan []byte, 1)
commitSigs <- []byte{}
block, err = node.Worker.FinalizeNewBlock(
Expand Down
38 changes: 38 additions & 0 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package registry
import (
"sync"

"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/webhooks"
)

Expand All @@ -16,6 +18,8 @@ type Registry struct {
txPool *core.TxPool
cxPool *core.CxPool
isBackup bool
engine engine.Engine
collection *shardchain.CollectionImpl
}

// New creates a new registry.
Expand Down Expand Up @@ -122,3 +126,37 @@ func (r *Registry) GetCxPool() *core.CxPool {

return r.cxPool
}

// SetEngine sets the engine to registry.
func (r *Registry) SetEngine(engine engine.Engine) *Registry {
r.mu.Lock()
defer r.mu.Unlock()

r.engine = engine
return r
}

// GetEngine gets the engine from registry.
func (r *Registry) GetEngine() engine.Engine {
r.mu.Lock()
defer r.mu.Unlock()

return r.engine
}

// SetShardChainCollection sets the shard chain collection to registry.
func (r *Registry) SetShardChainCollection(collection *shardchain.CollectionImpl) *Registry {
r.mu.Lock()
defer r.mu.Unlock()

r.collection = collection
return r
}

// GetShardChainCollection gets the shard chain collection from registry.
func (r *Registry) GetShardChainCollection() *shardchain.CollectionImpl {
r.mu.Lock()
defer r.mu.Unlock()

return r.collection
}
18 changes: 7 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/internal/registry"
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
Expand Down Expand Up @@ -49,7 +48,6 @@ import (
common2 "github.com/harmony-one/harmony/internal/common"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
Expand Down Expand Up @@ -104,8 +102,7 @@ type Node struct {
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
pendingCXMutex sync.Mutex
crosslinks *crosslinks.Crosslinks // Memory storage for crosslink processing.
// Shard databases
shardChains shardchain.Collection

SelfPeer p2p.Peer
stateMutex sync.Mutex // mutex for change node state
TxPool *core.TxPool
Expand Down Expand Up @@ -193,7 +190,10 @@ func (node *Node) Beaconchain() core.BlockChain {
}

func (node *Node) chain(shardID uint32, options core.Options) core.BlockChain {
bc, err := node.shardChains.ShardChain(shardID, options)
if node.registry.GetShardChainCollection() == nil {
panic("shard chain collection is nil")
}
bc, err := node.registry.GetShardChainCollection().ShardChain(shardID, options)
if err != nil {
utils.Logger().Error().Err(err).Msg("cannot get beaconchain")
}
Expand Down Expand Up @@ -1026,12 +1026,9 @@ func (node *Node) GetSyncID() [SyncIDLength]byte {
func New(
host p2p.Host,
consensusObj *consensus.Consensus,
engine engine.Engine,
collection *shardchain.CollectionImpl,
blacklist map[common.Address]struct{},
allowedTxs map[common.Address]core.AllowedTxData,
localAccounts []common.Address,
isArchival map[uint32]bool,
harmonyconfig *harmonyconfig.HarmonyConfig,
registry *registry.Registry,
) *Node {
Expand All @@ -1058,7 +1055,6 @@ func New(
networkType := node.NodeConfig.GetNetworkType()
chainConfig := networkType.ChainConfig()
node.chainConfig = chainConfig
node.shardChains = collection
node.IsSynchronized = abool.NewBool(false)

if host != nil {
Expand All @@ -1081,9 +1077,9 @@ func New(
if b2 {
shardID := node.NodeConfig.ShardID
// HACK get the real error reason
_, err = node.shardChains.ShardChain(shardID)
_, err = node.registry.GetShardChainCollection().ShardChain(shardID)
} else {
_, err = node.shardChains.ShardChain(shard.BeaconChainShardID)
_, err = node.registry.GetShardChainCollection().ShardChain(shard.BeaconChainShardID)
}
fmt.Fprintf(os.Stderr, "Cannot initialize node: %v\n", err)
os.Exit(-1)
Expand Down
Loading

0 comments on commit cf5dd8b

Please sign in to comment.