Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Andrew7234/validator staking history #703

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/703.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
consensus: track validator escrow history
2 changes: 1 addition & 1 deletion analyzer/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const (
// Timeout to process a single batch.
processBatchTimeout = 61 * time.Second
processBatchTimeout = 6001 * time.Second
// Default number of items processed in a batch.
defaultBatchSize = 20
)
Expand Down
31 changes: 31 additions & 0 deletions analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,37 @@ var (
vote = excluded.vote,
height = excluded.height;`

ValidatorStakingHistoryUnprocessedEpochs = `
SELECT epochs.id, epochs.start_height, prev_epoch.validators
FROM chain.epochs as epochs
LEFT JOIN history.validators as history
ON epochs.id = history.epoch
LEFT JOIN chain.epochs as prev_epoch
ON epochs.id = prev_epoch.id + 1
WHERE
history.epoch IS NULL AND
epochs.id >= $1
ORDER BY epochs.id
LIMIT $2`

ValidatorBalanceInsert = `
INSERT INTO history.validators (id, epoch, escrow_balance_active, escrow_balance_debonding, escrow_total_shares_active, escrow_total_shares_debonding, num_delegators)
VALUES ($1, $2, $3, $4, $5, $6, $7)`

EpochValidatorsUpdate = `
UPDATE chain.epochs
SET validators = $2
WHERE id = $1`

ValidatorStakingHistoryUnprocessedCount = `
SELECT COUNT(*)
FROM chain.epochs AS epochs
LEFT JOIN history.validators AS history
ON epochs.id = history.epoch
WHERE
history.epoch IS NULL AND
epochs.id >= $1`

RuntimeBlockInsert = `
INSERT INTO chain.runtime_blocks (runtime, round, version, timestamp, block_hash, prev_block_hash, io_root, state_root, messages_hash, in_messages_hash, num_transactions, gas_used, size)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`
Expand Down
194 changes: 194 additions & 0 deletions analyzer/validatorstakinghistory/validatorstakinghistory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package validatorstakinghistory

import (
"context"
"fmt"
"math"

"github.com/jackc/pgx/v5"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/item"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/config"
staking "github.com/oasisprotocol/nexus/coreapi/v22.2.11/staking/api"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

// The validator staking history analyzer (1) gets the next epoch / validators to download validator
// history for, (2) downloads that info, and (3) saves the info in the database.
//
// We process epochs sequentially in chronological order in order to track validator
// history for all past and current validators. More concretly, for a given epoch, we find all current
// validators and take the union with the set of all tracked validators from the previous epoch.
// Notably, we do not track validator history for an entity before it first becomes an active validator.
//
// WARNING: This analyzer SHOULD NOT run while block analyzers are in fast sync. This analyzer expects that
// when a row appears in `chain.epochs`, its first block is already processed. In fast sync, blocks are processed
// out of order. (As of 2024-06, fast-sync actually does not update chain.epochs directly so it might not cause bugs
// when running in parallel with this analyzer, but that's by chance rather than by design.)

const (
validatorHistoryAnalyzerName = "validator_history"
)

type processor struct {
source nodeapi.ConsensusApiLite
target storage.TargetStorage
startEpoch uint64
logger *log.Logger
}

var _ item.ItemProcessor[*Epoch] = (*processor)(nil)

func NewAnalyzer(
initCtx context.Context,
cfg config.ItemBasedAnalyzerConfig,
startHeight uint64,
sourceClient nodeapi.ConsensusApiLite,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
logger = logger.With("analyzer", validatorHistoryAnalyzerName)

// Find the epoch corresponding to startHeight.
if startHeight > math.MaxInt64 {
return nil, fmt.Errorf("startHeight %d is too large", startHeight)
}
epoch, err := sourceClient.GetEpoch(initCtx, int64(startHeight))
if err != nil {
return nil, fmt.Errorf("failed to fetch epoch for startHeight %d", startHeight)
}
p := &processor{
source: sourceClient,
target: target,
startEpoch: uint64(epoch),
logger: logger,
}

return item.NewAnalyzer[*Epoch](
validatorHistoryAnalyzerName,
cfg,
p,
target,
logger,
)
}

type Epoch struct {
epoch uint64
startHeight int64
prevValidators []signature.PublicKey // Validator set from the previous epoch.
}

// Note: limit is ignored here because epochs must be processed sequentially in chronological order.
func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*Epoch, error) {
var epoch Epoch
vAddrs := []string{}
err := p.target.QueryRow(
ctx,
queries.ValidatorStakingHistoryUnprocessedEpochs,
p.startEpoch,
1, // overwrite limit to 1
).Scan(
&epoch.epoch,
&epoch.startHeight,
&vAddrs,
)
switch err {
case nil:
// continue
case pgx.ErrNoRows:
return []*Epoch{}, nil
default:
return nil, fmt.Errorf("querying epochs for validator history: %w", err)
}

// Convert vAddrs to signature.PublicKey
validators := []signature.PublicKey{}
for _, v := range vAddrs {
a := signature.PublicKey{}
if err := a.UnmarshalText([]byte(v)); err != nil {
return nil, fmt.Errorf("invalid validator pubkey '%s': %w", v, err)
}
validators = append(validators, a)
}
epoch.prevValidators = validators

return []*Epoch{&epoch}, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, epoch *Epoch) error {
p.logger.Info("downloading validator balances", "epoch", epoch.epoch, "height", epoch.startHeight, "prev_validators", epoch.prevValidators)
currValidators, err := p.source.GetValidators(ctx, epoch.startHeight)
if err != nil {
return fmt.Errorf("downloading validators for height %d", epoch.startHeight)
}
nodes, err := p.source.GetNodes(ctx, epoch.startHeight)
if err != nil {
return fmt.Errorf("downloading nodes for height %d", epoch.startHeight)
}
nodeToEntity := make(map[signature.PublicKey]signature.PublicKey)
for _, n := range nodes {
nodeToEntity[n.ID] = n.EntityID
}
// Find the union of past + current validator sets.
validators := make(map[signature.PublicKey]struct{})
for _, v := range epoch.prevValidators {
validators[v] = struct{}{}
}
for _, v := range currValidators {
// The ID returned by validator objects is the node ID, but we track validators by entity ID.
vID := nodeToEntity[v.ID]
if _, ok := validators[vID]; !ok {
p.logger.Info("found new validator", "addr", staking.NewAddress(vID), "id", vID)
validators[vID] = struct{}{}
}
}
// Download info for each validator.
validatorIDs := []string{}
for vID := range validators {
addr := staking.NewAddress(vID)
p.logger.Info("starting to process validator", "addr", addr, "height", epoch.startHeight)
validatorIDs = append(validatorIDs, vID.String())
acct, err1 := p.source.GetAccount(ctx, epoch.startHeight, addr)
if err1 != nil {
return fmt.Errorf("fetching account info for %s at height %d", addr.String(), epoch.startHeight)
}
delegations, err1 := p.source.DelegationsTo(ctx, epoch.startHeight, addr)
if err1 != nil {
return fmt.Errorf("fetching delegations to account %s at height %d", addr.String(), epoch.startHeight)
}
batch.Queue(queries.ValidatorBalanceInsert,
vID.String(),
epoch.epoch,
acct.Escrow.Active.Balance,
acct.Escrow.Debonding.Balance,
acct.Escrow.Active.TotalShares,
acct.Escrow.Debonding.TotalShares,
len(delegations),
)
p.logger.Info("processed validator", "addr", addr, "height", epoch.startHeight)
}

// Update chain.epochs with the set of past and current validators at this epoch.
batch.Queue(queries.EpochValidatorsUpdate,
epoch.epoch,
validatorIDs,
)

p.logger.Info("finished processing epoch", "epoch", epoch.epoch)

return nil
}

func (p *processor) QueueLength(ctx context.Context) (int, error) {
var queueLength int
if err := p.target.QueryRow(ctx, queries.ValidatorStakingHistoryUnprocessedCount, p.startEpoch).Scan(&queueLength); err != nil {
return 0, fmt.Errorf("querying number of unprocessed epochs: %w", err)
}
return queueLength, nil
}
14 changes: 14 additions & 0 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
nodestats "github.com/oasisprotocol/nexus/analyzer/node_stats"
"github.com/oasisprotocol/nexus/analyzer/runtime"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/analyzer/validatorstakinghistory"
"github.com/oasisprotocol/nexus/cache/httpproxy"
cmdCommon "github.com/oasisprotocol/nexus/cmd/common"
"github.com/oasisprotocol/nexus/common"
Expand Down Expand Up @@ -614,6 +615,19 @@ func NewService(cfg *config.AnalysisConfig) (*Service, error) { //nolint:gocyclo
return metadata_registry.NewAnalyzer(cfg.Analyzers.MetadataRegistry.ItemBasedAnalyzerConfig, dbClient, logger)
})
}
if cfg.Analyzers.ValidatorStakingHistory != nil {
analyzers, err = addAnalyzer(analyzers, err, syncTagConsensus, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
if err1 != nil {
return nil, err1
}
startHeight := cfg.Analyzers.ValidatorStakingHistory.StartHeight
if startHeight == 0 && cfg.Analyzers.Consensus.From != 0 {
startHeight = cfg.Analyzers.Consensus.From
}
return validatorstakinghistory.NewAnalyzer(ctx, cfg.Analyzers.ValidatorStakingHistory.ItemBasedAnalyzerConfig, startHeight, sourceClient, dbClient, logger)
})
}
if cfg.Analyzers.NodeStats != nil {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
sourceClient, err1 := sources.Consensus(ctx)
Expand Down
24 changes: 21 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ type AnalyzersList struct {
PontusxTestAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_test"`
PontusxDevAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_dev"`

MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
ValidatorStakingHistory *ValidatorStakingHistoryConfig `koanf:"validator_staking_history"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
AggregateStats *AggregateStatsConfig `koanf:"aggregate_stats"`
}

type HelperList struct {
Expand Down Expand Up @@ -477,6 +478,23 @@ func (cfg *MetadataRegistryConfig) Validate() error {
return nil
}

// ValidatorStakingHistoryConfig is the configuration for the validator balances analyzer.
type ValidatorStakingHistoryConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`

// StartHeight is the height at which the analyzer should start constructing validator
// history from. Note: We use a uint64 to match the consensus block config types, but
// the analyzer will refuse to start if StartHeight > math.MaxInt64.
StartHeight uint64 `koanf:"start_height"`
}

func (cfg *ValidatorStakingHistoryConfig) Validate() error {
if cfg.StartHeight == 0 {
return fmt.Errorf("validator staking startHeight must be set, preferably to the consensus analyzer start height")
}
return nil
}

// NodeStatsConfig is the configuration for the node stats analyzer.
type NodeStatsConfig struct {
ItemBasedAnalyzerConfig `koanf:",squash"`
Expand Down
16 changes: 9 additions & 7 deletions config/local-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ analysis:
nodes: &src
eden:
default:
rpc: unix:/tmp/node.sock #unix:/node/data/internal.sock
rpc: grpc.prd.oasis.io:1443 #unix:/tmp/node.sock #unix:/node/data/internal.sock
sapphire: { rpc: grpc.opf.oasis.io:443 }
ipfs:
gateway: https://ipfs.io
analyzers:
metadata_registry:
interval: 1h
# metadata_registry:
# interval: 1h
node_stats: {}
aggregate_stats: {}
consensus:
from: 16_817_956 # Eden genesis
emerald:
from: 7_875_129 # round at Eden genesis
sapphire:
from: 1_357_486 # round at Eden genesis
# emerald:
# from: 7_875_129 # round at Eden genesis
# sapphire:
# from: 1_357_486 # round at Eden genesis
storage:
backend: postgres
endpoint: postgresql://rwuser:password@localhost:5432/indexer?sslmode=disable
DANGER__WIPE_STORAGE_ON_STARTUP: true
migrations: file://storage/migrations

server:
Expand Down
3 changes: 3 additions & 0 deletions storage/migrations/00_consensus.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ CREATE TABLE chain.epochs
-- Max known height that belongs to the epoch.
end_height UINT63 NOT NULL CHECK (end_height >= start_height),
UNIQUE (start_height, end_height)

-- Added in 25_validator_staking_history.up.sql
-- validators base64_ed25519_pubkey[]
);
CREATE INDEX ix_epochs_id ON chain.epochs (id);

Expand Down
24 changes: 24 additions & 0 deletions storage/migrations/28_validator_staking_history.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
BEGIN;

CREATE SCHEMA IF NOT EXISTS history;
GRANT USAGE ON SCHEMA history TO PUBLIC;

CREATE TABLE history.validators
(
id base64_ed25519_pubkey NOT NULL,
epoch UINT63 NOT NULL,
PRIMARY KEY (id, epoch),
escrow_balance_active UINT_NUMERIC NOT NULL,
escrow_balance_debonding UINT_NUMERIC NOT NULL,
escrow_total_shares_active UINT_NUMERIC NOT NULL,
escrow_total_shares_debonding UINT_NUMERIC NOT NULL,
num_delegators UINT63
);

ALTER TABLE chain.epochs
ADD COLUMN validators base64_ed25519_pubkey[];

-- Grant others read-only use. This does NOT apply to future tables in the schema.
GRANT SELECT ON ALL TABLES IN SCHEMA history TO PUBLIC;

COMMIT;
4 changes: 4 additions & 0 deletions storage/oasis/nodeapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type ConsensusApiLite interface {
GetNodes(ctx context.Context, height int64) ([]Node, error)
GetCommittees(ctx context.Context, height int64, runtimeID coreCommon.Namespace) ([]Committee, error)
GetProposal(ctx context.Context, height int64, proposalID uint64) (*Proposal, error)
GetAccount(ctx context.Context, height int64, address Address) (*Account, error)
DelegationsTo(ctx context.Context, height int64, address Address) (map[Address]*Delegation, error)
Close() error
// Exposes the underlying gRPC connection, if applicable. Implementations may return nil.
// NOTE: Intended only for debugging purposes, e.g. one-off testing of gRPC methods that
Expand Down Expand Up @@ -163,6 +165,8 @@ type (
DebondingStartEscrowEvent staking.DebondingStartEscrowEvent
ReclaimEscrowEvent staking.ReclaimEscrowEvent
AllowanceChangeEvent staking.AllowanceChangeEvent
Account staking.Account
Andrew7234 marked this conversation as resolved.
Show resolved Hide resolved
Delegation staking.Delegation
)

// .................... Registry ....................
Expand Down
Loading
Loading