diff --git a/api/service/legacysync/syncing.go b/api/service/legacysync/syncing.go index 3375ccdc50..44c5de64a2 100644 --- a/api/service/legacysync/syncing.go +++ b/api/service/legacysync/syncing.go @@ -25,7 +25,6 @@ import ( "github.com/harmony-one/harmony/internal/chain" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" libp2p_peer "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" @@ -932,7 +931,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain } // generateNewState will construct most recent state from downloaded blocks -func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker) error { +func (ss *StateSync) generateNewState(bc core.BlockChain) error { // update blocks created before node start sync parentHash := bc.CurrentBlock().Hash() @@ -995,7 +994,7 @@ func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker) } // ProcessStateSync processes state sync from the blocks received but not yet processed so far -func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain, worker *worker.Worker) error { +func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain) error { // Gets consensus hashes. if err := ss.getConsensusHashes(startHash, size); err != nil { return errors.Wrap(err, "getConsensusHashes") @@ -1005,7 +1004,7 @@ func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.Blo if ss.stateSyncTaskQueue.Len() > 0 { ss.downloadBlocks(bc) } - return ss.generateNewState(bc, worker) + return ss.generateNewState(bc) } func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error { @@ -1076,7 +1075,7 @@ func (ss *StateSync) GetMaxPeerHeight() (uint64, error) { } // SyncLoop will keep syncing with peers until catches up -func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) { +func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) { utils.Logger().Info().Msgf("legacy sync is executing ...") if !isBeacon { ss.RegisterNodeInfo() @@ -1110,7 +1109,7 @@ func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco if size > SyncLoopBatchSize { size = SyncLoopBatchSize } - err := ss.ProcessStateSync(startHash[:], size, bc, worker) + err := ss.ProcessStateSync(startHash[:], size, bc) if err != nil { utils.Logger().Error().Err(err). Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", diff --git a/api/service/stagedsync/syncing.go b/api/service/stagedsync/syncing.go index a22a4e9253..623ea476d9 100644 --- a/api/service/stagedsync/syncing.go +++ b/api/service/stagedsync/syncing.go @@ -11,7 +11,6 @@ import ( "github.com/harmony-one/harmony/core" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" @@ -163,7 +162,7 @@ func initDB(ctx context.Context, db kv.RwDB) error { } // SyncLoop will keep syncing with peers until catches up -func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) { +func (s *StagedSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) { utils.Logger().Info(). Uint64("current height", bc.CurrentBlock().NumberU64()). @@ -204,7 +203,7 @@ func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco } startTime := time.Now() - if err := s.runSyncCycle(bc, worker, isBeacon, consensus, maxPeersHeight); err != nil { + if err := s.runSyncCycle(bc, isBeacon, consensus, maxPeersHeight); err != nil { utils.Logger().Error(). Err(err). Bool("isBeacon", isBeacon). @@ -266,7 +265,7 @@ func (s *StagedSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeaco } // runSyncCycle will run one cycle of staged syncing -func (s *StagedSync) runSyncCycle(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error { +func (s *StagedSync) runSyncCycle(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, maxPeersHeight uint64) error { canRunCycleInOneTransaction := s.MaxBlocksPerSyncCycle > 0 && s.MaxBlocksPerSyncCycle <= s.MaxMemSyncCycleSize var tx kv.RwTx if canRunCycleInOneTransaction { diff --git a/core/state_processor.go b/core/state_processor.go index 49e22461ec..3e58ddbcc4 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -364,7 +364,7 @@ func ApplyStakingTransaction( vmenv := vm.NewEVM(context, statedb, config, cfg) // Apply the transaction to the current state (included in the env) - gas, err = ApplyStakingMessage(vmenv, msg, gp, bc) + gas, err = ApplyStakingMessage(vmenv, msg, gp) if err != nil { return nil, 0, err } diff --git a/core/state_transition.go b/core/state_transition.go index 93e7f2322d..27c3590745 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -76,7 +76,6 @@ type StateTransition struct { data []byte state vm.StateDB evm *vm.EVM - bc ChainContext } // Message represents a message sent to a contract. @@ -131,7 +130,7 @@ func (result *ExecutionResult) Revert() []byte { } // NewStateTransition initialises and returns a new state transition object. -func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) *StateTransition { +func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool) *StateTransition { return &StateTransition{ gp: gp, evm: evm, @@ -140,7 +139,6 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) value: msg.Value(), data: msg.Data(), state: evm.StateDB, - bc: bc, } } @@ -152,12 +150,12 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) // indicates a core error meaning that the message would always fail for that particular // state and would never be accepted within a block. func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) (ExecutionResult, error) { - return NewStateTransition(evm, msg, gp, nil).TransitionDb() + return NewStateTransition(evm, msg, gp).TransitionDb() } // ApplyStakingMessage computes the new state for staking message -func ApplyStakingMessage(evm *vm.EVM, msg Message, gp *GasPool, bc ChainContext) (uint64, error) { - return NewStateTransition(evm, msg, gp, bc).StakingTransitionDb() +func ApplyStakingMessage(evm *vm.EVM, msg Message, gp *GasPool) (uint64, error) { + return NewStateTransition(evm, msg, gp).StakingTransitionDb() } // to returns the recipient of the message. diff --git a/core/state_transition_test.go b/core/state_transition_test.go index 2a335ef779..665549f9d9 100644 --- a/core/state_transition_test.go +++ b/core/state_transition_test.go @@ -76,7 +76,7 @@ func testApplyStakingMessage(test applyStakingMessageTest, t *testing.T) { vmenv := vm.NewEVM(ctx, db, params.TestChainConfig, vm.Config{}) // run the staking tx - _, err := ApplyStakingMessage(vmenv, msg, gp, chain) + _, err := ApplyStakingMessage(vmenv, msg, gp) if err != nil { if test.expectedError == nil { t.Errorf(fmt.Sprintf("Got error %v but expected none", err)) @@ -193,7 +193,7 @@ func TestCollectGasRounding(t *testing.T) { vmenv := vm.NewEVM(ctx, db, params.TestChainConfig, vm.Config{}) gasPool := new(GasPool).AddGas(math.MaxUint64) - st := NewStateTransition(vmenv, msg, gasPool, nil) + st := NewStateTransition(vmenv, msg, gasPool) // buy gas to set initial gas to 5: gasLimit * gasPrice if err := st.buyGas(); err != nil { t.Fatal(err) diff --git a/node/node.go b/node/node.go index 8a75e7ce6f..b3026bbeb1 100644 --- a/node/node.go +++ b/node/node.go @@ -89,7 +89,7 @@ type ISync interface { AddLastMileBlock(block *types.Block) GetActivePeerNumber() int CreateSyncConfig(peers []p2p.Peer, shardID uint32, selfPeerID libp2p_peer.ID, waitForEachPeerToConnect bool) error - SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) + SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) IsSynchronized() bool IsSameBlockchainHeight(bc core.BlockChain) (uint64, bool) AddNewBlock(peerHash []byte, block *types.Block) diff --git a/node/node_newblock.go b/node/node_newblock.go index a342ee14ca..4529cd7ab3 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -1,7 +1,6 @@ package node import ( - "errors" "sort" "strings" "time" @@ -9,6 +8,7 @@ import ( "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/crypto/bls" + "github.com/pkg/errors" staking "github.com/harmony-one/harmony/staking/types" @@ -116,16 +116,18 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) utils.AnalysisStart("ProposeNewBlock", nowEpoch, blockNow) defer utils.AnalysisEnd("ProposeNewBlock", nowEpoch, blockNow) - node.Worker.UpdateCurrent() - - header := node.Worker.GetCurrentHeader() // Update worker's current header and // state data in preparation to propose/process new transactions - leaderKey := node.Consensus.GetLeaderPubKey() + env, err := node.Worker.UpdateCurrent() + if err != nil { + return nil, errors.Wrap(err, "failed to update worker") + } + var ( + header = env.CurrentHeader() + leaderKey = node.Consensus.GetLeaderPubKey() coinbase = node.GetAddressForBLSKey(leaderKey.Object, header.Epoch()) beneficiary = coinbase - err error ) // After staking, all coinbase will be the address of bls pub key @@ -134,8 +136,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) coinbase.SetBytes(blsPubKeyBytes[:]) } - emptyAddr := common.Address{} - if coinbase == emptyAddr { + if coinbase == (common.Address{}) { return nil, errors.New("[ProposeNewBlock] Failed setting coinbase") } diff --git a/node/node_syncing.go b/node/node_syncing.go index 68c3338362..fa90ec5c78 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -29,7 +29,6 @@ import ( "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/shard" ) @@ -269,7 +268,7 @@ func (node *Node) doBeaconSyncing() { } // DoSyncing keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up -func (node *Node) DoSyncing(bc core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { +func (node *Node) DoSyncing(bc core.BlockChain, willJoinConsensus bool) { if node.NodeConfig.IsOffline { return } @@ -280,15 +279,15 @@ func (node *Node) DoSyncing(bc core.BlockChain, worker *worker.Worker, willJoinC for { select { case <-ticker.C: - node.doSync(bc, worker, willJoinConsensus) + node.doSync(bc, willJoinConsensus) case <-node.Consensus.BlockNumLowChan: - node.doSync(bc, worker, willJoinConsensus) + node.doSync(bc, willJoinConsensus) } } } // doSync keep the node in sync with other peers, willJoinConsensus means the node will try to join consensus after catch up -func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinConsensus bool) { +func (node *Node) doSync(bc core.BlockChain, willJoinConsensus bool) { syncInstance := node.SyncInstance() if syncInstance.GetActivePeerNumber() < legacysync.NumPeersLowBound { @@ -317,7 +316,7 @@ func (node *Node) doSync(bc core.BlockChain, worker *worker.Worker, willJoinCons node.Consensus.BlocksNotSynchronized() } isBeacon := bc.ShardID() == shard.BeaconChainShardID - syncInstance.SyncLoop(bc, worker, isBeacon, node.Consensus, legacysync.LoopMinTime) + syncInstance.SyncLoop(bc, isBeacon, node.Consensus, legacysync.LoopMinTime) if willJoinConsensus { node.IsSynchronized.Set() node.Consensus.BlocksSynchronized() @@ -388,7 +387,7 @@ func (node *Node) supportSyncing() { utils.Logger().Debug().Msg("[SYNC] initialized state for staged sync") } - go node.DoSyncing(node.Blockchain(), node.Worker, joinConsensus) + go node.DoSyncing(node.Blockchain(), joinConsensus) } // InitSyncingServer starts downloader server. diff --git a/node/worker/types.go b/node/worker/types.go new file mode 100644 index 0000000000..87b7195dd3 --- /dev/null +++ b/node/worker/types.go @@ -0,0 +1,7 @@ +package worker + +import "github.com/harmony-one/harmony/block" + +type Environment interface { + CurrentHeader() *block.Header +} diff --git a/node/worker/worker.go b/node/worker/worker.go index 8dfc2018a4..9e299281a3 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -7,10 +7,8 @@ import ( "sort" "time" - "github.com/harmony-one/harmony/consensus/reward" - "github.com/harmony-one/harmony/consensus" - + "github.com/harmony-one/harmony/consensus/reward" "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/crypto/hash" @@ -50,6 +48,10 @@ type environment struct { stakeMsgs []staking.StakeMsg } +func (env *environment) CurrentHeader() *block.Header { + return env.header +} + // Worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type Worker struct { @@ -62,6 +64,40 @@ type Worker struct { gasCeil uint64 } +// New create a new worker object. +func New( + chain core.BlockChain, beacon core.BlockChain, +) *Worker { + worker := newWorker(chain.Config(), chain, beacon) + + parent := chain.CurrentBlock().Header() + num := parent.Number() + timestamp := time.Now().Unix() + + epoch := GetNewEpoch(chain) + header := blockfactory.NewFactory(chain.Config()).NewHeader(epoch).With(). + ParentHash(parent.Hash()). + Number(num.Add(num, common.Big1)). + GasLimit(worker.GasFloor(epoch)). //core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil)). + Time(big.NewInt(timestamp)). + ShardID(chain.ShardID()). + Header() + worker.makeCurrent(parent, header) + + return worker +} + +func newWorker(config *params.ChainConfig, chain, beacon core.BlockChain) *Worker { + return &Worker{ + config: config, + factory: blockfactory.NewFactory(config), + chain: chain, + beacon: beacon, + gasFloor: 80000000, + gasCeil: 120000000, + } +} + // CommitSortedTransactions commits transactions for new block. func (w *Worker) CommitSortedTransactions( txs *types.TransactionsByPriceAndNonce, @@ -290,16 +326,16 @@ func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error { } // UpdateCurrent updates the current environment with the current state and header. -func (w *Worker) UpdateCurrent() error { +func (w *Worker) UpdateCurrent() (Environment, error) { parent := w.chain.CurrentHeader() num := parent.Number() timestamp := time.Now().Unix() - epoch := w.GetNewEpoch() + epoch := GetNewEpoch(w.chain) header := w.factory.NewHeader(epoch).With(). ParentHash(parent.Hash()). Number(num.Add(num, common.Big1)). - GasLimit(core.CalcGasLimit(parent, w.GasFloor(epoch), w.gasCeil)). + GasLimit(core.CalcGasLimit(parent, w.GasFloor(epoch), w.GasCeil())). Time(big.NewInt(timestamp)). ShardID(w.chain.ShardID()). Header() @@ -312,20 +348,28 @@ func (w *Worker) GetCurrentHeader() *block.Header { } // makeCurrent creates a new environment for the current cycle. -func (w *Worker) makeCurrent(parent *block.Header, header *block.Header) error { - state, err := w.chain.StateAt(parent.Root()) +func (w *Worker) makeCurrent(parent *block.Header, header *block.Header) (*environment, error) { + env, err := makeEnvironment(w.chain, parent, header) if err != nil { - return err + return nil, err + } + + w.current = env + return w.current, nil +} + +func makeEnvironment(chain core.BlockChain, parent *block.Header, header *block.Header) (*environment, error) { + state, err := chain.StateAt(parent.Root()) + if err != nil { + return nil, err } env := &environment{ - signer: types.NewEIP155Signer(w.config.ChainID), - ethSigner: types.NewEIP155Signer(w.config.EthCompatibleChainID), + signer: types.NewEIP155Signer(chain.Config().ChainID), + ethSigner: types.NewEIP155Signer(chain.Config().EthCompatibleChainID), state: state, header: header, } - - w.current = env - return nil + return env, nil } // GetCurrentResult gets the current block processing result. @@ -347,14 +391,14 @@ func (w *Worker) GetCurrentState() *state.DB { } // GetNewEpoch gets the current epoch. -func (w *Worker) GetNewEpoch() *big.Int { - parent := w.chain.CurrentBlock() +func GetNewEpoch(chain core.BlockChain) *big.Int { + parent := chain.CurrentBlock() epoch := new(big.Int).Set(parent.Header().Epoch()) shardState, err := parent.Header().GetShardState() if err == nil && shardState.Epoch != nil && - w.config.IsStaking(shardState.Epoch) { + chain.Config().IsStaking(shardState.Epoch) { // For shard state of staking epochs, the shard state will // have an epoch and it will decide the next epoch for following blocks epoch = new(big.Int).Set(shardState.Epoch) @@ -563,38 +607,8 @@ func (w *Worker) FinalizeNewBlock( return block, nil } -// New create a new worker object. -func New( - chain core.BlockChain, beacon core.BlockChain, -) *Worker { - worker := &Worker{ - config: chain.Config(), - factory: blockfactory.NewFactory(chain.Config()), - chain: chain, - beacon: beacon, - } - worker.gasFloor = 80000000 - worker.gasCeil = 120000000 - - parent := worker.chain.CurrentBlock().Header() - num := parent.Number() - timestamp := time.Now().Unix() - - epoch := worker.GetNewEpoch() - header := worker.factory.NewHeader(epoch).With(). - ParentHash(parent.Hash()). - Number(num.Add(num, common.Big1)). - GasLimit(worker.gasFloor). //core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil)). - Time(big.NewInt(timestamp)). - ShardID(worker.chain.ShardID()). - Header() - worker.makeCurrent(parent, header) - - return worker -} - func (w *Worker) GasFloor(epoch *big.Int) uint64 { - if w.chain.Config().IsBlockGas30M(epoch) { + if w.config.IsBlockGas30M(epoch) { return 30_000_000 } diff --git a/node/worker/worker_test.go b/node/worker/worker_test.go index 7b6fd0c366..8dc84e14e7 100644 --- a/node/worker/worker_test.go +++ b/node/worker/worker_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/harmony-one/harmony/core/rawdb" + "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -100,3 +101,12 @@ func TestCommitTransactions(t *testing.T) { t.Error("Transaction is not committed") } } + +func TestGasLimit(t *testing.T) { + w := newWorker( + ¶ms.ChainConfig{ + BlockGas30MEpoch: big.NewInt(10), + }, nil, nil) + require.EqualValues(t, 80_000_000, w.GasFloor(big.NewInt(3))) + require.EqualValues(t, 30_000_000, w.GasFloor(big.NewInt(10))) +}