-
Notifications
You must be signed in to change notification settings - Fork 291
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor offchain data commit; Make block onchain/offchain commit ato…
…mic (#2279) * Refactor offchain data; Add epoch to ValidatorSnapshot * Make block onchain/offchain data commit atomically
- Loading branch information
Showing
8 changed files
with
631 additions
and
913 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package core | ||
|
||
import ( | ||
"errors" | ||
"math/big" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/ethereum/go-ethereum/rlp" | ||
"github.com/harmony-one/harmony/core/rawdb" | ||
"github.com/harmony-one/harmony/core/state" | ||
"github.com/harmony-one/harmony/core/types" | ||
"github.com/harmony-one/harmony/internal/utils" | ||
"github.com/harmony-one/harmony/shard" | ||
) | ||
|
||
// CommitOffChainData write off chain data of a block onto db writer. | ||
func (bc *BlockChain) CommitOffChainData( | ||
batch rawdb.DatabaseWriter, block *types.Block, receipts []*types.Receipt, | ||
cxReceipts []*types.CXReceipt, payout *big.Int, state *state.DB, root common.Hash) (status WriteStatus, err error) { | ||
//// Write receipts of the block | ||
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) | ||
|
||
//// Cross-shard txns | ||
epoch := block.Header().Epoch() | ||
if bc.chainConfig.HasCrossTxFields(block.Epoch()) { | ||
shardingConfig := shard.Schedule.InstanceForEpoch(epoch) | ||
shardNum := int(shardingConfig.NumShards()) | ||
for i := 0; i < shardNum; i++ { | ||
if i == int(block.ShardID()) { | ||
continue | ||
} | ||
|
||
shardReceipts := types.CXReceipts(cxReceipts).GetToShardReceipts(uint32(i)) | ||
err := rawdb.WriteCXReceipts(batch, uint32(i), block.NumberU64(), block.Hash(), shardReceipts) | ||
if err != nil { | ||
utils.Logger().Error().Err(err).Interface("shardReceipts", shardReceipts).Int("toShardID", i).Msg("WriteCXReceipts cannot write into database") | ||
return NonStatTy, err | ||
} | ||
} | ||
// Mark incomingReceipts in the block as spent | ||
bc.WriteCXReceiptsProofSpent(batch, block.IncomingReceipts()) | ||
} | ||
|
||
//// VRF + VDF | ||
//check non zero VRF field in header and add to local db | ||
//if len(block.Vrf()) > 0 { | ||
// vrfBlockNumbers, _ := bc.ReadEpochVrfBlockNums(block.Header().Epoch()) | ||
// if (len(vrfBlockNumbers) > 0) && (vrfBlockNumbers[len(vrfBlockNumbers)-1] == block.NumberU64()) { | ||
// utils.Logger().Error(). | ||
// Str("number", block.Number().String()). | ||
// Str("epoch", block.Header().Epoch().String()). | ||
// Msg("VRF block number is already in local db") | ||
// } else { | ||
// vrfBlockNumbers = append(vrfBlockNumbers, block.NumberU64()) | ||
// err = bc.WriteEpochVrfBlockNums(block.Header().Epoch(), vrfBlockNumbers) | ||
// if err != nil { | ||
// utils.Logger().Error(). | ||
// Str("number", block.Number().String()). | ||
// Str("epoch", block.Header().Epoch().String()). | ||
// Msg("failed to write VRF block number to local db") | ||
// return NonStatTy, err | ||
// } | ||
// } | ||
//} | ||
// | ||
////check non zero Vdf in header and add to local db | ||
//if len(block.Vdf()) > 0 { | ||
// err = bc.WriteEpochVdfBlockNum(block.Header().Epoch(), block.Number()) | ||
// if err != nil { | ||
// utils.Logger().Error(). | ||
// Str("number", block.Number().String()). | ||
// Str("epoch", block.Header().Epoch().String()). | ||
// Msg("failed to write VDF block number to local db") | ||
// return NonStatTy, err | ||
// } | ||
//} | ||
|
||
//// Shard State and Validator Update | ||
header := block.Header() | ||
if len(header.ShardState()) > 0 { | ||
// Write shard state for the new epoch | ||
epoch := new(big.Int).Add(header.Epoch(), common.Big1) | ||
shardState, err := block.Header().GetShardState() | ||
if err == nil && shardState.Epoch != nil && bc.chainConfig.IsStaking(shardState.Epoch) { | ||
// After staking, the epoch will be decided by the epoch in the shard state. | ||
epoch = new(big.Int).Set(shardState.Epoch) | ||
} | ||
|
||
newShardState, err := bc.WriteShardStateBytes(batch, epoch, header.ShardState()) | ||
if err != nil { | ||
header.Logger(utils.Logger()).Warn().Err(err).Msg("cannot store shard state") | ||
return NonStatTy, err | ||
} | ||
|
||
// Find all the active validator addresses and store them in db | ||
allActiveValidators := []common.Address{} | ||
processed := make(map[common.Address]struct{}) | ||
for i := range newShardState.Shards { | ||
shard := newShardState.Shards[i] | ||
for j := range shard.Slots { | ||
slot := shard.Slots[j] | ||
if slot.EffectiveStake != nil { // For external validator | ||
_, ok := processed[slot.EcdsaAddress] | ||
if !ok { | ||
processed[slot.EcdsaAddress] = struct{}{} | ||
allActiveValidators = append(allActiveValidators, shard.Slots[j].EcdsaAddress) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Update active validators | ||
if err := bc.WriteActiveValidatorList(batch, allActiveValidators); err != nil { | ||
return NonStatTy, err | ||
} | ||
|
||
// Update snapshots for all validators | ||
if err := bc.UpdateValidatorSnapshots(batch, epoch); err != nil { | ||
return NonStatTy, err | ||
} | ||
} | ||
|
||
// Do bookkeeping for new staking txns | ||
for _, tx := range block.StakingTransactions() { | ||
err = bc.UpdateStakingMetaData(batch, tx, root) | ||
// keep offchain database consistency with onchain we need revert | ||
// but it should not happend unless local database corrupted | ||
if err != nil { | ||
utils.Logger().Debug().Msgf("oops, UpdateStakingMetaData failed, err: %+v", err) | ||
return NonStatTy, err | ||
} | ||
} | ||
|
||
// Update voting power of validators for all shards | ||
if block.ShardID() == shard.BeaconChainShardID && | ||
len(block.Header().ShardState()) > 0 { | ||
shardState := new(shard.State) | ||
|
||
if shardState, err = shard.DecodeWrapper(block.Header().ShardState()); err == nil { | ||
if err = bc.UpdateValidatorVotingPower(batch, shardState); err != nil { | ||
utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to update voting power") | ||
} | ||
} else { | ||
utils.Logger().Err(err).Msg("[UpdateValidatorVotingPower] Failed to decode shard state") | ||
} | ||
} | ||
|
||
//// Writing beacon chain cross links | ||
if header.ShardID() == shard.BeaconChainShardID && | ||
bc.chainConfig.IsCrossLink(block.Epoch()) && | ||
len(header.CrossLinks()) > 0 { | ||
crossLinks := &types.CrossLinks{} | ||
err = rlp.DecodeBytes(header.CrossLinks(), crossLinks) | ||
if err != nil { | ||
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cannot parse cross links") | ||
return NonStatTy, err | ||
} | ||
if !crossLinks.IsSorted() { | ||
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain/crosslinks] cross links are not sorted") | ||
return NonStatTy, errors.New("proposed cross links are not sorted") | ||
} | ||
for _, crossLink := range *crossLinks { | ||
// Process crosslink | ||
if err := bc.WriteCrossLinks(batch, types.CrossLinks{crossLink}); err == nil { | ||
utils.Logger().Info().Uint64("blockNum", crossLink.BlockNum()).Uint32("shardID", crossLink.ShardID()).Msg("[insertChain/crosslinks] Cross Link Added to Beaconchain") | ||
} | ||
bc.LastContinuousCrossLink(batch, crossLink) | ||
} | ||
|
||
//clean/update local database cache after crosslink inserted into blockchain | ||
num, err := bc.DeleteCommittedFromPendingCrossLinks(*crossLinks) | ||
utils.Logger().Debug().Msgf("DeleteCommittedFromPendingCrossLinks, crosslinks in header %d, pending crosslinks: %d, error: %+v", len(*crossLinks), num, err) | ||
} | ||
|
||
if bc.CurrentHeader().ShardID() == shard.BeaconChainShardID { | ||
if bc.chainConfig.IsStaking(block.Epoch()) { | ||
bc.UpdateBlockRewardAccumulator(batch, payout, block.Number().Uint64()) | ||
} else { | ||
// block reward never accumulate before staking | ||
bc.WriteBlockRewardAccumulator(batch, big.NewInt(0), block.Number().Uint64()) | ||
} | ||
} | ||
return CanonStatTy, nil | ||
} |
Oops, something went wrong.