Skip to content

Commit

Permalink
services/horizon/internal/ingest: Fix transaction processor metrics (s…
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored Feb 28, 2024
1 parent 29c49f9 commit c3f65f4
Show file tree
Hide file tree
Showing 25 changed files with 194 additions and 39 deletions.
4 changes: 4 additions & 0 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func NewAccountFilter() AccountFilter {
}
}

func (filter *accountFilter) Name() string {
return "filters.accountFilter"
}

func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
// only need to re-initialize the filter config state(rules) if its cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
Expand Down
4 changes: 4 additions & 0 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func NewAssetFilter() AssetFilter {
}
}

func (filter *assetFilter) Name() string {
return "filters.assetFilter"
}

func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
Expand Down
2 changes: 0 additions & 2 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,6 @@ func (r resumeState) addLedgerStatsMetricFromMap(s *system, prefix string, m map

func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for processorName, value := range m {
// * is not accepted in Prometheus labels
processorName = strings.Replace(processorName, "*", "", -1)
s.Metrics().ProcessorsRunDuration.
With(prometheus.Labels{"name": processorName}).Add(value.Seconds())
s.Metrics().ProcessorsRunDurationSummary.
Expand Down
22 changes: 13 additions & 9 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ingest

import (
"context"
"fmt"
"time"

"github.com/stellar/go/ingest"
Expand Down Expand Up @@ -31,13 +30,17 @@ func newGroupChangeProcessors(processors []horizonChangeProcessor) *groupChangeP
}
}

func (g groupChangeProcessors) Name() string {
return "groupChangeProcessors"
}

func (g groupChangeProcessors) ProcessChange(ctx context.Context, change ingest.Change) error {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessChange(ctx, change); err != nil {
return errors.Wrapf(err, "error in %T.ProcessChange", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand All @@ -48,7 +51,7 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error {
if err := p.Commit(ctx); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand Down Expand Up @@ -95,7 +98,7 @@ func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta,
if err := p.ProcessTransaction(lcm, tx); err != nil {
return errors.Wrapf(err, "error in %T.ProcessTransaction", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand All @@ -110,9 +113,6 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio
}
name := loader.Name()
g.loaderRunDurations.AddRunDuration(name, startTime)
if _, ok := g.loaderStats[name]; ok {
return fmt.Errorf("%s is present multiple times", name)
}
g.loaderStats[name] = loader.Stats()
}

Expand All @@ -123,7 +123,7 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio
if err := p.Flush(ctx, session); err != nil {
return errors.Wrapf(err, "error in %T.Flush", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand Down Expand Up @@ -153,14 +153,18 @@ func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilter
}
}

func (g *groupTransactionFilterers) Name() string {
return "groupTransactionFilterers"
}

func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) {
for _, f := range g.filterers {
startTime := time.Now()
include, err := f.FilterTransaction(ctx, tx)
if err != nil {
return false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
}
g.AddRunDuration(fmt.Sprintf("%T", f), startTime)
g.AddRunDuration(f.Name(), startTime)
if !include {
// filter out, we can return early
g.droppedTransactions++
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type mockHorizonChangeProcessor struct {
mock.Mock
}

func (m *mockHorizonChangeProcessor) Name() string {
return "mockHorizonChangeProcessor"
}

func (m *mockHorizonChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
args := m.Called(ctx, change)
return args.Error(0)
Expand All @@ -39,6 +43,10 @@ type mockHorizonTransactionProcessor struct {
mock.Mock
}

func (m *mockHorizonTransactionProcessor) Name() string {
return "mockHorizonTransactionProcessor"
}

func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
args := m.Called(lcm, transaction)
return args.Error(0)
Expand Down
18 changes: 0 additions & 18 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
Expand Down Expand Up @@ -529,23 +528,6 @@ func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMe
args.Error(1)
}

func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
processors.StatsLedgerTransactionProcessorResults,
runDurations,
processors.TradeStats,
runDurations,
map[string]history.LoaderStats,
error,
) {
args := m.Called(ledger)
return args.Get(0).(processors.StatsLedgerTransactionProcessorResults),
args.Get(1).(runDurations),
args.Get(2).(processors.TradeStats),
args.Get(3).(runDurations),
args.Get(4).(map[string]history.LoaderStats),
args.Error(3)
}

func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error {
args := m.Called(ledgers)
return args.Error(0)
Expand Down
101 changes: 91 additions & 10 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ const (

type horizonChangeProcessor interface {
processors.ChangeProcessor
Name() string
Commit(context.Context) error
}

type horizonTransactionProcessor interface {
Name() string
processors.LedgerTransactionProcessor
}

Expand All @@ -45,6 +47,10 @@ type statsChangeProcessor struct {
*ingest.StatsChangeProcessor
}

func (statsChangeProcessor) Name() string {
return "ingest.statsChangeProcessor"
}

func (statsChangeProcessor) Commit(ctx context.Context) error {
return nil
}
Expand All @@ -70,14 +76,6 @@ type ProcessorRunnerInterface interface {
ledgerProtocolVersion uint32,
bucketListHash xdr.Hash,
) (ingest.StatsChangeProcessorResults, error)
RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
transactionStats processors.StatsLedgerTransactionProcessorResults,
transactionDurations runDurations,
tradeStats processors.TradeStats,
loaderDurations runDurations,
loaderStats map[string]history.LoaderStats,
err error,
)
RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error
RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
stats ledgerStats,
Expand Down Expand Up @@ -243,6 +241,13 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
s.config.NetworkPassphrase,
)

if err := registerChangeProcessors(
nameRegistry{},
changeProcessor,
); err != nil {
return ingest.StatsChangeProcessorResults{}, err
}

if checkpointLedger == 1 {
if err := changeProcessor.ProcessChange(s.ctx, ingest.GenesisChange(s.config.NetworkPassphrase)); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error ingesting genesis ledger")
Expand Down Expand Up @@ -364,7 +369,7 @@ func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta,
return nil
}

func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta) (
transactionStats processors.StatsLedgerTransactionProcessorResults,
transactionDurations runDurations,
tradeStats processors.TradeStats,
Expand All @@ -380,6 +385,15 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
groupFilteredOutProcessors := s.buildFilteredOutProcessor()
groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor)

if err = registerTransactionProcessors(
registry,
groupTransactionFilterers,
groupFilteredOutProcessors,
groupTransactionProcessors,
); err != nil {
return
}

err = s.streamLedger(ledger,
groupTransactionFilterers,
groupFilteredOutProcessors,
Expand Down Expand Up @@ -413,6 +427,64 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
return
}

// nameRegistry ensures all ingestion components have a unique name
// for metrics reporting
type nameRegistry map[string]struct{}

func (n nameRegistry) add(name string) error {
if _, ok := n[name]; ok {
return fmt.Errorf("%s is duplicated", name)
}
n[name] = struct{}{}
return nil
}

func registerChangeProcessors(
registry nameRegistry,
group *groupChangeProcessors,
) error {
for _, p := range group.processors {
if err := registry.add(p.Name()); err != nil {
return err
}
}
return nil
}

func registerTransactionProcessors(
registry nameRegistry,
groupTransactionFilterers *groupTransactionFilterers,
groupFilteredOutProcessors *groupTransactionProcessors,
groupTransactionProcessors *groupTransactionProcessors,
) error {
for _, f := range groupTransactionFilterers.filterers {
if err := registry.add(f.Name()); err != nil {
return err
}
}
for _, p := range groupTransactionProcessors.processors {
if err := registry.add(p.Name()); err != nil {
return err
}
}
for _, l := range groupTransactionProcessors.lazyLoaders {
if err := registry.add(l.Name()); err != nil {
return err
}
}
for _, p := range groupFilteredOutProcessors.processors {
if err := registry.add(p.Name()); err != nil {
return err
}
}
for _, l := range groupFilteredOutProcessors.lazyLoaders {
if err := registry.add(l.Name()); err != nil {
return err
}
}
return nil
}

func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) (err error) {
ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion)

Expand Down Expand Up @@ -504,12 +576,21 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
)

registry := nameRegistry{}
if err = registerChangeProcessors(
registry,
groupChangeProcessors,
); err != nil {
return
}

err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
return
}

transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.RunTransactionProcessorsOnLedger(ledger)
transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger)

stats.changeStats = changeStatsProcessor.GetResults()
stats.changeDurations = groupChangeProcessors.processorsRunDurations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (p *AccountDataProcessor) reset() {
p.batchInsertBuilder = p.dataQ.NewAccountDataBatchInsertBuilder()
}

func (p *AccountDataProcessor) Name() string {
return "processors.AccountDataProcessor"
}

func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
// We're interested in data only
if change.Type != xdr.LedgerEntryTypeData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/guregu/null/zero"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
Expand All @@ -28,6 +29,10 @@ func (p *AccountsProcessor) reset() {
p.batchInsertBuilder = p.accountsQ.NewAccountsBatchInsertBuilder()
}

func (p *AccountsProcessor) Name() string {
return "processors.AccountsProcessor"
}

func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeAccount {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewAssetStatsProcessor(
return p
}

func (p *AssetStatsProcessor) Name() string {
return "processors.AssetStatsProcessor"
}

func (p *AssetStatsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeLiquidityPool &&
change.Type != xdr.LedgerEntryTypeClaimableBalance &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances) *Claimabl
return p
}

func (p *ClaimableBalancesChangeProcessor) Name() string {
return "processors.ClaimableBalancesChangeProcessor"
}

func (p *ClaimableBalancesChangeProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.claimantsInsertBuilder = p.qClaimableBalances.NewClaimableBalanceClaimantBatchInsertBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func NewClaimableBalancesTransactionProcessor(
}
}

func (p *ClaimableBalancesTransactionProcessor) Name() string {
return "processors.ClaimableBalancesTransactionProcessor"
}

func (p *ClaimableBalancesTransactionProcessor) ProcessTransaction(
lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction,
) error {
Expand Down
Loading

0 comments on commit c3f65f4

Please sign in to comment.