diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 60fb5438a1..46ece7c788 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -125,6 +125,10 @@ type Metrics struct { // 0 otherwise. StateInvalidGauge prometheus.GaugeFunc + // StateVerifyLedgerEntriesCount exposes total number of ledger entries + // checked by the state verifier by type. + StateVerifyLedgerEntriesCount *prometheus.SummaryVec + // LedgerStatsCounter exposes ledger stats counters (like number of ops/changes). LedgerStatsCounter *prometheus.CounterVec @@ -315,6 +319,16 @@ func (s *system) initMetrics() { }, ) + s.metrics.StateVerifyLedgerEntriesCount = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_ledger_entries_count", + Help: "number of ledger entries downloaded from buckets in a single state verifier run", + // Quantile ranks are not relevant here so pass empty map. + Objectives: map[float64]float64{}, + }, + []string{"type"}, + ) + s.metrics.LedgerStatsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "ledger_stats_total", diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index 43ef2285e1..ba91349df6 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -8,6 +8,7 @@ import ( "time" "github.com/guregu/null" + "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2" @@ -124,6 +125,8 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { } } + totalByType := map[string]int64{} + startTime := time.Now() defer func() { duration := time.Since(startTime).Seconds() @@ -131,6 +134,10 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { // Don't update metrics if context cancelled. if s.ctx.Err() != context.Canceled { s.Metrics().StateVerifyDuration.Observe(float64(duration)) + for typ, tot := range totalByType { + s.Metrics().StateVerifyLedgerEntriesCount. + With(prometheus.Labels{"type": typ}).Observe(float64(tot)) + } } } log.WithField("duration", duration).Info("State verification finished") @@ -150,7 +157,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { } assetStats := processors.AssetStatSet{} - total := 0 + total := int64(0) for { var keys []xdr.LedgerKey keys, err = verifier.GetLedgerKeys(verifyBatchSize) @@ -172,16 +179,22 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { switch key.Type { case xdr.LedgerEntryTypeAccount: accounts = append(accounts, key.Account.AccountId.Address()) + totalByType["accounts"]++ case xdr.LedgerEntryTypeData: data = append(data, *key.Data) + totalByType["data"]++ case xdr.LedgerEntryTypeOffer: offers = append(offers, int64(key.Offer.OfferId)) + totalByType["offers"]++ case xdr.LedgerEntryTypeTrustline: trustLines = append(trustLines, *key.TrustLine) + totalByType["trust_lines"]++ case xdr.LedgerEntryTypeClaimableBalance: cBalances = append(cBalances, key.ClaimableBalance.BalanceId) + totalByType["claimable_balances"]++ case xdr.LedgerEntryTypeLiquidityPool: lPools = append(lPools, key.LiquidityPool.LiquidityPoolId) + totalByType["liquidity_pools"]++ default: return errors.New("GetLedgerKeys return unexpected type") } @@ -217,7 +230,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { return errors.Wrap(err, "addLiquidityPoolsToStateVerifier failed") } - total += len(keys) + total += int64(len(keys)) localLog.WithField("total", total).Info("Batch added to StateVerifier") }