diff --git a/vms/platformvm/state/diff.go b/vms/platformvm/state/diff.go index f0a90e66170b..1aafcf079969 100644 --- a/vms/platformvm/state/diff.go +++ b/vms/platformvm/state/diff.go @@ -331,9 +331,12 @@ func (d *diff) GetSubnetTransformation(subnetID ids.ID) (*txs.Tx, error) { func (d *diff) AddSubnetTransformation(transformSubnetTxIntf *txs.Tx) { transformSubnetTx := transformSubnetTxIntf.Unsigned.(*txs.TransformSubnetTx) if d.transformedSubnets == nil { - d.transformedSubnets = make(map[ids.ID]*txs.Tx) + d.transformedSubnets = map[ids.ID]*txs.Tx{ + transformSubnetTx.Subnet: transformSubnetTxIntf, + } + } else { + d.transformedSubnets[transformSubnetTx.Subnet] = transformSubnetTxIntf } - d.transformedSubnets[transformSubnetTx.Subnet] = transformSubnetTxIntf } func (d *diff) GetChains(subnetID ids.ID) ([]*txs.Tx, error) { diff --git a/vms/platformvm/state/masked_iterator_test.go b/vms/platformvm/state/masked_iterator_test.go index a3c43818d3b4..8ba719d3e732 100644 --- a/vms/platformvm/state/masked_iterator_test.go +++ b/vms/platformvm/state/masked_iterator_test.go @@ -17,23 +17,19 @@ func TestMaskedIterator(t *testing.T) { stakers := []*Staker{ { TxID: ids.GenerateTestID(), - Weight: 0, // just to simplify debugging NextTime: time.Unix(0, 0), }, { TxID: ids.GenerateTestID(), - Weight: 10, // just to simplify debugging - NextTime: time.Unix(10, 0), + NextTime: time.Unix(1, 0), }, { TxID: ids.GenerateTestID(), - Weight: 20, // just to simplify debugging - NextTime: time.Unix(20, 0), + NextTime: time.Unix(2, 0), }, { TxID: ids.GenerateTestID(), - Weight: 30, // just to simplify debugging - NextTime: time.Unix(30, 0), + NextTime: time.Unix(3, 0), }, } maskedStakers := map[ids.ID]*Staker{ diff --git a/vms/platformvm/state/staker_test.go b/vms/platformvm/state/staker_test.go index a6faa4ab704f..747f442e5eda 100644 --- a/vms/platformvm/state/staker_test.go +++ b/vms/platformvm/state/staker_test.go @@ -144,7 +144,7 @@ func TestNewCurrentStaker(t *testing.T) { subnetID := 
ids.GenerateTestID() weight := uint64(12345) startTime := time.Now() - endTime := startTime.Add(time.Hour) + endTime := time.Now() potentialReward := uint64(54321) currentPriority := txs.SubnetPermissionedValidatorCurrentPriority diff --git a/vms/platformvm/state/state.go b/vms/platformvm/state/state.go index d08623045929..c1cc51bc5413 100644 --- a/vms/platformvm/state/state.go +++ b/vms/platformvm/state/state.go @@ -10,6 +10,7 @@ import ( "fmt" "time" + "github.com/google/btree" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -25,12 +26,14 @@ import ( "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" + "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/uptime" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/trace" "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/hashing" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/units" "github.com/ava-labs/avalanchego/utils/wrappers" @@ -38,6 +41,7 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/block" "github.com/ava-labs/avalanchego/vms/platformvm/config" "github.com/ava-labs/avalanchego/vms/platformvm/fx" + "github.com/ava-labs/avalanchego/vms/platformvm/genesis" "github.com/ava-labs/avalanchego/vms/platformvm/metrics" "github.com/ava-labs/avalanchego/vms/platformvm/reward" "github.com/ava-labs/avalanchego/vms/platformvm/status" @@ -192,8 +196,173 @@ type State interface { Close() error } +// TODO: Remove after v1.11.x is activated +type stateBlk struct { + Blk block.Block + Bytes []byte `serialize:"true"` + Status choices.Status `serialize:"true"` +} + +// Stores global state in a merkle trie. This means that each state corresponds +// to a unique merkle root. 
Specifically, the following state is merkleized. +// - Delegatee Rewards +// - UTXOs +// - Current Supply +// - Subnet Creation Transactions +// - Subnet Owners +// - Subnet Transformation Transactions +// - Chain Creation Transactions +// - Chain time +// - Last Accepted Block ID +// - Current Staker Set +// - Pending Staker Set +// +// Changing any of the above state will cause the merkle root to change. +// +// The following state is not merkleized: +// - Database Initialization Status +// - Blocks +// - Block IDs +// - Transactions (note some transactions are also stored merkleized) +// - Uptimes +// - Weight Diffs +// - BLS Key Diffs +// - Reward UTXOs +type state struct { + validators validators.Manager + ctx *snow.Context + metrics metrics.Metrics + rewards reward.Calculator + + baseDB *versiondb.Database + singletonDB database.Database + baseMerkleDB database.Database + merkleDB merkledb.MerkleDB // Stores merkleized state + + // stakers section (missing Delegatee piece) + // TODO: Consider moving delegatee to UTXOs section + currentStakers *baseStakers + pendingStakers *baseStakers + + delegateeRewardCache map[ids.NodeID]map[ids.ID]uint64 // (nodeID, subnetID) --> delegatee amount + modifiedDelegateeReward map[ids.NodeID]set.Set[ids.ID] // tracks (nodeID, subnetID) pairs updated after last commit + + // UTXOs section + modifiedUTXOs map[ids.ID]*avax.UTXO // map of UTXO ID -> *UTXO + utxoCache cache.Cacher[ids.ID, *avax.UTXO] // UTXO ID -> *UTXO. If the *UTXO is nil the UTXO doesn't exist + + // Metadata section + chainTime, latestComittedChainTime time.Time + lastAcceptedBlkID, latestCommittedLastAcceptedBlkID ids.ID + lastAcceptedHeight uint64 // TODO: Should this be written to state?? 
+ modifiedSupplies map[ids.ID]uint64 // map of subnetID -> current supply + suppliesCache cache.Cacher[ids.ID, *uint64] // cache of subnetID -> current supply if the entry is nil, it is not in the database + + // Subnets section + // Subnet ID --> Owner of the subnet + subnetOwners map[ids.ID]fx.Owner + subnetOwnerCache cache.Cacher[ids.ID, fxOwnerAndSize] // cache of subnetID -> owner if the entry is nil, it is not in the database + + addedPermissionedSubnets []*txs.Tx // added SubnetTxs, waiting to be committed + permissionedSubnetCache []*txs.Tx // nil if the subnets haven't been loaded + addedElasticSubnets map[ids.ID]*txs.Tx // map of subnetID -> transformSubnetTx + elasticSubnetCache cache.Cacher[ids.ID, *txs.Tx] // cache of subnetID -> transformSubnetTx if the entry is nil, it is not in the database + + // Chains section + addedChains map[ids.ID][]*txs.Tx // maps subnetID -> the newly added chains to the subnet + chainCache cache.Cacher[ids.ID, []*txs.Tx] // cache of subnetID -> the chains after all local modifications []*txs.Tx + + // Blocks section + // Note: addedBlocks is a list because multiple blocks can be committed at once (proposal + accepted option) + addedBlocks map[ids.ID]block.Block // map of blockID -> Block. + blockCache cache.Cacher[ids.ID, block.Block] // cache of blockID -> Block. If the entry is nil, it is not in the database + blockDB database.Database + + addedBlockIDs map[uint64]ids.ID // map of height -> blockID + blockIDCache cache.Cacher[uint64, ids.ID] // cache of height -> blockID. If the entry is ids.Empty, it is not in the database + blockIDDB database.Database + + // Txs section + // FIND a way to reduce use of these. No use in verification of addedTxs + // a limited window to support APIs + addedTxs map[ids.ID]*txAndStatus // map of txID -> {*txs.Tx, Status} + txCache cache.Cacher[ids.ID, *txAndStatus] // txID -> {*txs.Tx, Status}. 
If the entry is nil, it isn't in the database + txDB database.Database + + indexedUTXOsDB database.Database + + localUptimesCache map[ids.NodeID]map[ids.ID]*uptimes // vdrID -> subnetID -> metadata + modifiedLocalUptimes map[ids.NodeID]set.Set[ids.ID] // vdrID -> subnetIDs + localUptimesDB database.Database + + flatValidatorWeightDiffsDB database.Database + flatValidatorPublicKeyDiffsDB database.Database + + // Reward UTXOs section + addedRewardUTXOs map[ids.ID][]*avax.UTXO // map of txID -> []*UTXO + rewardUTXOsCache cache.Cacher[ids.ID, []*avax.UTXO] // txID -> []*UTXO + rewardUTXOsDB database.Database +} + +type ValidatorWeightDiff struct { + Decrease bool `serialize:"true"` + Amount uint64 `serialize:"true"` +} + +func (v *ValidatorWeightDiff) Add(negative bool, amount uint64) error { + if v.Decrease == negative { + var err error + v.Amount, err = safemath.Add64(v.Amount, amount) + return err + } + + if v.Amount > amount { + v.Amount -= amount + } else { + v.Amount = safemath.AbsDiff(v.Amount, amount) + v.Decrease = negative + } + return nil +} + +type txBytesAndStatus struct { + Tx []byte `serialize:"true"` + Status status.Status `serialize:"true"` +} + +type txAndStatus struct { + tx *txs.Tx + status status.Status +} + +type fxOwnerAndSize struct { + owner fx.Owner + size int +} + +func txSize(_ ids.ID, tx *txs.Tx) int { + if tx == nil { + return ids.IDLen + constants.PointerOverhead + } + return ids.IDLen + len(tx.Bytes()) + constants.PointerOverhead +} + +func txAndStatusSize(_ ids.ID, t *txAndStatus) int { + if t == nil { + return ids.IDLen + constants.PointerOverhead + } + return ids.IDLen + len(t.tx.Bytes()) + wrappers.IntLen + 2*constants.PointerOverhead +} + +func blockSize(_ ids.ID, blk block.Block) int { + if blk == nil { + return ids.IDLen + constants.PointerOverhead + } + return ids.IDLen + len(blk.Bytes()) + constants.PointerOverhead +} + func New( - rawDB database.Database, + db database.Database, genesisBytes []byte, metricsReg 
prometheus.Registerer, validators validators.Manager, @@ -202,8 +371,8 @@ func New( metrics metrics.Metrics, rewards reward.Calculator, ) (State, error) { - res, err := newState( - rawDB, + s, err := newState( + db, metrics, validators, execCfg, @@ -215,17 +384,17 @@ func New( return nil, err } - if err := res.sync(genesisBytes); err != nil { + if err := s.sync(genesisBytes); err != nil { // Drop any errors on close to return the first error - _ = res.Close() + _ = s.Close() return nil, err } - return res, nil + return s, nil } func newState( - rawDB database.Database, + db database.Database, metrics metrics.Metrics, validators validators.Manager, execCfg *config.ExecutionConfig, @@ -234,7 +403,7 @@ func newState( rewards reward.Calculator, ) (*state, error) { var ( - baseDB = versiondb.New(rawDB) + baseDB = versiondb.New(db) baseMerkleDB = prefixdb.New(merkleStatePrefix, baseDB) singletonDB = prefixdb.New(merkleSingletonPrefix, baseDB) blockDB = prefixdb.New(merkleBlockPrefix, baseDB) @@ -264,19 +433,19 @@ func newState( return nil, fmt.Errorf("failed creating merkleDB: %w", err) } - rewardUTXOsCache, err := metercacher.New[ids.ID, []*avax.UTXO]( - "reward_utxos_cache", + txCache, err := metercacher.New( + "tx_cache", metricsReg, - &cache.LRU[ids.ID, []*avax.UTXO]{Size: execCfg.RewardUTXOsCacheSize}, + cache.NewSizedLRU[ids.ID, *txAndStatus](execCfg.TxCacheSize, txAndStatusSize), ) if err != nil { return nil, err } - suppliesCache, err := metercacher.New[ids.ID, *uint64]( - "supply_cache", + rewardUTXOsCache, err := metercacher.New[ids.ID, []*avax.UTXO]( + "reward_utxos_cache", metricsReg, - &cache.LRU[ids.ID, *uint64]{Size: execCfg.ChainCacheSize}, + &cache.LRU[ids.ID, []*avax.UTXO]{Size: execCfg.RewardUTXOsCacheSize}, ) if err != nil { return nil, err @@ -302,6 +471,15 @@ func newState( return nil, err } + supplyCache, err := metercacher.New[ids.ID, *uint64]( + "supply_cache", + metricsReg, + &cache.LRU[ids.ID, *uint64]{Size: execCfg.ChainCacheSize}, + ) + if 
err != nil { + return nil, err + } + chainCache, err := metercacher.New[ids.ID, []*txs.Tx]( "chain_cache", metricsReg, @@ -329,15 +507,6 @@ func newState( return nil, err } - txCache, err := metercacher.New( - "tx_cache", - metricsReg, - cache.NewSizedLRU[ids.ID, *txAndStatus](execCfg.TxCacheSize, txAndStatusSize), - ) - if err != nil { - return nil, err - } - return &state{ validators: validators, ctx: ctx, @@ -359,7 +528,7 @@ func newState( utxoCache: &cache.LRU[ids.ID, *avax.UTXO]{Size: utxoCacheSize}, modifiedSupplies: make(map[ids.ID]uint64), - suppliesCache: suppliesCache, + suppliesCache: supplyCache, subnetOwners: make(map[ids.ID]fx.Owner), subnetOwnerCache: subnetOwnerCache, @@ -399,135 +568,33 @@ func newState( }, nil } -// Stores global state in a merkle trie. This means that each state corresponds -// to a unique merkle root. Specifically, the following state is merkleized. -// - Delegatee Rewards -// - UTXOs -// - Current Supply -// - Subnet Creation Transactions -// - Subnet Owners -// - Subnet Transformation Transactions -// - Chain Creation Transactions -// - Chain time -// - Last Accepted Block ID -// - Current Staker Set -// - Pending Staker Set -// -// Changing any of the above state will cause the merkle root to change. 
-// -// The following state is not merkleized: -// - Database Initialization Status -// - Blocks -// - Block IDs -// - Transactions (note some transactions are also stored merkleized) -// - Uptimes -// - Weight Diffs -// - BLS Key Diffs -// - Reward UTXOs -type state struct { - validators validators.Manager - ctx *snow.Context - metrics metrics.Metrics - rewards reward.Calculator +func (s *state) GetCurrentValidator(subnetID ids.ID, nodeID ids.NodeID) (*Staker, error) { + return s.currentStakers.GetValidator(subnetID, nodeID) +} - baseDB *versiondb.Database - singletonDB database.Database - baseMerkleDB database.Database - merkleDB merkledb.MerkleDB // Stores merkleized state +func (s *state) PutCurrentValidator(staker *Staker) { + s.currentStakers.PutValidator(staker) - // stakers section (missing Delegatee piece) - // TODO: Consider moving delegatee to UTXOs section - currentStakers *baseStakers - pendingStakers *baseStakers + // make sure that each new validator has an uptime entry + // and a delegatee reward entry. MerkleState implementations + // of SetUptime and SetDelegateeReward must not err + err := s.SetUptime(staker.NodeID, staker.SubnetID, 0 /*duration*/, staker.StartTime) + if err != nil { + panic(err) + } + err = s.SetDelegateeReward(staker.SubnetID, staker.NodeID, 0) + if err != nil { + panic(err) + } +} - delegateeRewardCache map[ids.NodeID]map[ids.ID]uint64 // (nodeID, subnetID) --> delegatee amount - modifiedDelegateeReward map[ids.NodeID]set.Set[ids.ID] // tracks (nodeID, subnetID) pairs updated after last commit +func (s *state) DeleteCurrentValidator(staker *Staker) { + s.currentStakers.DeleteValidator(staker) +} - // UTXOs section - modifiedUTXOs map[ids.ID]*avax.UTXO // map of UTXO ID -> *UTXO - utxoCache cache.Cacher[ids.ID, *avax.UTXO] // UTXO ID -> *UTXO. 
If the *UTXO is nil the UTXO doesn't exist - - // Metadata section - chainTime, latestComittedChainTime time.Time - lastAcceptedBlkID, latestCommittedLastAcceptedBlkID ids.ID - lastAcceptedHeight uint64 // TODO: Should this be written to state?? - modifiedSupplies map[ids.ID]uint64 // map of subnetID -> current supply - suppliesCache cache.Cacher[ids.ID, *uint64] // cache of subnetID -> current supply if the entry is nil, it is not in the database - - // Subnets section - // Subnet ID --> Owner of the subnet - subnetOwners map[ids.ID]fx.Owner - subnetOwnerCache cache.Cacher[ids.ID, fxOwnerAndSize] // cache of subnetID -> owner if the entry is nil, it is not in the database - - addedPermissionedSubnets []*txs.Tx // added SubnetTxs, waiting to be committed - permissionedSubnetCache []*txs.Tx // nil if the subnets haven't been loaded - addedElasticSubnets map[ids.ID]*txs.Tx // map of subnetID -> transformSubnetTx - elasticSubnetCache cache.Cacher[ids.ID, *txs.Tx] // cache of subnetID -> transformSubnetTx if the entry is nil, it is not in the database - - // Chains section - addedChains map[ids.ID][]*txs.Tx // maps subnetID -> the newly added chains to the subnet - chainCache cache.Cacher[ids.ID, []*txs.Tx] // cache of subnetID -> the chains after all local modifications []*txs.Tx - - // Blocks section - // Note: addedBlocks is a list because multiple blocks can be committed at one (proposal + accepted option) - addedBlocks map[ids.ID]block.Block // map of blockID -> Block. - blockCache cache.Cacher[ids.ID, block.Block] // cache of blockID -> Block. If the entry is nil, it is not in the database - blockDB database.Database - - addedBlockIDs map[uint64]ids.ID // map of height -> blockID - blockIDCache cache.Cacher[uint64, ids.ID] // cache of height -> blockID. If the entry is ids.Empty, it is not in the database - blockIDDB database.Database - - // Txs section - // FIND a way to reduce use of these. 
No use in verification of addedTxs - // a limited windows to support APIs - addedTxs map[ids.ID]*txAndStatus // map of txID -> {*txs.Tx, Status} - txCache cache.Cacher[ids.ID, *txAndStatus] // txID -> {*txs.Tx, Status}. If the entry is nil, it isn't in the database - txDB database.Database - - indexedUTXOsDB database.Database - - localUptimesCache map[ids.NodeID]map[ids.ID]*uptimes // vdrID -> subnetID -> metadata - modifiedLocalUptimes map[ids.NodeID]set.Set[ids.ID] // vdrID -> subnetIDs - localUptimesDB database.Database - - flatValidatorWeightDiffsDB database.Database - flatValidatorPublicKeyDiffsDB database.Database - - // Reward UTXOs section - addedRewardUTXOs map[ids.ID][]*avax.UTXO // map of txID -> []*UTXO - rewardUTXOsCache cache.Cacher[ids.ID, []*avax.UTXO] // txID -> []*UTXO - rewardUTXOsDB database.Database -} - -// STAKERS section -func (s *state) GetCurrentValidator(subnetID ids.ID, nodeID ids.NodeID) (*Staker, error) { - return s.currentStakers.GetValidator(subnetID, nodeID) -} - -func (s *state) PutCurrentValidator(staker *Staker) { - s.currentStakers.PutValidator(staker) - - // make sure that each new validator has an uptime entry - // and a delegatee reward entry. 
MerkleState implementations - // of SetUptime and SetDelegateeReward must not err - err := s.SetUptime(staker.NodeID, staker.SubnetID, 0 /*duration*/, staker.StartTime) - if err != nil { - panic(err) - } - err = s.SetDelegateeReward(staker.SubnetID, staker.NodeID, 0) - if err != nil { - panic(err) - } -} - -func (s *state) DeleteCurrentValidator(staker *Staker) { - s.currentStakers.DeleteValidator(staker) -} - -func (s *state) GetCurrentDelegatorIterator(subnetID ids.ID, nodeID ids.NodeID) (StakerIterator, error) { - return s.currentStakers.GetDelegatorIterator(subnetID, nodeID), nil -} +func (s *state) GetCurrentDelegatorIterator(subnetID ids.ID, nodeID ids.NodeID) (StakerIterator, error) { + return s.currentStakers.GetDelegatorIterator(subnetID, nodeID), nil +} func (s *state) PutCurrentDelegator(staker *Staker) { s.currentStakers.PutDelegator(staker) @@ -569,181 +636,13 @@ func (s *state) GetPendingStakerIterator() (StakerIterator, error) { return s.pendingStakers.GetStakerIterator(), nil } -func (s *state) GetDelegateeReward(subnetID ids.ID, vdrID ids.NodeID) (uint64, error) { - nodeDelegateeRewards, exists := s.delegateeRewardCache[vdrID] - if exists { - delegateeReward, exists := nodeDelegateeRewards[subnetID] - if exists { - return delegateeReward, nil - } - } - - // try loading from the db - key := merkleDelegateeRewardsKey(vdrID, subnetID) - amountBytes, err := s.merkleDB.Get(key) - if err != nil { - return 0, err - } - delegateeReward, err := database.ParseUInt64(amountBytes) - if err != nil { - return 0, err - } - - if _, found := s.delegateeRewardCache[vdrID]; !found { - s.delegateeRewardCache[vdrID] = make(map[ids.ID]uint64) - } - s.delegateeRewardCache[vdrID][subnetID] = delegateeReward - return delegateeReward, nil -} - -func (s *state) SetDelegateeReward(subnetID ids.ID, vdrID ids.NodeID, amount uint64) error { - nodeDelegateeRewards, exists := s.delegateeRewardCache[vdrID] - if !exists { - nodeDelegateeRewards = make(map[ids.ID]uint64) - 
s.delegateeRewardCache[vdrID] = nodeDelegateeRewards - } - nodeDelegateeRewards[subnetID] = amount - - // track diff - updatedDelegateeRewards, ok := s.modifiedDelegateeReward[vdrID] - if !ok { - updatedDelegateeRewards = set.Set[ids.ID]{} - s.modifiedDelegateeReward[vdrID] = updatedDelegateeRewards - } - updatedDelegateeRewards.Add(subnetID) - return nil -} - -// UTXOs section -func (s *state) GetUTXO(utxoID ids.ID) (*avax.UTXO, error) { - if utxo, exists := s.modifiedUTXOs[utxoID]; exists { - if utxo == nil { - return nil, database.ErrNotFound - } - return utxo, nil - } - if utxo, found := s.utxoCache.Get(utxoID); found { - if utxo == nil { - return nil, database.ErrNotFound - } - return utxo, nil - } - - key := merkleUtxoIDKey(utxoID) - - switch bytes, err := s.merkleDB.Get(key); err { - case nil: - utxo := &avax.UTXO{} - if _, err := txs.GenesisCodec.Unmarshal(bytes, utxo); err != nil { - return nil, err - } - s.utxoCache.Put(utxoID, utxo) - return utxo, nil - - case database.ErrNotFound: - s.utxoCache.Put(utxoID, nil) - return nil, database.ErrNotFound - - default: - return nil, err - } -} - -func (s *state) UTXOIDs(addr []byte, start ids.ID, limit int) ([]ids.ID, error) { - var ( - prefix = slices.Clone(addr) - key = merkleUtxoIndexKey(addr, start) - ) - - iter := s.indexedUTXOsDB.NewIteratorWithStartAndPrefix(key, prefix) - defer iter.Release() - - utxoIDs := []ids.ID(nil) - for len(utxoIDs) < limit && iter.Next() { - itAddr, utxoID := splitUtxoIndexKey(iter.Key()) - if !bytes.Equal(itAddr, addr) { - break - } - if utxoID == start { - continue - } - - start = ids.Empty - utxoIDs = append(utxoIDs, utxoID) - } - return utxoIDs, iter.Error() -} - -func (s *state) AddUTXO(utxo *avax.UTXO) { - s.modifiedUTXOs[utxo.InputID()] = utxo -} - -func (s *state) DeleteUTXO(utxoID ids.ID) { - s.modifiedUTXOs[utxoID] = nil -} - -// METADATA Section -func (s *state) GetTimestamp() time.Time { - return s.chainTime -} - -func (s *state) SetTimestamp(tm time.Time) { - 
s.chainTime = tm -} - -func (s *state) GetLastAccepted() ids.ID { - return s.lastAcceptedBlkID -} - -func (s *state) SetLastAccepted(lastAccepted ids.ID) { - s.lastAcceptedBlkID = lastAccepted -} - -func (s *state) SetHeight(height uint64) { - s.lastAcceptedHeight = height -} - -func (s *state) GetCurrentSupply(subnetID ids.ID) (uint64, error) { - supply, ok := s.modifiedSupplies[subnetID] - if ok { - return supply, nil - } - cachedSupply, ok := s.suppliesCache.Get(subnetID) - if ok { - if cachedSupply == nil { - return 0, database.ErrNotFound - } - return *cachedSupply, nil - } - - key := merkleSuppliesKey(subnetID) - - switch supplyBytes, err := s.merkleDB.Get(key); err { - case nil: - supply, err := database.ParseUInt64(supplyBytes) - if err != nil { - return 0, fmt.Errorf("failed parsing supply: %w", err) - } - s.suppliesCache.Put(subnetID, &supply) - return supply, nil - - case database.ErrNotFound: - s.suppliesCache.Put(subnetID, nil) - return 0, database.ErrNotFound - - default: - return 0, err - } -} - -func (s *state) SetCurrentSupply(subnetID ids.ID, cs uint64) { - s.modifiedSupplies[subnetID] = cs +func (s *state) shouldInit() (bool, error) { + has, err := s.singletonDB.Has(initializedKey) + return !has, err } -// SUBNETS Section -type fxOwnerAndSize struct { - owner fx.Owner - size int +func (s *state) doneInit() error { + return s.singletonDB.Put(initializedKey, nil) } func (s *state) GetSubnets() ([]*txs.Tx, error) { @@ -753,10 +652,10 @@ func (s *state) GetSubnets() ([]*txs.Tx, error) { return s.permissionedSubnetCache, nil } - subnets := make([]*txs.Tx, 0) subnetDBIt := s.merkleDB.NewIteratorWithPrefix(permissionedSubnetSectionPrefix) defer subnetDBIt.Release() + subnets := make([]*txs.Tx, 0) for subnetDBIt.Next() { subnetTxBytes := subnetDBIt.Value() subnetTx, err := txs.Parse(txs.GenesisCodec, subnetTxBytes) @@ -864,17 +763,17 @@ func (s *state) AddSubnetTransformation(transformSubnetTxIntf *txs.Tx) { 
s.addedElasticSubnets[transformSubnetTx.Subnet] = transformSubnetTxIntf } -// CHAINS Section func (s *state) GetChains(subnetID ids.ID) ([]*txs.Tx, error) { if chains, cached := s.chainCache.Get(subnetID); cached { return chains, nil } - chains := make([]*txs.Tx, 0) prefix := merkleChainPrefix(subnetID) - chainDBIt := s.merkleDB.NewIteratorWithPrefix(prefix) defer chainDBIt.Release() + + chains := make([]*txs.Tx, 0) + for chainDBIt.Next() { chainTxBytes := chainDBIt.Value() chainTx, err := txs.Parse(txs.GenesisCodec, chainTxBytes) @@ -898,31 +797,6 @@ func (s *state) AddChain(createChainTxIntf *txs.Tx) { s.addedChains[subnetID] = append(s.addedChains[subnetID], createChainTxIntf) } -// TXs Section -type txBytesAndStatus struct { - Tx []byte `serialize:"true"` - Status status.Status `serialize:"true"` -} - -type txAndStatus struct { - tx *txs.Tx - status status.Status -} - -func txSize(_ ids.ID, tx *txs.Tx) int { - if tx == nil { - return ids.IDLen + constants.PointerOverhead - } - return ids.IDLen + len(tx.Bytes()) + constants.PointerOverhead -} - -func txAndStatusSize(_ ids.ID, t *txAndStatus) int { - if t == nil { - return ids.IDLen + constants.PointerOverhead - } - return ids.IDLen + len(t.tx.Bytes()) + wrappers.IntLen + 2*constants.PointerOverhead -} - func (s *state) GetTx(txID ids.ID) (*txs.Tx, status.Status, error) { if tx, exists := s.addedTxs[txID]; exists { return tx.tx, tx.status, nil @@ -971,41 +845,66 @@ func (s *state) AddTx(tx *txs.Tx, status status.Status) { } } -// BLOCKs Section -func blockSize(_ ids.ID, blk block.Block) int { - if blk == nil { - return ids.IDLen + constants.PointerOverhead +func (s *state) GetRewardUTXOs(txID ids.ID) ([]*avax.UTXO, error) { + if utxos, exists := s.addedRewardUTXOs[txID]; exists { + return utxos, nil + } + if utxos, exists := s.rewardUTXOsCache.Get(txID); exists { + return utxos, nil } - return ids.IDLen + len(blk.Bytes()) + constants.PointerOverhead -} -func (s *state) GetStatelessBlock(blockID ids.ID) 
(block.Block, error) { - if blk, exists := s.addedBlocks[blockID]; exists { - return blk, nil + rawTxDB := prefixdb.New(txID[:], s.rewardUTXOsDB) + txDB := linkeddb.NewDefault(rawTxDB) + it := txDB.NewIterator() + defer it.Release() + + utxos := []*avax.UTXO(nil) + for it.Next() { + utxo := &avax.UTXO{} + if _, err := txs.Codec.Unmarshal(it.Value(), utxo); err != nil { + return nil, err + } + utxos = append(utxos, utxo) + } + if err := it.Error(); err != nil { + return nil, err } - if blk, cached := s.blockCache.Get(blockID); cached { - if blk == nil { + s.rewardUTXOsCache.Put(txID, utxos) + return utxos, nil +} + +func (s *state) AddRewardUTXO(txID ids.ID, utxo *avax.UTXO) { + s.addedRewardUTXOs[txID] = append(s.addedRewardUTXOs[txID], utxo) +} + +func (s *state) GetUTXO(utxoID ids.ID) (*avax.UTXO, error) { + if utxo, exists := s.modifiedUTXOs[utxoID]; exists { + if utxo == nil { return nil, database.ErrNotFound } - - return blk, nil + return utxo, nil + } + if utxo, found := s.utxoCache.Get(utxoID); found { + if utxo == nil { + return nil, database.ErrNotFound + } + return utxo, nil } - blkBytes, err := s.blockDB.Get(blockID[:]) - switch err { + key := merkleUtxoIDKey(utxoID) + + switch bytes, err := s.merkleDB.Get(key); err { case nil: - // Note: stored blocks are verified, so it's safe to unmarshal them with GenesisCodec - blk, err := block.Parse(block.GenesisCodec, blkBytes) - if err != nil { + utxo := &avax.UTXO{} + if _, err := txs.GenesisCodec.Unmarshal(bytes, utxo); err != nil { return nil, err } - - s.blockCache.Put(blockID, blk) - return blk, nil + s.utxoCache.Put(utxoID, utxo) + return utxo, nil case database.ErrNotFound: - s.blockCache.Put(blockID, nil) + s.utxoCache.Put(utxoID, nil) return nil, database.ErrNotFound default: @@ -1013,90 +912,37 @@ func (s *state) GetStatelessBlock(blockID ids.ID) (block.Block, error) { } } -func (s *state) AddStatelessBlock(block block.Block) { - s.addedBlocks[block.ID()] = block -} - -func (s *state) 
GetBlockIDAtHeight(height uint64) (ids.ID, error) { - if blkID, exists := s.addedBlockIDs[height]; exists { - return blkID, nil - } - if blkID, cached := s.blockIDCache.Get(height); cached { - if blkID == ids.Empty { - return ids.Empty, database.ErrNotFound - } - - return blkID, nil - } - - heightKey := database.PackUInt64(height) - - blkID, err := database.GetID(s.blockIDDB, heightKey) - if err == database.ErrNotFound { - s.blockIDCache.Put(height, ids.Empty) - return ids.Empty, database.ErrNotFound - } - if err != nil { - return ids.Empty, err - } +func (s *state) UTXOIDs(addr []byte, start ids.ID, limit int) ([]ids.ID, error) { + var ( + prefix = slices.Clone(addr) + key = merkleUtxoIndexKey(addr, start) + ) - s.blockIDCache.Put(height, blkID) - return blkID, nil -} + iter := s.indexedUTXOsDB.NewIteratorWithStartAndPrefix(key, prefix) + defer iter.Release() -// UPTIMES SECTION -func (s *state) GetUptime(vdrID ids.NodeID, subnetID ids.ID) (upDuration time.Duration, lastUpdated time.Time, err error) { - nodeUptimes, exists := s.localUptimesCache[vdrID] - if exists { - uptime, exists := nodeUptimes[subnetID] - if exists { - return uptime.Duration, uptime.lastUpdated, nil + utxoIDs := []ids.ID(nil) + for len(utxoIDs) < limit && iter.Next() { + itAddr, utxoID := splitUtxoIndexKey(iter.Key()) + if !bytes.Equal(itAddr, addr) { + break } - } - - // try loading from DB - key := merkleLocalUptimesKey(vdrID, subnetID) - uptimeBytes, err := s.localUptimesDB.Get(key) - switch err { - case nil: - upTm := &uptimes{} - if _, err := txs.GenesisCodec.Unmarshal(uptimeBytes, upTm); err != nil { - return 0, time.Time{}, err + if utxoID == start { + continue } - upTm.lastUpdated = time.Unix(int64(upTm.LastUpdated), 0) - s.localUptimesCache[vdrID] = make(map[ids.ID]*uptimes) - s.localUptimesCache[vdrID][subnetID] = upTm - return upTm.Duration, upTm.lastUpdated, nil - case database.ErrNotFound: - // no local data for this staker uptime - return 0, time.Time{}, database.ErrNotFound - 
default: - return 0, time.Time{}, err + start = ids.Empty + utxoIDs = append(utxoIDs, utxoID) } + return utxoIDs, iter.Error() } -func (s *state) SetUptime(vdrID ids.NodeID, subnetID ids.ID, upDuration time.Duration, lastUpdated time.Time) error { - nodeUptimes, exists := s.localUptimesCache[vdrID] - if !exists { - nodeUptimes = make(map[ids.ID]*uptimes) - s.localUptimesCache[vdrID] = nodeUptimes - } - - nodeUptimes[subnetID] = &uptimes{ - Duration: upDuration, - LastUpdated: uint64(lastUpdated.Unix()), - lastUpdated: lastUpdated, - } +func (s *state) AddUTXO(utxo *avax.UTXO) { + s.modifiedUTXOs[utxo.InputID()] = utxo +} - // track diff - updatedNodeUptimes, ok := s.modifiedLocalUptimes[vdrID] - if !ok { - updatedNodeUptimes = set.Set[ids.ID]{} - s.modifiedLocalUptimes[vdrID] = updatedNodeUptimes - } - updatedNodeUptimes.Add(subnetID) - return nil +func (s *state) DeleteUTXO(utxoID ids.ID) { + s.modifiedUTXOs[utxoID] = nil } func (s *state) GetStartTime(nodeID ids.NodeID, subnetID ids.ID) (time.Time, error) { @@ -1107,97 +953,57 @@ func (s *state) GetStartTime(nodeID ids.NodeID, subnetID ids.ID) (time.Time, err return staker.StartTime, nil } -// REWARD UTXOs SECTION -func (s *state) GetRewardUTXOs(txID ids.ID) ([]*avax.UTXO, error) { - if utxos, exists := s.addedRewardUTXOs[txID]; exists { - return utxos, nil - } - if utxos, exists := s.rewardUTXOsCache.Get(txID); exists { - return utxos, nil - } - - rawTxDB := prefixdb.New(txID[:], s.rewardUTXOsDB) - txDB := linkeddb.NewDefault(rawTxDB) - it := txDB.NewIterator() - defer it.Release() - - utxos := []*avax.UTXO(nil) - for it.Next() { - utxo := &avax.UTXO{} - if _, err := txs.Codec.Unmarshal(it.Value(), utxo); err != nil { - return nil, err - } - utxos = append(utxos, utxo) - } - if err := it.Error(); err != nil { - return nil, err - } +func (s *state) GetTimestamp() time.Time { + return s.chainTime +} - s.rewardUTXOsCache.Put(txID, utxos) - return utxos, nil +func (s *state) SetTimestamp(tm time.Time) { + 
s.chainTime = tm } -func (s *state) AddRewardUTXO(txID ids.ID, utxo *avax.UTXO) { - s.addedRewardUTXOs[txID] = append(s.addedRewardUTXOs[txID], utxo) +func (s *state) GetLastAccepted() ids.ID { + return s.lastAcceptedBlkID } -// VALIDATORS Section -type ValidatorWeightDiff struct { - Decrease bool `serialize:"true"` - Amount uint64 `serialize:"true"` +func (s *state) SetLastAccepted(lastAccepted ids.ID) { + s.lastAcceptedBlkID = lastAccepted } -func (v *ValidatorWeightDiff) Add(negative bool, amount uint64) error { - if v.Decrease == negative { - var err error - v.Amount, err = safemath.Add64(v.Amount, amount) - return err +func (s *state) GetCurrentSupply(subnetID ids.ID) (uint64, error) { + supply, ok := s.modifiedSupplies[subnetID] + if ok { + return supply, nil } - - if v.Amount > amount { - v.Amount -= amount - } else { - v.Amount = safemath.AbsDiff(v.Amount, amount) - v.Decrease = negative + cachedSupply, ok := s.suppliesCache.Get(subnetID) + if ok { + if cachedSupply == nil { + return 0, database.ErrNotFound + } + return *cachedSupply, nil } - return nil -} -func applyWeightDiff( - vdrs map[ids.NodeID]*validators.GetValidatorOutput, - nodeID ids.NodeID, - weightDiff *ValidatorWeightDiff, -) error { - vdr, ok := vdrs[nodeID] - if !ok { - // This node isn't in the current validator set. - vdr = &validators.GetValidatorOutput{ - NodeID: nodeID, + key := merkleSuppliesKey(subnetID) + + switch supplyBytes, err := s.merkleDB.Get(key); err { + case nil: + supply, err := database.ParseUInt64(supplyBytes) + if err != nil { + return 0, fmt.Errorf("failed parsing supply: %w", err) } - vdrs[nodeID] = vdr - } + s.suppliesCache.Put(subnetID, &supply) + return supply, nil - // The weight of this node changed at this block. - var err error - if weightDiff.Decrease { - // The validator's weight was decreased at this block, so in the - // prior block it was higher. 
- vdr.Weight, err = safemath.Add64(vdr.Weight, weightDiff.Amount) - } else { - // The validator's weight was increased at this block, so in the - // prior block it was lower. - vdr.Weight, err = safemath.Sub(vdr.Weight, weightDiff.Amount) - } - if err != nil { - return err - } + case database.ErrNotFound: + s.suppliesCache.Put(subnetID, nil) + return 0, database.ErrNotFound - if vdr.Weight == 0 { - // The validator's weight was 0 before this block so they weren't in the - // validator set. - delete(vdrs, nodeID) + default: + return 0, err } - return nil +} + +func (s *state) SetCurrentSupply(subnetID ids.ID, cs uint64) { + s.modifiedSupplies[subnetID] = cs } func (s *state) ApplyValidatorWeightDiffs( @@ -1241,6 +1047,43 @@ func (s *state) ApplyValidatorWeightDiffs( return diffIter.Error() } +func applyWeightDiff( + vdrs map[ids.NodeID]*validators.GetValidatorOutput, + nodeID ids.NodeID, + weightDiff *ValidatorWeightDiff, +) error { + vdr, ok := vdrs[nodeID] + if !ok { + // This node isn't in the current validator set. + vdr = &validators.GetValidatorOutput{ + NodeID: nodeID, + } + vdrs[nodeID] = vdr + } + + // The weight of this node changed at this block. + var err error + if weightDiff.Decrease { + // The validator's weight was decreased at this block, so in the + // prior block it was higher. + vdr.Weight, err = safemath.Add64(vdr.Weight, weightDiff.Amount) + } else { + // The validator's weight was increased at this block, so in the + // prior block it was lower. + vdr.Weight, err = safemath.Sub(vdr.Weight, weightDiff.Amount) + } + if err != nil { + return err + } + + if vdr.Weight == 0 { + // The validator's weight was 0 before this block so they weren't in the + // validator set. 
+ delete(vdrs, nodeID) + } + return nil +} + func (s *state) ApplyValidatorPublicKeyDiffs( ctx context.Context, validators map[ids.NodeID]*validators.GetValidatorOutput, @@ -1279,74 +1122,528 @@ func (s *state) ApplyValidatorPublicKeyDiffs( continue } - vdr.PublicKey = new(bls.PublicKey).Deserialize(pkBytes) + vdr.PublicKey = new(bls.PublicKey).Deserialize(pkBytes) + } + return diffIter.Error() +} + +// Loads the state from [genesisBls] and [genesis] into [ms]. +func (s *state) syncGenesis(genesisBlk block.Block, genesis *genesis.Genesis) error { + genesisBlkID := genesisBlk.ID() + s.SetLastAccepted(genesisBlkID) + s.SetTimestamp(time.Unix(int64(genesis.Timestamp), 0)) + s.SetCurrentSupply(constants.PrimaryNetworkID, genesis.InitialSupply) + s.AddStatelessBlock(genesisBlk) + + // Persist UTXOs that exist at genesis + for _, utxo := range genesis.UTXOs { + avaxUTXO := utxo.UTXO + s.AddUTXO(&avaxUTXO) + } + + // Persist primary network validator set at genesis + for _, vdrTx := range genesis.Validators { + validatorTx, ok := vdrTx.Unsigned.(txs.ValidatorTx) + if !ok { + return fmt.Errorf("expected tx type txs.ValidatorTx but got %T", vdrTx.Unsigned) + } + + stakeAmount := validatorTx.Weight() + stakeDuration := validatorTx.EndTime().Sub(validatorTx.StartTime()) + currentSupply, err := s.GetCurrentSupply(constants.PrimaryNetworkID) + if err != nil { + return err + } + + potentialReward := s.rewards.Calculate( + stakeDuration, + stakeAmount, + currentSupply, + ) + newCurrentSupply, err := safemath.Add64(currentSupply, potentialReward) + if err != nil { + return err + } + + staker, err := NewCurrentStaker(vdrTx.ID(), validatorTx, potentialReward) + if err != nil { + return err + } + + s.PutCurrentValidator(staker) + s.AddTx(vdrTx, status.Committed) + s.SetCurrentSupply(constants.PrimaryNetworkID, newCurrentSupply) + } + + for _, chain := range genesis.Chains { + unsignedChain, ok := chain.Unsigned.(*txs.CreateChainTx) + if !ok { + return fmt.Errorf("expected tx type 
*txs.CreateChainTx but got %T", chain.Unsigned) + } + + // Ensure all chains that the genesis bytes say to create have the right + // network ID + if unsignedChain.NetworkID != s.ctx.NetworkID { + return avax.ErrWrongNetworkID + } + + s.AddChain(chain) + s.AddTx(chain, status.Committed) + } + + // updateValidators is set to false here to maintain the invariant that the + // primary network's validator set is empty before the validator sets are + // initialized. + return s.write(false /*=updateValidators*/, 0) +} + +// Load pulls data previously stored on disk that is expected to be in memory. +func (s *state) load(hasSynced bool) error { + return utils.Err( + s.loadMerkleMetadata(), + s.loadCurrentStakers(), + s.loadPendingStakers(), + s.initValidatorSets(), + + s.logMerkleRoot(!hasSynced), // we already logged if sync has happened + ) +} + +// Loads the chain time and last accepted block ID from disk +// and populates them in [ms]. +func (s *state) loadMerkleMetadata() error { + // load chain time + chainTimeBytes, err := s.merkleDB.Get(merkleChainTimeKey) + if err != nil { + return err + } + var chainTime time.Time + if err := chainTime.UnmarshalBinary(chainTimeBytes); err != nil { + return err + } + s.latestComittedChainTime = chainTime + s.SetTimestamp(chainTime) + + // load last accepted block + blkIDBytes, err := s.merkleDB.Get(merkleLastAcceptedBlkIDKey) + if err != nil { + return err + } + lastAcceptedBlkID := ids.Empty + copy(lastAcceptedBlkID[:], blkIDBytes) + s.latestCommittedLastAcceptedBlkID = lastAcceptedBlkID + s.SetLastAccepted(lastAcceptedBlkID) + + // We don't need to load supplies. Unlike chain time and last block ID, + // which have the persisted* attribute, we signify that a supply hasn't + // been modified by making it nil. + return nil +} + +// Loads current stakes from disk and populates them in [ms]. 
+func (s *state) loadCurrentStakers() error { + // TODO ABENEGIA: Check missing metadata + s.currentStakers = newBaseStakers() + + prefix := make([]byte, len(currentStakersSectionPrefix)) + copy(prefix, currentStakersSectionPrefix) + + iter := s.merkleDB.NewIteratorWithPrefix(prefix) + defer iter.Release() + for iter.Next() { + data := &stakersData{} + if _, err := txs.GenesisCodec.Unmarshal(iter.Value(), data); err != nil { + return fmt.Errorf("failed to deserialize current stakers data: %w", err) + } + + tx, err := txs.Parse(txs.GenesisCodec, data.TxBytes) + if err != nil { + return fmt.Errorf("failed to parsing current stakerTx: %w", err) + } + stakerTx, ok := tx.Unsigned.(txs.Staker) + if !ok { + return fmt.Errorf("expected tx type txs.Staker but got %T", tx.Unsigned) + } + + staker, err := NewCurrentStaker(tx.ID(), stakerTx, data.PotentialReward) + if err != nil { + return err + } + if staker.Priority.IsValidator() { + // TODO: why not PutValidator/PutDelegator?? + validator := s.currentStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) + validator.validator = staker + s.currentStakers.stakers.ReplaceOrInsert(staker) + } else { + validator := s.currentStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) + if validator.delegators == nil { + validator.delegators = btree.NewG(defaultTreeDegree, (*Staker).Less) + } + validator.delegators.ReplaceOrInsert(staker) + s.currentStakers.stakers.ReplaceOrInsert(staker) + } + } + return iter.Error() +} + +func (s *state) loadPendingStakers() error { + // TODO ABENEGIA: Check missing metadata + s.pendingStakers = newBaseStakers() + + prefix := make([]byte, len(pendingStakersSectionPrefix)) + copy(prefix, pendingStakersSectionPrefix) + + iter := s.merkleDB.NewIteratorWithPrefix(prefix) + defer iter.Release() + for iter.Next() { + data := &stakersData{} + if _, err := txs.GenesisCodec.Unmarshal(iter.Value(), data); err != nil { + return fmt.Errorf("failed to deserialize pending stakers data: %w", err) + } + 
+ tx, err := txs.Parse(txs.GenesisCodec, data.TxBytes) + if err != nil { + return fmt.Errorf("failed to parsing pending stakerTx: %w", err) + } + stakerTx, ok := tx.Unsigned.(txs.Staker) + if !ok { + return fmt.Errorf("expected tx type txs.Staker but got %T", tx.Unsigned) + } + + staker, err := NewPendingStaker(tx.ID(), stakerTx) + if err != nil { + return err + } + if staker.Priority.IsValidator() { + validator := s.pendingStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) + validator.validator = staker + s.pendingStakers.stakers.ReplaceOrInsert(staker) + } else { + validator := s.pendingStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) + if validator.delegators == nil { + validator.delegators = btree.NewG(defaultTreeDegree, (*Staker).Less) + } + validator.delegators.ReplaceOrInsert(staker) + s.pendingStakers.stakers.ReplaceOrInsert(staker) + } + } + return iter.Error() +} + +// Invariant: initValidatorSets requires loadCurrentValidators to have already +// been called. +func (s *state) initValidatorSets() error { + for subnetID, validators := range s.currentStakers.validators { + if s.validators.Count(subnetID) != 0 { + // Enforce the invariant that the validator set is empty here. 
+ return fmt.Errorf("%w: %s", errValidatorSetAlreadyPopulated, subnetID) + } + + for nodeID, validator := range validators { + validatorStaker := validator.validator + if err := s.validators.AddStaker(subnetID, nodeID, validatorStaker.PublicKey, validatorStaker.TxID, validatorStaker.Weight); err != nil { + return err + } + + delegatorIterator := NewTreeIterator(validator.delegators) + for delegatorIterator.Next() { + delegatorStaker := delegatorIterator.Value() + if err := s.validators.AddWeight(subnetID, nodeID, delegatorStaker.Weight); err != nil { + delegatorIterator.Release() + return err + } + } + delegatorIterator.Release() + } + } + + s.metrics.SetLocalStake(s.validators.GetWeight(constants.PrimaryNetworkID, s.ctx.NodeID)) + totalWeight, err := s.validators.TotalWeight(constants.PrimaryNetworkID) + if err != nil { + return fmt.Errorf("failed to get total weight of primary network validators: %w", err) + } + s.metrics.SetTotalStake(totalWeight) + return nil +} + +func (s *state) write(updateValidators bool, height uint64) error { + currentData, weightDiffs, blsKeyDiffs, valSetDiff, err := s.processCurrentStakers() + if err != nil { + return err + } + pendingData, err := s.processPendingStakers() + if err != nil { + return err + } + + return utils.Err( + s.writeMerkleState(currentData, pendingData), + s.writeBlocks(), + s.writeTxs(), + s.writeLocalUptimes(), + s.writeWeightDiffs(height, weightDiffs), + s.writeBlsKeyDiffs(height, blsKeyDiffs), + s.writeRewardUTXOs(), + s.updateValidatorSet(updateValidators, valSetDiff, weightDiffs), + ) +} + +func (s *state) Close() error { + return utils.Err( + s.flatValidatorWeightDiffsDB.Close(), + s.flatValidatorPublicKeyDiffsDB.Close(), + s.localUptimesDB.Close(), + s.indexedUTXOsDB.Close(), + s.txDB.Close(), + s.blockDB.Close(), + s.blockIDDB.Close(), + s.merkleDB.Close(), + s.baseMerkleDB.Close(), + ) +} + +// If [ms] isn't initialized, initializes it with [genesis]. +// Then loads [ms] from disk. 
+func (s *state) sync(genesis []byte) error { + shouldInit, err := s.shouldInit() + if err != nil { + return fmt.Errorf( + "failed to check if the database is initialized: %w", + err, + ) + } + + // If the database is empty, create the platform chain anew using the + // provided genesis state + if shouldInit { + if err := s.init(genesis); err != nil { + return fmt.Errorf( + "failed to initialize the database: %w", + err, + ) + } + } + + return s.load(shouldInit) +} + +// Creates a genesis from [genesisBytes] and initializes [ms] with it. +func (s *state) init(genesisBytes []byte) error { + // Create the genesis block and save it as being accepted (We don't do + // genesisBlock.Accept() because then it'd look for genesisBlock's + // non-existent parent) + genesisID := hashing.ComputeHash256Array(genesisBytes) + genesisBlock, err := block.NewApricotCommitBlock(genesisID, 0 /*height*/) + if err != nil { + return err + } + + genesisState, err := genesis.Parse(genesisBytes) + if err != nil { + return err + } + if err := s.syncGenesis(genesisBlock, genesisState); err != nil { + return err + } + + if err := s.doneInit(); err != nil { + return err + } + + return s.Commit() +} + +func (s *state) AddStatelessBlock(block block.Block) { + s.addedBlocks[block.ID()] = block +} + +func (s *state) SetHeight(height uint64) { + s.lastAcceptedHeight = height +} + +func (s *state) Commit() error { + defer s.Abort() + batch, err := s.CommitBatch() + if err != nil { + return err + } + return batch.Write() +} + +func (s *state) Abort() { + s.baseDB.Abort() +} + +func (*state) Checksum() ids.ID { + return ids.Empty +} + +func (s *state) CommitBatch() (database.Batch, error) { + // updateValidators is set to true here so that the validator manager is + // kept up to date with the last accepted state. 
+ if err := s.write(true /*updateValidators*/, s.lastAcceptedHeight); err != nil { + return nil, err + } + return s.baseDB.CommitBatch() +} + +func (s *state) writeBlocks() error { + for blkID, blk := range s.addedBlocks { + var ( + blkID = blkID + blkHeight = blk.Height() + ) + + delete(s.addedBlockIDs, blkHeight) + s.blockIDCache.Put(blkHeight, blkID) + if err := database.PutID(s.blockIDDB, database.PackUInt64(blkHeight), blkID); err != nil { + return fmt.Errorf("failed to write block height index: %w", err) + } + + delete(s.addedBlocks, blkID) + // Note: Evict is used rather than Put here because blk may end up + // referencing additional data (because of shared byte slices) that + // would not be properly accounted for in the cache sizing. + s.blockCache.Evict(blkID) + + if err := s.blockDB.Put(blkID[:], blk.Bytes()); err != nil { + return fmt.Errorf("failed to write block %s: %w", blkID, err) + } + } + return nil +} + +func (s *state) GetStatelessBlock(blockID ids.ID) (block.Block, error) { + if blk, exists := s.addedBlocks[blockID]; exists { + return blk, nil + } + + if blk, cached := s.blockCache.Get(blockID); cached { + if blk == nil { + return nil, database.ErrNotFound + } + + return blk, nil + } + + blkBytes, err := s.blockDB.Get(blockID[:]) + switch err { + case nil: + // Note: stored blocks are verified, so it's safe to unmarshal them with GenesisCodec + blk, err := block.Parse(block.GenesisCodec, blkBytes) + if err != nil { + return nil, err + } + + s.blockCache.Put(blockID, blk) + return blk, nil + + case database.ErrNotFound: + s.blockCache.Put(blockID, nil) + return nil, database.ErrNotFound + + default: + return nil, err + } +} + +func (s *state) GetBlockIDAtHeight(height uint64) (ids.ID, error) { + if blkID, exists := s.addedBlockIDs[height]; exists { + return blkID, nil + } + if blkID, cached := s.blockIDCache.Get(height); cached { + if blkID == ids.Empty { + return ids.Empty, database.ErrNotFound + } + + return blkID, nil } - return 
diffIter.Error() -} -// DB Operations -func (s *state) Abort() { - s.baseDB.Abort() -} + heightKey := database.PackUInt64(height) -func (s *state) Commit() error { - defer s.Abort() - batch, err := s.CommitBatch() + blkID, err := database.GetID(s.blockIDDB, heightKey) + if err == database.ErrNotFound { + s.blockIDCache.Put(height, ids.Empty) + return ids.Empty, database.ErrNotFound + } if err != nil { - return err + return ids.Empty, err } - return batch.Write() -} -func (s *state) CommitBatch() (database.Batch, error) { - // updateValidators is set to true here so that the validator manager is - // kept up to date with the last accepted state. - if err := s.write(true /*updateValidators*/, s.lastAcceptedHeight); err != nil { - return nil, err - } - return s.baseDB.CommitBatch() + s.blockIDCache.Put(height, blkID) + return blkID, nil } -func (*state) Checksum() ids.ID { - return ids.Empty -} +func (*state) writeCurrentStakers(batchOps *[]database.BatchOp, currentData map[ids.ID]*stakersData) error { + for stakerTxID, data := range currentData { + key := merkleCurrentStakersKey(stakerTxID) -func (s *state) Close() error { - return utils.Err( - s.flatValidatorWeightDiffsDB.Close(), - s.flatValidatorPublicKeyDiffsDB.Close(), - s.localUptimesDB.Close(), - s.indexedUTXOsDB.Close(), - s.txDB.Close(), - s.blockDB.Close(), - s.blockIDDB.Close(), - s.merkleDB.Close(), - s.baseMerkleDB.Close(), - ) + if data.TxBytes == nil { + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Delete: true, + }) + continue + } + + dataBytes, err := txs.GenesisCodec.Marshal(txs.Version, data) + if err != nil { + return fmt.Errorf("failed to serialize current stakers data, stakerTxID %v: %w", stakerTxID, err) + } + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Value: dataBytes, + }) + } + return nil } -func (s *state) write(updateValidators bool, height uint64) error { - currentData, weightDiffs, blsKeyDiffs, valSetDiff, err := s.processCurrentStakers() +func (s 
*state) GetDelegateeReward(subnetID ids.ID, vdrID ids.NodeID) (uint64, error) { + nodeDelegateeRewards, exists := s.delegateeRewardCache[vdrID] + if exists { + delegateeReward, exists := nodeDelegateeRewards[subnetID] + if exists { + return delegateeReward, nil + } + } + + // try loading from the db + key := merkleDelegateeRewardsKey(vdrID, subnetID) + amountBytes, err := s.merkleDB.Get(key) if err != nil { - return err + return 0, err } - pendingData, err := s.processPendingStakers() + delegateeReward, err := database.ParseUInt64(amountBytes) if err != nil { - return err + return 0, err } - return utils.Err( - s.writeMerkleState(currentData, pendingData), - s.writeBlocks(), - s.writeTxs(), - s.writeLocalUptimes(), - s.writeWeightDiffs(height, weightDiffs), - s.writeBlsKeyDiffs(height, blsKeyDiffs), - s.writeRewardUTXOs(), - s.updateValidatorSet(updateValidators, valSetDiff, weightDiffs), - ) + if _, found := s.delegateeRewardCache[vdrID]; !found { + s.delegateeRewardCache[vdrID] = make(map[ids.ID]uint64) + } + s.delegateeRewardCache[vdrID][subnetID] = delegateeReward + return delegateeReward, nil +} + +func (s *state) SetDelegateeReward(subnetID ids.ID, vdrID ids.NodeID, amount uint64) error { + nodeDelegateeRewards, exists := s.delegateeRewardCache[vdrID] + if !exists { + nodeDelegateeRewards = make(map[ids.ID]uint64) + s.delegateeRewardCache[vdrID] = nodeDelegateeRewards + } + nodeDelegateeRewards[subnetID] = amount + + // track diff + updatedDelegateeRewards, ok := s.modifiedDelegateeReward[vdrID] + if !ok { + updatedDelegateeRewards = set.Set[ids.ID]{} + s.modifiedDelegateeReward[vdrID] = updatedDelegateeRewards + } + updatedDelegateeRewards.Add(subnetID) + return nil } +// DB Operations func (s *state) processCurrentStakers() ( map[ids.ID]*stakersData, map[weightDiffKey]*ValidatorWeightDiff, @@ -1537,156 +1834,92 @@ func (s *state) writeMerkleState(currentData, pendingData map[ids.ID]*stakersDat return s.logMerkleRoot(len(batchOps) != 0) } -func (s *state) 
writeMetadata(batchOps *[]database.BatchOp) error { - if !s.chainTime.Equal(s.latestComittedChainTime) { - encodedChainTime, err := s.chainTime.MarshalBinary() - if err != nil { - return fmt.Errorf("failed to encoding chainTime: %w", err) - } - - *batchOps = append(*batchOps, database.BatchOp{ - Key: merkleChainTimeKey, - Value: encodedChainTime, - }) - s.latestComittedChainTime = s.chainTime - } - - if s.lastAcceptedBlkID != s.latestCommittedLastAcceptedBlkID { - *batchOps = append(*batchOps, database.BatchOp{ - Key: merkleLastAcceptedBlkIDKey, - Value: s.lastAcceptedBlkID[:], - }) - s.latestCommittedLastAcceptedBlkID = s.lastAcceptedBlkID - } - - // lastAcceptedBlockHeight not persisted yet in merkleDB state. - // TODO: Consider if it should be - - for subnetID, supply := range s.modifiedSupplies { - supply := supply - delete(s.modifiedSupplies, subnetID) // clear up s.supplies to avoid potential double commits - s.suppliesCache.Put(subnetID, &supply) - - key := merkleSuppliesKey(subnetID) - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Value: database.PackUInt64(supply), - }) - } - return nil -} - -func (s *state) writePermissionedSubnets(batchOps *[]database.BatchOp) error { //nolint:golint,unparam - for _, subnetTx := range s.addedPermissionedSubnets { - key := merklePermissionedSubnetKey(subnetTx.ID()) - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Value: subnetTx.Bytes(), - }) - } - s.addedPermissionedSubnets = make([]*txs.Tx, 0) - return nil -} +func (*state) writePendingStakers(batchOps *[]database.BatchOp, pendingData map[ids.ID]*stakersData) error { + for stakerTxID, data := range pendingData { + key := merklePendingStakersKey(stakerTxID) -func (s *state) writeSubnetOwners(batchOps *[]database.BatchOp) error { - for subnetID, owner := range s.subnetOwners { - owner := owner + if data.TxBytes == nil { + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Delete: true, + }) + continue + } - ownerBytes, err := 
block.GenesisCodec.Marshal(block.Version, &owner) + dataBytes, err := txs.GenesisCodec.Marshal(txs.Version, data) if err != nil { - return fmt.Errorf("failed to marshal subnet owner: %w", err) + return fmt.Errorf("failed to serialize pending stakers data, stakerTxID %v: %w", stakerTxID, err) } - - s.subnetOwnerCache.Put(subnetID, fxOwnerAndSize{ - owner: owner, - size: len(ownerBytes), - }) - - key := merkleSubnetOwnersKey(subnetID) *batchOps = append(*batchOps, database.BatchOp{ Key: key, - Value: ownerBytes, + Value: dataBytes, }) } - maps.Clear(s.subnetOwners) return nil } -func (s *state) writeElasticSubnets(batchOps *[]database.BatchOp) error { //nolint:golint,unparam - for subnetID, transforkSubnetTx := range s.addedElasticSubnets { - key := merkleElasticSubnetKey(subnetID) - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Value: transforkSubnetTx.Bytes(), - }) - delete(s.addedElasticSubnets, subnetID) - - // Note: Evict is used rather than Put here because tx may end up - // referencing additional data (because of shared byte slices) that - // would not be properly accounted for in the cache sizing. 
- s.elasticSubnetCache.Evict(subnetID) - } - return nil -} +func (s *state) writeDelegateeRewards(batchOps *[]database.BatchOp) error { //nolint:golint,unparam + for nodeID, nodeDelegateeRewards := range s.modifiedDelegateeReward { + nodeDelegateeRewardsList := nodeDelegateeRewards.List() + for _, subnetID := range nodeDelegateeRewardsList { + delegateeReward := s.delegateeRewardCache[nodeID][subnetID] -func (s *state) writeChains(batchOps *[]database.BatchOp) error { //nolint:golint,unparam - for subnetID, chains := range s.addedChains { - for _, chainTx := range chains { - key := merkleChainKey(subnetID, chainTx.ID()) + key := merkleDelegateeRewardsKey(nodeID, subnetID) *batchOps = append(*batchOps, database.BatchOp{ Key: key, - Value: chainTx.Bytes(), + Value: database.PackUInt64(delegateeReward), }) } - delete(s.addedChains, subnetID) + delete(s.modifiedDelegateeReward, nodeID) } return nil } -func (*state) writeCurrentStakers(batchOps *[]database.BatchOp, currentData map[ids.ID]*stakersData) error { - for stakerTxID, data := range currentData { - key := merkleCurrentStakersKey(stakerTxID) +func (s *state) writeTxs() error { + for txID, txStatus := range s.addedTxs { + txID := txID - if data.TxBytes == nil { - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Delete: true, - }) - continue + stx := txBytesAndStatus{ + Tx: txStatus.tx.Bytes(), + Status: txStatus.status, } - dataBytes, err := txs.GenesisCodec.Marshal(txs.Version, data) + // Note that we're serializing a [txBytesAndStatus] here, not a + // *txs.Tx, so we don't use [txs.Codec]. 
+ txBytes, err := txs.GenesisCodec.Marshal(txs.Version, &stx) if err != nil { - return fmt.Errorf("failed to serialize current stakers data, stakerTxID %v: %w", stakerTxID, err) + return fmt.Errorf("failed to serialize tx: %w", err) + } + + delete(s.addedTxs, txID) + // Note: Evict is used rather than Put here because stx may end up + // referencing additional data (because of shared byte slices) that + // would not be properly accounted for in the cache sizing. + s.txCache.Evict(txID) + if err := s.txDB.Put(txID[:], txBytes); err != nil { + return fmt.Errorf("failed to add tx: %w", err) } - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Value: dataBytes, - }) } return nil } -func (*state) writePendingStakers(batchOps *[]database.BatchOp, pendingData map[ids.ID]*stakersData) error { - for stakerTxID, data := range pendingData { - key := merklePendingStakersKey(stakerTxID) - - if data.TxBytes == nil { - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Delete: true, - }) - continue - } +func (s *state) writeRewardUTXOs() error { + for txID, utxos := range s.addedRewardUTXOs { + delete(s.addedRewardUTXOs, txID) + s.rewardUTXOsCache.Put(txID, utxos) + rawTxDB := prefixdb.New(txID[:], s.rewardUTXOsDB) + txDB := linkeddb.NewDefault(rawTxDB) - dataBytes, err := txs.GenesisCodec.Marshal(txs.Version, data) - if err != nil { - return fmt.Errorf("failed to serialize pending stakers data, stakerTxID %v: %w", stakerTxID, err) + for _, utxo := range utxos { + utxoBytes, err := txs.GenesisCodec.Marshal(txs.Version, utxo) + if err != nil { + return fmt.Errorf("failed to serialize reward UTXO: %w", err) + } + utxoID := utxo.InputID() + if err := txDB.Put(utxoID[:], utxoBytes); err != nil { + return fmt.Errorf("failed to add reward UTXO: %w", err) + } } - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Value: dataBytes, - }) } return nil } @@ -1737,74 +1970,56 @@ func (s *state) writeUTXOs(batchOps *[]database.BatchOp) error { return nil } 
-func (s *state) writeDelegateeRewards(batchOps *[]database.BatchOp) error { //nolint:golint,unparam - for nodeID, nodeDelegateeRewards := range s.modifiedDelegateeReward { - nodeDelegateeRewardsList := nodeDelegateeRewards.List() - for _, subnetID := range nodeDelegateeRewardsList { - delegateeReward := s.delegateeRewardCache[nodeID][subnetID] - - key := merkleDelegateeRewardsKey(nodeID, subnetID) - *batchOps = append(*batchOps, database.BatchOp{ - Key: key, - Value: database.PackUInt64(delegateeReward), - }) - } - delete(s.modifiedDelegateeReward, nodeID) +func (s *state) writePermissionedSubnets(batchOps *[]database.BatchOp) error { //nolint:golint,unparam + for _, subnetTx := range s.addedPermissionedSubnets { + key := merklePermissionedSubnetKey(subnetTx.ID()) + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Value: subnetTx.Bytes(), + }) } + s.addedPermissionedSubnets = make([]*txs.Tx, 0) return nil } -func (s *state) writeBlocks() error { - for blkID, blk := range s.addedBlocks { - var ( - blkID = blkID - blkHeight = blk.Height() - ) - - delete(s.addedBlockIDs, blkHeight) - s.blockIDCache.Put(blkHeight, blkID) - if err := database.PutID(s.blockIDDB, database.PackUInt64(blkHeight), blkID); err != nil { - return fmt.Errorf("failed to write block height index: %w", err) - } +func (s *state) writeElasticSubnets(batchOps *[]database.BatchOp) error { //nolint:golint,unparam + for subnetID, transforkSubnetTx := range s.addedElasticSubnets { + key := merkleElasticSubnetKey(subnetID) + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Value: transforkSubnetTx.Bytes(), + }) + delete(s.addedElasticSubnets, subnetID) - delete(s.addedBlocks, blkID) - // Note: Evict is used rather than Put here because blk may end up + // Note: Evict is used rather than Put here because tx may end up // referencing additional data (because of shared byte slices) that // would not be properly accounted for in the cache sizing. 
- s.blockCache.Evict(blkID) - - if err := s.blockDB.Put(blkID[:], blk.Bytes()); err != nil { - return fmt.Errorf("failed to write block %s: %w", blkID, err) - } + s.elasticSubnetCache.Evict(subnetID) } return nil } -func (s *state) writeTxs() error { - for txID, txStatus := range s.addedTxs { - txID := txID - - stx := txBytesAndStatus{ - Tx: txStatus.tx.Bytes(), - Status: txStatus.status, - } +func (s *state) writeSubnetOwners(batchOps *[]database.BatchOp) error { + for subnetID, owner := range s.subnetOwners { + owner := owner - // Note that we're serializing a [txBytesAndStatus] here, not a - // *txs.Tx, so we don't use [txs.Codec]. - txBytes, err := txs.GenesisCodec.Marshal(txs.Version, &stx) + ownerBytes, err := block.GenesisCodec.Marshal(block.Version, &owner) if err != nil { - return fmt.Errorf("failed to serialize tx: %w", err) + return fmt.Errorf("failed to marshal subnet owner: %w", err) } - delete(s.addedTxs, txID) - // Note: Evict is used rather than Put here because stx may end up - // referencing additional data (because of shared byte slices) that - // would not be properly accounted for in the cache sizing. 
- s.txCache.Evict(txID) - if err := s.txDB.Put(txID[:], txBytes); err != nil { - return fmt.Errorf("failed to add tx: %w", err) - } + s.subnetOwnerCache.Put(subnetID, fxOwnerAndSize{ + owner: owner, + size: len(ownerBytes), + }) + + key := merkleSubnetOwnersKey(subnetID) + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Value: ownerBytes, + }) } + maps.Clear(s.subnetOwners) return nil } @@ -1851,6 +2066,59 @@ func (s *state) writeLocalUptimes() error { return nil } +func (s *state) writeChains(batchOps *[]database.BatchOp) error { //nolint:golint,unparam + for subnetID, chains := range s.addedChains { + for _, chainTx := range chains { + key := merkleChainKey(subnetID, chainTx.ID()) + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Value: chainTx.Bytes(), + }) + } + delete(s.addedChains, subnetID) + } + return nil +} + +func (s *state) writeMetadata(batchOps *[]database.BatchOp) error { + if !s.chainTime.Equal(s.latestComittedChainTime) { + encodedChainTime, err := s.chainTime.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to encoding chainTime: %w", err) + } + + *batchOps = append(*batchOps, database.BatchOp{ + Key: merkleChainTimeKey, + Value: encodedChainTime, + }) + s.latestComittedChainTime = s.chainTime + } + + if s.lastAcceptedBlkID != s.latestCommittedLastAcceptedBlkID { + *batchOps = append(*batchOps, database.BatchOp{ + Key: merkleLastAcceptedBlkIDKey, + Value: s.lastAcceptedBlkID[:], + }) + s.latestCommittedLastAcceptedBlkID = s.lastAcceptedBlkID + } + + // lastAcceptedBlockHeight not persisted yet in merkleDB state. 
+ // TODO: Consider if it should be + + for subnetID, supply := range s.modifiedSupplies { + supply := supply + delete(s.modifiedSupplies, subnetID) // clear up s.supplies to avoid potential double commits + s.suppliesCache.Put(subnetID, &supply) + + key := merkleSuppliesKey(subnetID) + *batchOps = append(*batchOps, database.BatchOp{ + Key: key, + Value: database.PackUInt64(supply), + }) + } + return nil +} + func (s *state) writeWeightDiffs(height uint64, weightDiffs map[weightDiffKey]*ValidatorWeightDiff) error { for weightKey, weightDiff := range weightDiffs { if weightDiff.Amount == 0 { @@ -1884,27 +2152,6 @@ func (s *state) writeBlsKeyDiffs(height uint64, blsKeyDiffs map[ids.NodeID]*bls. return nil } -func (s *state) writeRewardUTXOs() error { - for txID, utxos := range s.addedRewardUTXOs { - delete(s.addedRewardUTXOs, txID) - s.rewardUTXOsCache.Put(txID, utxos) - rawTxDB := prefixdb.New(txID[:], s.rewardUTXOsDB) - txDB := linkeddb.NewDefault(rawTxDB) - - for _, utxo := range utxos { - utxoBytes, err := txs.GenesisCodec.Marshal(txs.Version, utxo) - if err != nil { - return fmt.Errorf("failed to serialize reward UTXO: %w", err) - } - utxoID := utxo.InputID() - if err := txDB.Put(utxoID[:], utxoBytes); err != nil { - return fmt.Errorf("failed to add reward UTXO: %w", err) - } - } - } - return nil -} - func (s *state) updateValidatorSet( updateValidators bool, valSetDiff map[weightDiffKey]*diffValidator, @@ -1990,3 +2237,57 @@ func (s *state) logMerkleRoot(hasChanges bool) error { ) return nil } + +func (s *state) GetUptime(vdrID ids.NodeID, subnetID ids.ID) (upDuration time.Duration, lastUpdated time.Time, err error) { + nodeUptimes, exists := s.localUptimesCache[vdrID] + if exists { + uptime, exists := nodeUptimes[subnetID] + if exists { + return uptime.Duration, uptime.lastUpdated, nil + } + } + + // try loading from DB + key := merkleLocalUptimesKey(vdrID, subnetID) + uptimeBytes, err := s.localUptimesDB.Get(key) + switch err { + case nil: + upTm := 
&uptimes{} + if _, err := txs.GenesisCodec.Unmarshal(uptimeBytes, upTm); err != nil { + return 0, time.Time{}, err + } + upTm.lastUpdated = time.Unix(int64(upTm.LastUpdated), 0) + s.localUptimesCache[vdrID] = make(map[ids.ID]*uptimes) + s.localUptimesCache[vdrID][subnetID] = upTm + return upTm.Duration, upTm.lastUpdated, nil + + case database.ErrNotFound: + // no local data for this staker uptime + return 0, time.Time{}, database.ErrNotFound + default: + return 0, time.Time{}, err + } +} + +func (s *state) SetUptime(vdrID ids.NodeID, subnetID ids.ID, upDuration time.Duration, lastUpdated time.Time) error { + nodeUptimes, exists := s.localUptimesCache[vdrID] + if !exists { + nodeUptimes = make(map[ids.ID]*uptimes) + s.localUptimesCache[vdrID] = nodeUptimes + } + + nodeUptimes[subnetID] = &uptimes{ + Duration: upDuration, + LastUpdated: uint64(lastUpdated.Unix()), + lastUpdated: lastUpdated, + } + + // track diff + updatedNodeUptimes, ok := s.modifiedLocalUptimes[vdrID] + if !ok { + updatedNodeUptimes = set.Set[ids.ID]{} + s.modifiedLocalUptimes[vdrID] = updatedNodeUptimes + } + updatedNodeUptimes.Add(subnetID) + return nil +} diff --git a/vms/platformvm/state/state_load_ops.go b/vms/platformvm/state/state_load_ops.go deleted file mode 100644 index 1db86ffc01b2..000000000000 --- a/vms/platformvm/state/state_load_ops.go +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. 
- -package state - -import ( - "fmt" - "time" - - "github.com/google/btree" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils" - "github.com/ava-labs/avalanchego/utils/constants" - "github.com/ava-labs/avalanchego/utils/hashing" - "github.com/ava-labs/avalanchego/vms/components/avax" - "github.com/ava-labs/avalanchego/vms/platformvm/block" - "github.com/ava-labs/avalanchego/vms/platformvm/genesis" - "github.com/ava-labs/avalanchego/vms/platformvm/status" - "github.com/ava-labs/avalanchego/vms/platformvm/txs" - - safemath "github.com/ava-labs/avalanchego/utils/math" -) - -// var errNotYetImplemented = errors.New("NOT YET IMPLEMENTED") - -// If [ms] isn't initialized, initializes it with [genesis]. -// Then loads [ms] from disk. -func (s *state) sync(genesis []byte) error { - shouldInit, err := s.shouldInit() - if err != nil { - return fmt.Errorf( - "failed to check if the database is initialized: %w", - err, - ) - } - - // If the database is empty, create the platform chain anew using the - // provided genesis state - if shouldInit { - if err := s.init(genesis); err != nil { - return fmt.Errorf( - "failed to initialize the database: %w", - err, - ) - } - } - - return s.load(shouldInit) -} - -func (s *state) shouldInit() (bool, error) { - has, err := s.singletonDB.Has(initializedKey) - return !has, err -} - -func (s *state) doneInit() error { - return s.singletonDB.Put(initializedKey, nil) -} - -// Creates a genesis from [genesisBytes] and initializes [ms] with it. 
-func (s *state) init(genesisBytes []byte) error { - // Create the genesis block and save it as being accepted (We don't do - // genesisBlock.Accept() because then it'd look for genesisBlock's - // non-existent parent) - genesisID := hashing.ComputeHash256Array(genesisBytes) - genesisBlock, err := block.NewApricotCommitBlock(genesisID, 0 /*height*/) - if err != nil { - return err - } - - genesisState, err := genesis.Parse(genesisBytes) - if err != nil { - return err - } - if err := s.syncGenesis(genesisBlock, genesisState); err != nil { - return err - } - - if err := s.doneInit(); err != nil { - return err - } - - return s.Commit() -} - -// Loads the state from [genesisBls] and [genesis] into [ms]. -func (s *state) syncGenesis(genesisBlk block.Block, genesis *genesis.Genesis) error { - genesisBlkID := genesisBlk.ID() - s.SetLastAccepted(genesisBlkID) - s.SetTimestamp(time.Unix(int64(genesis.Timestamp), 0)) - s.SetCurrentSupply(constants.PrimaryNetworkID, genesis.InitialSupply) - s.AddStatelessBlock(genesisBlk) - - // Persist UTXOs that exist at genesis - for _, utxo := range genesis.UTXOs { - avaxUTXO := utxo.UTXO - s.AddUTXO(&avaxUTXO) - } - - // Persist primary network validator set at genesis - for _, vdrTx := range genesis.Validators { - validatorTx, ok := vdrTx.Unsigned.(txs.ValidatorTx) - if !ok { - return fmt.Errorf("expected tx type txs.ValidatorTx but got %T", vdrTx.Unsigned) - } - - stakeAmount := validatorTx.Weight() - stakeDuration := validatorTx.EndTime().Sub(validatorTx.StartTime()) - currentSupply, err := s.GetCurrentSupply(constants.PrimaryNetworkID) - if err != nil { - return err - } - - potentialReward := s.rewards.Calculate( - stakeDuration, - stakeAmount, - currentSupply, - ) - newCurrentSupply, err := safemath.Add64(currentSupply, potentialReward) - if err != nil { - return err - } - - staker, err := NewCurrentStaker(vdrTx.ID(), validatorTx, potentialReward) - if err != nil { - return err - } - - s.PutCurrentValidator(staker) - s.AddTx(vdrTx, 
status.Committed) - s.SetCurrentSupply(constants.PrimaryNetworkID, newCurrentSupply) - } - - for _, chain := range genesis.Chains { - unsignedChain, ok := chain.Unsigned.(*txs.CreateChainTx) - if !ok { - return fmt.Errorf("expected tx type *txs.CreateChainTx but got %T", chain.Unsigned) - } - - // Ensure all chains that the genesis bytes say to create have the right - // network ID - if unsignedChain.NetworkID != s.ctx.NetworkID { - return avax.ErrWrongNetworkID - } - - s.AddChain(chain) - s.AddTx(chain, status.Committed) - } - - // updateValidators is set to false here to maintain the invariant that the - // primary network's validator set is empty before the validator sets are - // initialized. - return s.write(false /*=updateValidators*/, 0) -} - -// Load pulls data previously stored on disk that is expected to be in memory. -func (s *state) load(hasSynced bool) error { - return utils.Err( - s.loadMerkleMetadata(), - s.loadCurrentStakers(), - s.loadPendingStakers(), - s.initValidatorSets(), - - s.logMerkleRoot(!hasSynced), // we already logged if sync has happened - ) -} - -// Loads the chain time and last accepted block ID from disk -// and populates them in [ms]. -func (s *state) loadMerkleMetadata() error { - // load chain time - chainTimeBytes, err := s.merkleDB.Get(merkleChainTimeKey) - if err != nil { - return err - } - var chainTime time.Time - if err := chainTime.UnmarshalBinary(chainTimeBytes); err != nil { - return err - } - s.latestComittedChainTime = chainTime - s.SetTimestamp(chainTime) - - // load last accepted block - blkIDBytes, err := s.merkleDB.Get(merkleLastAcceptedBlkIDKey) - if err != nil { - return err - } - lastAcceptedBlkID := ids.Empty - copy(lastAcceptedBlkID[:], blkIDBytes) - s.latestCommittedLastAcceptedBlkID = lastAcceptedBlkID - s.SetLastAccepted(lastAcceptedBlkID) - - // We don't need to load supplies. 
Unlike chain time and last block ID, - // which have the persisted* attribute, we signify that a supply hasn't - // been modified by making it nil. - return nil -} - -// Loads current stakes from disk and populates them in [ms]. -func (s *state) loadCurrentStakers() error { - // TODO ABENEGIA: Check missing metadata - s.currentStakers = newBaseStakers() - - prefix := make([]byte, len(currentStakersSectionPrefix)) - copy(prefix, currentStakersSectionPrefix) - - iter := s.merkleDB.NewIteratorWithPrefix(prefix) - defer iter.Release() - for iter.Next() { - data := &stakersData{} - if _, err := txs.GenesisCodec.Unmarshal(iter.Value(), data); err != nil { - return fmt.Errorf("failed to deserialize current stakers data: %w", err) - } - - tx, err := txs.Parse(txs.GenesisCodec, data.TxBytes) - if err != nil { - return fmt.Errorf("failed to parsing current stakerTx: %w", err) - } - stakerTx, ok := tx.Unsigned.(txs.Staker) - if !ok { - return fmt.Errorf("expected tx type txs.Staker but got %T", tx.Unsigned) - } - - staker, err := NewCurrentStaker(tx.ID(), stakerTx, data.PotentialReward) - if err != nil { - return err - } - if staker.Priority.IsValidator() { - // TODO: why not PutValidator/PutDelegator?? 
- validator := s.currentStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) - validator.validator = staker - s.currentStakers.stakers.ReplaceOrInsert(staker) - } else { - validator := s.currentStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) - if validator.delegators == nil { - validator.delegators = btree.NewG(defaultTreeDegree, (*Staker).Less) - } - validator.delegators.ReplaceOrInsert(staker) - s.currentStakers.stakers.ReplaceOrInsert(staker) - } - } - return iter.Error() -} - -func (s *state) loadPendingStakers() error { - // TODO ABENEGIA: Check missing metadata - s.pendingStakers = newBaseStakers() - - prefix := make([]byte, len(pendingStakersSectionPrefix)) - copy(prefix, pendingStakersSectionPrefix) - - iter := s.merkleDB.NewIteratorWithPrefix(prefix) - defer iter.Release() - for iter.Next() { - data := &stakersData{} - if _, err := txs.GenesisCodec.Unmarshal(iter.Value(), data); err != nil { - return fmt.Errorf("failed to deserialize pending stakers data: %w", err) - } - - tx, err := txs.Parse(txs.GenesisCodec, data.TxBytes) - if err != nil { - return fmt.Errorf("failed to parsing pending stakerTx: %w", err) - } - stakerTx, ok := tx.Unsigned.(txs.Staker) - if !ok { - return fmt.Errorf("expected tx type txs.Staker but got %T", tx.Unsigned) - } - - staker, err := NewPendingStaker(tx.ID(), stakerTx) - if err != nil { - return err - } - if staker.Priority.IsValidator() { - validator := s.pendingStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) - validator.validator = staker - s.pendingStakers.stakers.ReplaceOrInsert(staker) - } else { - validator := s.pendingStakers.getOrCreateValidator(staker.SubnetID, staker.NodeID) - if validator.delegators == nil { - validator.delegators = btree.NewG(defaultTreeDegree, (*Staker).Less) - } - validator.delegators.ReplaceOrInsert(staker) - s.pendingStakers.stakers.ReplaceOrInsert(staker) - } - } - return iter.Error() -} - -// Invariant: initValidatorSets requires loadCurrentValidators to have 
already -// been called. -func (s *state) initValidatorSets() error { - for subnetID, validators := range s.currentStakers.validators { - if s.validators.Count(subnetID) != 0 { - // Enforce the invariant that the validator set is empty here. - return fmt.Errorf("%w: %s", errValidatorSetAlreadyPopulated, subnetID) - } - - for nodeID, validator := range validators { - validatorStaker := validator.validator - if err := s.validators.AddStaker(subnetID, nodeID, validatorStaker.PublicKey, validatorStaker.TxID, validatorStaker.Weight); err != nil { - return err - } - - delegatorIterator := NewTreeIterator(validator.delegators) - for delegatorIterator.Next() { - delegatorStaker := delegatorIterator.Value() - if err := s.validators.AddWeight(subnetID, nodeID, delegatorStaker.Weight); err != nil { - delegatorIterator.Release() - return err - } - } - delegatorIterator.Release() - } - } - - s.metrics.SetLocalStake(s.validators.GetWeight(constants.PrimaryNetworkID, s.ctx.NodeID)) - totalWeight, err := s.validators.TotalWeight(constants.PrimaryNetworkID) - if err != nil { - return fmt.Errorf("failed to get total weight of primary network validators: %w", err) - } - s.metrics.SetTotalStake(totalWeight) - return nil -} diff --git a/vms/platformvm/state/state_test.go b/vms/platformvm/state/state_test.go index 88e5897ba4d4..99c90d55f202 100644 --- a/vms/platformvm/state/state_test.go +++ b/vms/platformvm/state/state_test.go @@ -722,14 +722,6 @@ func TestStateSubnetOwner(t *testing.T) { // Returns the block, status of the block, and whether it is a [stateBlk]. 
// Invariant: blkBytes is safe to parse with blocks.GenesisCodec -// -// TODO: Remove after v1.11.x is activated -type stateBlk struct { - Blk block.Block - Bytes []byte `serialize:"true"` - Status choices.Status `serialize:"true"` -} - func parseStoredBlock(blkBytes []byte) (block.Block, bool, error) { // Attempt to parse as blocks.Block blk, err := block.Parse(block.GenesisCodec, blkBytes) diff --git a/vms/platformvm/txs/executor/staker_tx_verification.go b/vms/platformvm/txs/executor/staker_tx_verification.go index 9d4af6f5a025..9ec4880e4a44 100644 --- a/vms/platformvm/txs/executor/staker_tx_verification.go +++ b/vms/platformvm/txs/executor/staker_tx_verification.go @@ -479,7 +479,7 @@ func verifyAddPermissionlessValidatorTx( validatorRules, err := getValidatorRules(backend, chainState, tx.Subnet) if err != nil { - return fmt.Errorf("failed retrieving validator rules: %w", err) + return err } duration := tx.Validator.Duration() diff --git a/vms/platformvm/vm_regression_test.go b/vms/platformvm/vm_regression_test.go index 81582f1b14d2..80b38a06c234 100644 --- a/vms/platformvm/vm_regression_test.go +++ b/vms/platformvm/vm_regression_test.go @@ -1436,6 +1436,7 @@ func TestSubnetValidatorBLSKeyDiffAfterExpiry(t *testing.T) { vm.ctx.Lock.Lock() defer func() { require.NoError(vm.Shutdown(context.Background())) + vm.ctx.Lock.Unlock() }() subnetID := testSubnet1.TxID