Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew7234 committed Jul 5, 2024
1 parent 280f809 commit c89e957
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 39 deletions.
8 changes: 4 additions & 4 deletions analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ var (
vote = excluded.vote,
height = excluded.height;`

ConsensusValidatorEpochs = `
ValidatorBalancesUnprocessedEpochs = `
SELECT epochs.id, epochs.start_height
FROM chain.epochs as epochs
LEFT JOIN chain.validator_balance_history as history
Expand All @@ -472,10 +472,10 @@ var (
LIMIT $1`

ConsensusValidatorBalanceInsert = `
INSERT INTO chain.validator_balance_history (id, epoch, escrow_balance_active, escrow_balance_debonding)
VALUES ($1, $2, $3, $4)`
INSERT INTO chain.validator_balance_history (id, epoch, escrow_balance_active, escrow_balance_debonding, escrow_total_shares_active, escrow_total_shares_debonding)
VALUES ($1, $2, $3, $4, $5, $6)`

ConsensusValidatorEpochsCount = `
ValidatorBalancesUnprocessedCount = `
SELECT COUNT(*)
FROM chain.epochs AS epochs
LEFT JOIN chain.validator_balance_history AS history
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package evmtokens
package validatorstakinghistory

import (
"context"
Expand All @@ -19,11 +19,12 @@ import (
//
// WARNING: This analyzer SHOULD NOT run while block analyzers are in fast sync. This analyzer expects that
// when a row appears in `chain.epochs`, its first block is already processed. In fast sync, blocks are processed
// out of order. (As of 2024-06, fast-sync actually does not update chain.epochs directly so it might not cause bugs
// out of order. (As of 2024-06, fast-sync actually does not update chain.epochs directly so it might not cause bugs
// when running in parallel with this analyzer, but that's by chance rather than by design.)

const (
//nolint:gosec // thinks this is a hardcoded credential

Check failure on line 26 in analyzer/validatorstakinghistory/validatorstakinghistory.go

View workflow job for this annotation

GitHub Actions / lint-go

directive `//nolint:gosec // thinks this is a hardcoded credential` is unused for linter "gosec" (nolintlint)
validatorBalanceAnalyzerName = "validator_balance_"
validatorHistoryAnalyzerName = "validator_history_"
)

type processor struct {
Expand All @@ -40,15 +41,15 @@ func NewAnalyzer(
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
logger = logger.With("analyzer", validatorBalanceAnalyzerName)
logger = logger.With("analyzer", validatorHistoryAnalyzerName)
p := &processor{
source: sourceClient,
target: target,
logger: logger,
}

return item.NewAnalyzer[*Epoch](
validatorBalanceAnalyzerName,
validatorHistoryAnalyzerName,
cfg,
p,
target,
Expand All @@ -57,22 +58,22 @@ func NewAnalyzer(
}

type Epoch struct {
Epoch uint64
StartHeight int64
epoch uint64
startHeight int64
}

func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*Epoch, error) {
var epochs []*Epoch
rows, err := p.target.Query(ctx, queries.ConsensusValidatorEpochs, limit)
rows, err := p.target.Query(ctx, queries.ValidatorBalancesUnprocessedEpochs, limit)
if err != nil {
return nil, fmt.Errorf("querying epochs for validator history: %w", err)
}
defer rows.Close()
for rows.Next() {
var epoch Epoch
if err = rows.Scan(
&epoch.Epoch,
&epoch.StartHeight,
&epoch.epoch,
&epoch.startHeight,
); err != nil {
return nil, fmt.Errorf("scanning epoch: %w", err)
}
Expand All @@ -83,21 +84,28 @@ func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*Epoch, error

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, epoch *Epoch) error {
p.logger.Info("downloading validator balances", "epoch", epoch)
validators, err := p.source.GetValidators(ctx, epoch.StartHeight)
validators, err := p.source.GetValidators(ctx, epoch.startHeight)
if err != nil {
return fmt.Errorf("downloading validators for height %d", epoch.StartHeight)
return fmt.Errorf("downloading validators for height %d", epoch.startHeight)
}
for _, v := range validators {
addr := staking.NewAddress(v.ID)
acct, err1 := p.source.GetAccount(ctx, epoch.StartHeight, addr)
acct, err1 := p.source.GetAccount(ctx, epoch.startHeight, addr)
if err1 != nil {
return fmt.Errorf("fetching account info for %s at height %d", addr.String(), epoch.startHeight)
}
delegations, err1 := p.source.DelegationsTo(ctx, epoch.startHeight, addr)
if err1 != nil {
return fmt.Errorf("fetching account info for %s at height %d", addr.String(), epoch.StartHeight)
return fmt.Errorf("fetching delegations to account %s at height %d", addr.String(), epoch.startHeight)
}
batch.Queue(queries.ConsensusValidatorBalanceInsert,
v.ID.String(),
epoch.Epoch,
epoch.epoch,
acct.Escrow.Active.Balance,
acct.Escrow.Debonding.Balance,
acct.Escrow.Active.TotalShares,
acct.Escrow.Debonding.TotalShares,
len(delegations),
)
}

Expand All @@ -106,7 +114,7 @@ func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch,

func (p *processor) QueueLength(ctx context.Context) (int, error) {
var queueLength int
if err := p.target.QueryRow(ctx, queries.ConsensusValidatorEpochsCount).Scan(&queueLength); err != nil {
if err := p.target.QueryRow(ctx, queries.ValidatorBalancesUnprocessedCount).Scan(&queueLength); err != nil {
return 0, fmt.Errorf("querying number of unprocessed epochs: %w", err)
}
return queueLength, nil
Expand Down
10 changes: 10 additions & 0 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
nodestats "github.com/oasisprotocol/nexus/analyzer/node_stats"
"github.com/oasisprotocol/nexus/analyzer/runtime"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/analyzer/validatorstakinghistory"
"github.com/oasisprotocol/nexus/cache/httpproxy"
cmdCommon "github.com/oasisprotocol/nexus/cmd/common"
"github.com/oasisprotocol/nexus/common"
Expand Down Expand Up @@ -604,6 +605,15 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
return metadata_registry.NewAnalyzer(cfg.Analyzers.MetadataRegistry.ItemBasedAnalyzerConfig, dbClient, logger)
})
}
if cfg.Analyzers.ValidatorBalances != nil {
analyzers, err = addAnalyzer(analyzers, err, syncTagConsensus, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
}
return validatorstakinghistory.NewAnalyzer(cfg.Analyzers.ValidatorBalances.ItemBasedAnalyzerConfig, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.NodeStats != nil {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
Expand Down
12 changes: 9 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,10 @@ type AnalyzersList struct {
PontusxTestAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_test"`
PontusxDevAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_dev"`

MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
ValidatorBalances *ValidatorBalancesConfig `koanf:"validator_balances"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
}

type HelperList struct {
Expand Down Expand Up @@ -475,6 +476,11 @@ func (cfg *MetadataRegistryConfig) Validate() error {
return nil
}

// ValidatorBalancesConfig is the configuration for the validator balances analyzer.
type ValidatorBalancesConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`
}

// NodeStatsConfig is the configuration for the node stats analyzer.
type NodeStatsConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`
Expand Down
16 changes: 0 additions & 16 deletions storage/migrations/16_validator_staking_history.up.sql

This file was deleted.

19 changes: 19 additions & 0 deletions storage/migrations/21_validator_staking_history.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
BEGIN;

CREATE TABLE chain.validator_staking_history
(
id base64_ed25519_pubkey NOT NULL,
epoch UINT63 NOT NULL,
PRIMARY KEY (id, epoch),
escrow_balance_active UINT_NUMERIC NOT NULL,
escrow_balance_debonding UINT_NUMERIC NOT NULL,
escrow_total_shares_active UINT_NUMERIC NOT NULL,
escrow_total_shares_debonding UINT_NUMERIC NOT NULL,
num_delegators UINT63
);

-- todo: figure out if we still need this to track all validators ever.
ALTER TABLE chain.epochs
ADD COLUMN validators base64_ed25519_pubkey[];

COMMIT;
2 changes: 2 additions & 0 deletions storage/oasis/nodeapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type ConsensusApiLite interface {
GetCommittees(ctx context.Context, height int64, runtimeID coreCommon.Namespace) ([]Committee, error)
GetProposal(ctx context.Context, height int64, proposalID uint64) (*Proposal, error)
GetAccount(ctx context.Context, height int64, address Address) (*Account, error)
DelegationsTo(ctx context.Context, height int64, address Address) (map[Address]*Delegation, error)
Close() error
// Exposes the underlying gRPC connection, if applicable. Implementations may return nil.
// NOTE: Intended only for debugging purposes, e.g. one-off testing of gRPC methods that
Expand Down Expand Up @@ -148,6 +149,7 @@ type (
ReclaimEscrowEvent staking.ReclaimEscrowEvent
AllowanceChangeEvent staking.AllowanceChangeEvent
Account staking.Account
Delegation staking.Delegation
)

// .................... Registry ....................
Expand Down
11 changes: 11 additions & 0 deletions storage/oasis/nodeapi/cobalt/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ func (c *ConsensusApiLite) GetAccount(ctx context.Context, height int64, address
return (*nodeapi.Account)(convertAccount(rsp)), nil
}

func (c *ConsensusApiLite) DelegationsTo(ctx context.Context, height int64, address nodeapi.Address) (map[nodeapi.Address]*nodeapi.Delegation, error) {
var rsp map[nodeapi.Address]*nodeapi.Delegation
if err := c.grpcConn.Invoke(ctx, "/oasis-core.Staking/DelegationsTo", &stakingCobalt.OwnerQuery{
Height: height,
Owner: address,
}, &rsp); err != nil {
return nil, fmt.Errorf("DelegationsTo(%d, %s): %w", height, address, err)
}
return rsp, nil
}

func (c *ConsensusApiLite) GrpcConn() connections.GrpcConn {
return c.grpcConn
}
11 changes: 11 additions & 0 deletions storage/oasis/nodeapi/damask/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,17 @@ func (c *ConsensusApiLite) GetAccount(ctx context.Context, height int64, address
return rsp, nil
}

func (c *ConsensusApiLite) DelegationsTo(ctx context.Context, height int64, address nodeapi.Address) (map[nodeapi.Address]*nodeapi.Delegation, error) {
var rsp map[nodeapi.Address]*nodeapi.Delegation
if err := c.grpcConn.Invoke(ctx, "/oasis-core.Staking/DelegationsTo", &staking.OwnerQuery{
Height: height,
Owner: address,
}, &rsp); err != nil {
return nil, fmt.Errorf("DelegationsTo(%d, %s): %w", height, address, err)
}
return rsp, nil
}

func (c *ConsensusApiLite) GrpcConn() connections.GrpcConn {
return c.grpcConn
}
11 changes: 11 additions & 0 deletions storage/oasis/nodeapi/eden/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,17 @@ func (c *ConsensusApiLite) GetAccount(ctx context.Context, height int64, address
return (*nodeapi.Account)(convertAccount(rsp)), nil
}

func (c *ConsensusApiLite) DelegationsTo(ctx context.Context, height int64, address nodeapi.Address) (map[nodeapi.Address]*nodeapi.Delegation, error) {
var rsp map[nodeapi.Address]*nodeapi.Delegation
if err := c.grpcConn.Invoke(ctx, "/oasis-core.Staking/DelegationsTo", &stakingEden.OwnerQuery{
Height: height,
Owner: address,
}, &rsp); err != nil {
return nil, fmt.Errorf("DelegationsTo(%d, %s): %w", height, address, err)
}
return rsp, nil
}

func (c *ConsensusApiLite) GrpcConn() connections.GrpcConn {
return c.grpcConn
}
10 changes: 10 additions & 0 deletions storage/oasis/nodeapi/file/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ func (c *FileConsensusApiLite) GetAccount(ctx context.Context, height int64, add
)
}

func (c *FileConsensusApiLite) DelegationsTo(ctx context.Context, height int64, address nodeapi.Address) (map[nodeapi.Address]*nodeapi.Delegation, error) {
return kvstore.GetMapFromCacheOrCall[nodeapi.Address, *nodeapi.Delegation](
c.db, height == consensus.HeightLatest,
kvstore.GenerateCacheKey("DelegationsTo", height, address),
func() (map[nodeapi.Address]*nodeapi.Delegation, error) {
return c.consensusApi.DelegationsTo(ctx, height, address)
},
)
}

func (c *FileConsensusApiLite) GrpcConn() connections.GrpcConn {
return c.consensusApi.GrpcConn()
}
8 changes: 8 additions & 0 deletions storage/oasis/nodeapi/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ func (c *HistoryConsensusApiLite) GetAccount(ctx context.Context, height int64,
return api.GetAccount(ctx, height, address)
}

func (c *HistoryConsensusApiLite) DelegationsTo(ctx context.Context, height int64, address nodeapi.Address) (map[nodeapi.Address]*nodeapi.Delegation, error) {
api, err := c.APIForHeight(height)
if err != nil {
return nil, fmt.Errorf("getting api for height %d: %w", height, err)
}
return api.DelegationsTo(ctx, height, address)
}

func (c *HistoryConsensusApiLite) GrpcConn() connections.GrpcConn {
// To access the gRPC connection, you must know the height of the block.
// Use APIForHeight(h).GrpcConn() instead.
Expand Down
1 change: 1 addition & 0 deletions tests/e2e_regression/common_test_cases.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ commonTestCases=(
'db__events select tx_block, tx_index, tx_hash, type, ARRAY(SELECT unnest(related_accounts) ORDER BY 1) AS sorted_related_accounts, body::text from chain.events order by tx_block, tx_index, type, body::text'
'db__nodes select id, entity_id, roles, expiration, voting_power from chain.nodes order by id'
'db__runtime_nodes select rn.*, n.roles FROM chain.runtime_nodes rn LEFT JOIN chain.nodes n ON (rn.node_id = n.id) ORDER BY runtime_id, node_id'
'db__validator_staking_history select * from chain.validator_staking_history order by epoch, id'
## Runtimes.
'db__account_related_txs select * from chain.runtime_related_transactions order by runtime, tx_round, tx_index, account_address'
'db__runtime_accounts select * from chain.runtime_accounts order by runtime, address'
Expand Down

0 comments on commit c89e957

Please sign in to comment.