diff --git a/ingest/io/stats_change_processor.go b/ingest/io/stats_change_processor.go index c8c3f67ae0..fd31fa0088 100644 --- a/ingest/io/stats_change_processor.go +++ b/ingest/io/stats_change_processor.go @@ -16,6 +16,10 @@ type StatsChangeProcessorResults struct { AccountsUpdated int64 AccountsRemoved int64 + ClaimableBalancesCreated int64 + ClaimableBalancesUpdated int64 + ClaimableBalancesRemoved int64 + DataCreated int64 DataUpdated int64 DataRemoved int64 @@ -40,6 +44,15 @@ func (p *StatsChangeProcessor) ProcessChange(change Change) error { case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: p.results.AccountsRemoved++ } + case xdr.LedgerEntryTypeClaimableBalance: + switch change.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + p.results.ClaimableBalancesCreated++ + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + p.results.ClaimableBalancesUpdated++ + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + p.results.ClaimableBalancesRemoved++ + } case xdr.LedgerEntryTypeData: switch change.LedgerEntryChangeType() { case xdr.LedgerEntryChangeTypeLedgerEntryCreated: @@ -82,6 +95,10 @@ func (stats *StatsChangeProcessorResults) Map() map[string]interface{} { "stats_accounts_updated": stats.AccountsUpdated, "stats_accounts_removed": stats.AccountsRemoved, + "stats_claimable_balances_created": stats.ClaimableBalancesCreated, + "stats_claimable_balances_updated": stats.ClaimableBalancesUpdated, + "stats_claimable_balances_removed": stats.ClaimableBalancesRemoved, + "stats_data_created": stats.DataCreated, "stats_data_updated": stats.DataUpdated, "stats_data_removed": stats.DataRemoved, diff --git a/ingest/io/stats_change_processor_test.go b/ingest/io/stats_change_processor_test.go index d018382431..6fb1c032ec 100644 --- a/ingest/io/stats_change_processor_test.go +++ b/ingest/io/stats_change_processor_test.go @@ -17,6 +17,12 @@ func TestStatsChangeProcessor(t *testing.T) { Post: &xdr.LedgerEntry{}, })) + assert.NoError(t, processor.ProcessChange(Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: nil, + Post: &xdr.LedgerEntry{}, + })) + assert.NoError(t, processor.ProcessChange(Change{ Type: xdr.LedgerEntryTypeData, Pre: nil, @@ -42,6 +48,12 @@ func TestStatsChangeProcessor(t *testing.T) { Post: &xdr.LedgerEntry{}, })) + assert.NoError(t, processor.ProcessChange(Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &xdr.LedgerEntry{}, + Post: &xdr.LedgerEntry{}, + })) + assert.NoError(t, processor.ProcessChange(Change{ Type: xdr.LedgerEntryTypeData, Pre: &xdr.LedgerEntry{}, @@ -67,6 +79,12 @@ func TestStatsChangeProcessor(t *testing.T) { Post: nil, })) + assert.NoError(t, processor.ProcessChange(Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &xdr.LedgerEntry{}, + Post: nil, + })) + assert.NoError(t, processor.ProcessChange(Change{ Type: xdr.LedgerEntryTypeData, Pre: &xdr.LedgerEntry{}, @@ -88,16 +106,19 @@ func TestStatsChangeProcessor(t *testing.T) { results := processor.GetResults() assert.Equal(t, int64(1), results.AccountsCreated) + assert.Equal(t, int64(1), results.ClaimableBalancesCreated) assert.Equal(t, int64(1), results.DataCreated) assert.Equal(t, int64(1), results.OffersCreated) assert.Equal(t, int64(1), results.TrustLinesCreated) assert.Equal(t, int64(1), results.AccountsUpdated) + assert.Equal(t, int64(1), results.ClaimableBalancesUpdated) assert.Equal(t, int64(1), results.DataUpdated) assert.Equal(t, int64(1), results.OffersUpdated) assert.Equal(t, int64(1), results.TrustLinesUpdated) assert.Equal(t, int64(1), results.AccountsRemoved) + assert.Equal(t, int64(1), results.ClaimableBalancesRemoved) assert.Equal(t, int64(1), results.DataRemoved) assert.Equal(t, int64(1), results.OffersRemoved) assert.Equal(t, int64(1), results.TrustLinesRemoved) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 0d14ee8efc..bf6f5697e1 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -6,6 +6,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x ## Unreleased * The `service` field emitted in ingestion logs has been changed from `expingest` to `ingest`. +* Ledger stats are now exported in `/metrics` in `horizon_ingest_ledger_stats_total` metric. ## v1.10.1 diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 102663a938..a491881147 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -2,8 +2,10 @@ package ingest import ( "fmt" + "strings" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/ingest/io" @@ -471,9 +473,17 @@ func (r resumeState) run(s *system) (transition, error) { duration := time.Since(startTime).Seconds() s.Metrics().LedgerIngestionDuration.Observe(float64(duration)) + + // Update stats metrics + changeStatsMap := changeStats.Map() + r.addLedgerStatsMetricFromMap(s, "change", changeStatsMap) + + ledgerTransactionStatsMap := ledgerTransactionStats.Map() + r.addLedgerStatsMetricFromMap(s, "ledger", ledgerTransactionStatsMap) + log. - WithFields(changeStats.Map()). - WithFields(ledgerTransactionStats.Map()). + WithFields(changeStatsMap). + WithFields(ledgerTransactionStatsMap). WithFields(logpkg.F{ "sequence": ingestLedger, "duration": duration, @@ -488,6 +498,14 @@ func (r resumeState) run(s *system) (transition, error) { return resumeImmediately(ingestLedger), nil } +func (r resumeState) addLedgerStatsMetricFromMap(s *system, prefix string, m map[string]interface{}) { + for stat, value := range m { + stat = strings.Replace(stat, "stats_", prefix+"_", 1) + s.Metrics().LedgerStatsCounter. + With(prometheus.Labels{"type": stat}).Add(float64(value.(int64))) + } +} + type historyRangeState struct { fromLedger uint32 toLedger uint32 diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 2c96e5931b..401d60498e 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -104,6 +104,9 @@ type Metrics struct { // StateInvalidGauge exposes state invalid metric. 1 if state is invalid, // 0 otherwise. StateInvalidGauge prometheus.GaugeFunc + + // LedgerStatsCounter exposes ledger stats counters (like number of ops/changes). + LedgerStatsCounter *prometheus.CounterVec } type System interface { @@ -250,6 +253,14 @@ func (s *system) initMetrics() { return invalidFloat }, ) + + s.metrics.LedgerStatsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "ledger_stats_total", + Help: "counters of different ledger stats", + }, + []string{"type"}, + ) } func (s *system) Metrics() Metrics { diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index fa76eb1763..c023b4a30a 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -244,6 +244,7 @@ func initIngestMetrics(app *App) { app.prometheusRegistry.MustRegister(app.ingester.Metrics().LedgerIngestionDuration) app.prometheusRegistry.MustRegister(app.ingester.Metrics().StateVerifyDuration) app.prometheusRegistry.MustRegister(app.ingester.Metrics().StateInvalidGauge) + app.prometheusRegistry.MustRegister(app.ingester.Metrics().LedgerStatsCounter) } func initTxSubMetrics(app *App) {