Skip to content

Commit

Permalink
Merge pull request #703 from oasisprotocol/andrew7234/validator-staki…
Browse files Browse the repository at this point in the history
…ng-history

Andrew7234/validator staking history
  • Loading branch information
Andrew7234 authored Jul 29, 2024
2 parents aeaea33 + 8edbb44 commit 50d4365
Show file tree
Hide file tree
Showing 39 changed files with 912 additions and 36 deletions.
1 change: 1 addition & 0 deletions .changelog/703.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
consensus: track validator escrow history
2 changes: 1 addition & 1 deletion analyzer/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const (
// Timeout to process a single batch.
processBatchTimeout = 61 * time.Second
processBatchTimeout = 6001 * time.Second
// Default number of items processed in a batch.
defaultBatchSize = 20
)
Expand Down
31 changes: 31 additions & 0 deletions analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,37 @@ var (
vote = excluded.vote,
height = excluded.height;`

ValidatorStakingHistoryUnprocessedEpochs = `
SELECT epochs.id, epochs.start_height, prev_epoch.validators
FROM chain.epochs as epochs
LEFT JOIN history.validators as history
ON epochs.id = history.epoch
LEFT JOIN chain.epochs as prev_epoch
ON epochs.id = prev_epoch.id + 1
WHERE
history.epoch IS NULL AND
epochs.id >= $1
ORDER BY epochs.id
LIMIT $2`

ValidatorBalanceInsert = `
INSERT INTO history.validators (id, epoch, escrow_balance_active, escrow_balance_debonding, escrow_total_shares_active, escrow_total_shares_debonding, num_delegators)
VALUES ($1, $2, $3, $4, $5, $6, $7)`

EpochValidatorsUpdate = `
UPDATE chain.epochs
SET validators = $2
WHERE id = $1`

ValidatorStakingHistoryUnprocessedCount = `
SELECT COUNT(*)
FROM chain.epochs AS epochs
LEFT JOIN history.validators AS history
ON epochs.id = history.epoch
WHERE
history.epoch IS NULL AND
epochs.id >= $1`

RuntimeBlockInsert = `
INSERT INTO chain.runtime_blocks (runtime, round, version, timestamp, block_hash, prev_block_hash, io_root, state_root, messages_hash, in_messages_hash, num_transactions, gas_used, size)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`
Expand Down
194 changes: 194 additions & 0 deletions analyzer/validatorstakinghistory/validatorstakinghistory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package validatorstakinghistory

import (
"context"
"fmt"
"math"

"github.com/jackc/pgx/v5"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/item"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/config"
staking "github.com/oasisprotocol/nexus/coreapi/v22.2.11/staking/api"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

// The validator staking history analyzer (1) gets the next epoch / validators to download validator
// history for, (2) downloads that info, and (3) saves the info in the database.
//
// We process epochs sequentially in chronological order in order to track validator
// history for all past and current validators. More concretly, for a given epoch, we find all current
// validators and take the union with the set of all tracked validators from the previous epoch.
// Notably, we do not track validator history for an entity before it first becomes an active validator.
//
// WARNING: This analyzer SHOULD NOT run while block analyzers are in fast sync. This analyzer expects that
// when a row appears in `chain.epochs`, its first block is already processed. In fast sync, blocks are processed
// out of order. (As of 2024-06, fast-sync actually does not update chain.epochs directly so it might not cause bugs
// when running in parallel with this analyzer, but that's by chance rather than by design.)

const (
validatorHistoryAnalyzerName = "validator_history"
)

type processor struct {
source nodeapi.ConsensusApiLite
target storage.TargetStorage
startEpoch uint64
logger *log.Logger
}

var _ item.ItemProcessor[*Epoch] = (*processor)(nil)

func NewAnalyzer(
initCtx context.Context,
cfg config.ItemBasedAnalyzerConfig,
startHeight uint64,
sourceClient nodeapi.ConsensusApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
logger = logger.With("analyzer", validatorHistoryAnalyzerName)

// Find the epoch corresponding to startHeight.
if startHeight > math.MaxInt64 {
return nil, fmt.Errorf("startHeight %d is too large", startHeight)
}
epoch, err := sourceClient.GetEpoch(initCtx, int64(startHeight))
if err != nil {
return nil, fmt.Errorf("failed to fetch epoch for startHeight %d", startHeight)
}
p := &processor{
source: sourceClient,
target: target,
startEpoch: uint64(epoch),
logger: logger,
}

return item.NewAnalyzer[*Epoch](
validatorHistoryAnalyzerName,
cfg,
p,
target,
logger,
)
}

type Epoch struct {
epoch uint64
startHeight int64
prevValidators []signature.PublicKey // Validator set from the previous epoch.
}

// Note: limit is ignored here because epochs must be processed sequentially in chronological order.
func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*Epoch, error) {
var epoch Epoch
vAddrs := []string{}
err := p.target.QueryRow(
ctx,
queries.ValidatorStakingHistoryUnprocessedEpochs,
p.startEpoch,
1, // overwrite limit to 1
).Scan(
&epoch.epoch,
&epoch.startHeight,
&vAddrs,
)
switch err {
case nil:
// continue
case pgx.ErrNoRows:
return []*Epoch{}, nil
default:
return nil, fmt.Errorf("querying epochs for validator history: %w", err)
}

// Convert vAddrs to signature.PublicKey
validators := []signature.PublicKey{}
for _, v := range vAddrs {
a := signature.PublicKey{}
if err := a.UnmarshalText([]byte(v)); err != nil {
return nil, fmt.Errorf("invalid validator pubkey '%s': %w", v, err)
}
validators = append(validators, a)
}
epoch.prevValidators = validators

return []*Epoch{&epoch}, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, epoch *Epoch) error {
p.logger.Info("downloading validator balances", "epoch", epoch.epoch, "height", epoch.startHeight, "prev_validators", epoch.prevValidators)
currValidators, err := p.source.GetValidators(ctx, epoch.startHeight)
if err != nil {
return fmt.Errorf("downloading validators for height %d", epoch.startHeight)
}
nodes, err := p.source.GetNodes(ctx, epoch.startHeight)
if err != nil {
return fmt.Errorf("downloading nodes for height %d", epoch.startHeight)
}
nodeToEntity := make(map[signature.PublicKey]signature.PublicKey)
for _, n := range nodes {
nodeToEntity[n.ID] = n.EntityID
}
// Find the union of past + current validator sets.
validators := make(map[signature.PublicKey]struct{})
for _, v := range epoch.prevValidators {
validators[v] = struct{}{}
}
for _, v := range currValidators {
// The ID returned by validator objects is the node ID, but we track validators by entity ID.
vID := nodeToEntity[v.ID]
if _, ok := validators[vID]; !ok {
p.logger.Info("found new validator", "addr", staking.NewAddress(vID), "id", vID)
validators[vID] = struct{}{}
}
}
// Download info for each validator.
validatorIDs := []string{}
for vID := range validators {
addr := staking.NewAddress(vID)
p.logger.Info("starting to process validator", "addr", addr, "height", epoch.startHeight)
validatorIDs = append(validatorIDs, vID.String())
acct, err1 := p.source.GetAccount(ctx, epoch.startHeight, addr)
if err1 != nil {
return fmt.Errorf("fetching account info for %s at height %d", addr.String(), epoch.startHeight)
}
delegations, err1 := p.source.DelegationsTo(ctx, epoch.startHeight, addr)
if err1 != nil {
return fmt.Errorf("fetching delegations to account %s at height %d", addr.String(), epoch.startHeight)
}
batch.Queue(queries.ValidatorBalanceInsert,
vID.String(),
epoch.epoch,
acct.Escrow.Active.Balance,
acct.Escrow.Debonding.Balance,
acct.Escrow.Active.TotalShares,
acct.Escrow.Debonding.TotalShares,
len(delegations),
)
p.logger.Info("processed validator", "addr", addr, "height", epoch.startHeight)
}

// Update chain.epochs with the set of past and current validators at this epoch.
batch.Queue(queries.EpochValidatorsUpdate,
epoch.epoch,
validatorIDs,
)

p.logger.Info("finished processing epoch", "epoch", epoch.epoch)

return nil
}

func (p *processor) QueueLength(ctx context.Context) (int, error) {
var queueLength int
if err := p.target.QueryRow(ctx, queries.ValidatorStakingHistoryUnprocessedCount, p.startEpoch).Scan(&queueLength); err != nil {
return 0, fmt.Errorf("querying number of unprocessed epochs: %w", err)
}
return queueLength, nil
}
14 changes: 14 additions & 0 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
nodestats "github.com/oasisprotocol/nexus/analyzer/node_stats"
"github.com/oasisprotocol/nexus/analyzer/runtime"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/analyzer/validatorstakinghistory"
"github.com/oasisprotocol/nexus/cache/httpproxy"
cmdCommon "github.com/oasisprotocol/nexus/cmd/common"
"github.com/oasisprotocol/nexus/common"
Expand Down Expand Up @@ -614,6 +615,19 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
return metadata_registry.NewAnalyzer(cfg.Analyzers.MetadataRegistry.ItemBasedAnalyzerConfig, dbClient, logger)
})
}
if cfg.Analyzers.ValidatorStakingHistory != nil {
analyzers, err = addAnalyzer(analyzers, err, syncTagConsensus, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
}
startHeight := cfg.Analyzers.ValidatorStakingHistory.StartHeight
if startHeight == 0 && cfg.Analyzers.Consensus.From != 0 {
startHeight = cfg.Analyzers.Consensus.From
}
return validatorstakinghistory.NewAnalyzer(ctx, cfg.Analyzers.ValidatorStakingHistory.ItemBasedAnalyzerConfig, startHeight, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.NodeStats != nil {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
Expand Down
24 changes: 21 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ type AnalyzersList struct {
PontusxTestAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_test"`
PontusxDevAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_dev"`

MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
ValidatorStakingHistory *ValidatorStakingHistoryConfig `koanf:"validator_staking_history"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
}

type HelperList struct {
Expand Down Expand Up @@ -477,6 +478,23 @@ func (cfg *MetadataRegistryConfig) Validate() error {
return nil
}

// ValidatorStakingHistoryConfig is the configuration for the validator balances analyzer.
type ValidatorStakingHistoryConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`

// StartHeight is the height at which the analyzer should start constructing validator
// history from. Note: We use a uint64 to match the consensus block config types, but
// the analyzer will refuse to start if StartHeight > math.MaxInt64.
StartHeight uint64 `koanf:"start_height"`
}

func (cfg *ValidatorStakingHistoryConfig) Validate() error {
if cfg.StartHeight == 0 {
return fmt.Errorf("validator staking startHeight must be set, preferably to the consensus analyzer start height")
}
return nil
}

// NodeStatsConfig is the configuration for the node stats analyzer.
type NodeStatsConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`
Expand Down
16 changes: 9 additions & 7 deletions config/local-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ analysis:
nodes: &src
eden:
default:
rpc: unix:/tmp/node.sock #unix:/node/data/internal.sock
rpc: grpc.prd.oasis.io:1443 #unix:/tmp/node.sock #unix:/node/data/internal.sock
sapphire: { rpc: grpc.opf.oasis.io:443 }
ipfs:
gateway: https://ipfs.io
analyzers:
metadata_registry:
interval: 1h
# metadata_registry:
# interval: 1h
node_stats: {}
aggregate_stats: {}
consensus:
from: 16_817_956 # Eden genesis
emerald:
from: 7_875_129 # round at Eden genesis
sapphire:
from: 1_357_486 # round at Eden genesis
# emerald:
# from: 7_875_129 # round at Eden genesis
# sapphire:
# from: 1_357_486 # round at Eden genesis
storage:
backend: postgres
endpoint: postgresql://rwuser:password@localhost:5432/indexer?sslmode=disable
DANGER__WIPE_STORAGE_ON_STARTUP: true
migrations: file://storage/migrations

server:
Expand Down
3 changes: 3 additions & 0 deletions storage/migrations/00_consensus.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ CREATE TABLE chain.epochs
-- Max known height that belongs to the epoch.
end_height UINT63 NOT NULL CHECK (end_height >= start_height),
UNIQUE (start_height, end_height)

-- Added in 25_validator_staking_history.up.sql
-- validators base64_ed25519_pubkey[]
);
CREATE INDEX ix_epochs_id ON chain.epochs (id);

Expand Down
24 changes: 24 additions & 0 deletions storage/migrations/28_validator_staking_history.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
BEGIN;

CREATE SCHEMA IF NOT EXISTS history;
GRANT USAGE ON SCHEMA history TO PUBLIC;

CREATE TABLE history.validators
(
id base64_ed25519_pubkey NOT NULL,
epoch UINT63 NOT NULL,
PRIMARY KEY (id, epoch),
escrow_balance_active UINT_NUMERIC NOT NULL,
escrow_balance_debonding UINT_NUMERIC NOT NULL,
escrow_total_shares_active UINT_NUMERIC NOT NULL,
escrow_total_shares_debonding UINT_NUMERIC NOT NULL,
num_delegators UINT63
);

ALTER TABLE chain.epochs
ADD COLUMN validators base64_ed25519_pubkey[];

-- Grant others read-only use. This does NOT apply to future tables in the schema.
GRANT SELECT ON ALL TABLES IN SCHEMA history TO PUBLIC;

COMMIT;
4 changes: 4 additions & 0 deletions storage/oasis/nodeapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type ConsensusApiLite interface {
GetNodes(ctx context.Context, height int64) ([]Node, error)
GetCommittees(ctx context.Context, height int64, runtimeID coreCommon.Namespace) ([]Committee, error)
GetProposal(ctx context.Context, height int64, proposalID uint64) (*Proposal, error)
GetAccount(ctx context.Context, height int64, address Address) (*Account, error)
DelegationsTo(ctx context.Context, height int64, address Address) (map[Address]*Delegation, error)
Close() error
// Exposes the underlying gRPC connection, if applicable. Implementations may return nil.
// NOTE: Intended only for debugging purposes, e.g. one-off testing of gRPC methods that
Expand Down Expand Up @@ -163,6 +165,8 @@ type (
DebondingStartEscrowEvent staking.DebondingStartEscrowEvent
ReclaimEscrowEvent staking.ReclaimEscrowEvent
AllowanceChangeEvent staking.AllowanceChangeEvent
Account staking.Account
Delegation staking.Delegation
)

// .................... Registry ....................
Expand Down
Loading

0 comments on commit 50d4365

Please sign in to comment.