Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Add ingestion processors run duration metrics #3224

Merged
merged 3 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,8 @@ func (r resumeState) run(s *system) (transition, error) {
"commit": true,
}).Info("Processing ledger")

changeStats, ledgerTransactionStats, err := s.runner.RunAllProcessorsOnLedger(ingestLedger)
changeStats, changeDurations, transactionStats, transactionDurations, err :=
s.runner.RunAllProcessorsOnLedger(ingestLedger)
if err != nil {
return retryResume(r), errors.Wrap(err, "Error running processors on ledger")
}
Expand All @@ -477,13 +478,15 @@ func (r resumeState) run(s *system) (transition, error) {
// Update stats metrics
changeStatsMap := changeStats.Map()
r.addLedgerStatsMetricFromMap(s, "change", changeStatsMap)
r.addProcessorDurationsMetricFromMap(s, changeDurations)

ledgerTransactionStatsMap := ledgerTransactionStats.Map()
r.addLedgerStatsMetricFromMap(s, "ledger", ledgerTransactionStatsMap)
transactionStatsMap := transactionStats.Map()
r.addLedgerStatsMetricFromMap(s, "ledger", transactionStatsMap)
r.addProcessorDurationsMetricFromMap(s, transactionDurations)

log.
WithFields(changeStatsMap).
WithFields(ledgerTransactionStatsMap).
WithFields(transactionStatsMap).
WithFields(logpkg.F{
"sequence": ingestLedger,
"duration": duration,
Expand All @@ -506,6 +509,15 @@ func (r resumeState) addLedgerStatsMetricFromMap(s *system, prefix string, m map
}
}

func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for processorName, value := range m {
// * is not accepted in Prometheus labels
processorName = strings.Replace(processorName, "*", "", -1)
s.Metrics().ProcessorsRunDuration.
With(prometheus.Labels{"name": processorName}).Add(value.Seconds())
}
}

type historyRangeState struct {
fromLedger uint32
toLedger uint32
Expand Down Expand Up @@ -576,7 +588,7 @@ func runTransactionProcessorsOnLedger(s *system, ledger uint32) error {
}).Info("Processing ledger")
startTime := time.Now()

ledgerTransactionStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
ledgerTransactionStats, _, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger))
}
Expand Down Expand Up @@ -820,7 +832,7 @@ func (v verifyRangeState) run(s *system) (transition, error) {

var changeStats io.StatsChangeProcessorResults
var ledgerTransactionStats io.StatsLedgerTransactionProcessorResults
changeStats, ledgerTransactionStats, err = s.runner.RunAllProcessorsOnLedger(sequence)
changeStats, _, ledgerTransactionStats, _, err = s.runner.RunAllProcessorsOnLedger(sequence)
if err != nil {
err = errors.Wrap(err, "Error running processors on ledger")
return stop(), err
Expand Down Expand Up @@ -887,7 +899,7 @@ func (stressTestState) run(s *system) (transition, error) {
}).Info("Processing ledger")
startTime := time.Now()

changeStats, ledgerTransactionStats, err := s.runner.RunAllProcessorsOnLedger(sequence)
changeStats, _, ledgerTransactionStats, _, err := s.runner.RunAllProcessorsOnLedger(sequence)
if err != nil {
err = errors.Wrap(err, "Error running processors on ledger")
return stop(), err
Expand Down
51 changes: 41 additions & 10 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,83 @@
package ingest

import (
"fmt"
"time"

"github.com/stellar/go/ingest/io"
"github.com/stellar/go/support/errors"
)

type horizonChangeProcessor interface {
io.ChangeProcessor
// TODO maybe rename to Flush()
Commit() error
type processorsRunDurations map[string]time.Duration

func (d processorsRunDurations) AddRunDuration(name string, startTime time.Time) {
d[name] += time.Since(startTime)
}

type groupChangeProcessors []horizonChangeProcessor
type groupChangeProcessors struct {
processors []horizonChangeProcessor
processorsRunDurations
}

func newGroupChangeProcessors(processors []horizonChangeProcessor) *groupChangeProcessors {
return &groupChangeProcessors{
processors: processors,
processorsRunDurations: make(map[string]time.Duration),
}
}

func (g groupChangeProcessors) ProcessChange(change io.Change) error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessChange(change); err != nil {
return errors.Wrapf(err, "error in %T.ProcessChange", p)
}
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

func (g groupChangeProcessors) Commit() error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.Commit(); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
}
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

type groupTransactionProcessors []horizonTransactionProcessor
type groupTransactionProcessors struct {
processors []horizonTransactionProcessor
processorsRunDurations
}

func newGroupTransactionProcessors(processors []horizonTransactionProcessor) *groupTransactionProcessors {
return &groupTransactionProcessors{
processors: processors,
processorsRunDurations: make(map[string]time.Duration),
}
}

func (g groupTransactionProcessors) ProcessTransaction(tx io.LedgerTransaction) error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessTransaction(tx); err != nil {
return errors.Wrapf(err, "error in %T.ProcessTransaction", p)
}
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

func (g groupTransactionProcessors) Commit() error {
for _, p := range g {
for _, p := range g.processors {
startTime := time.Now()
if err := p.Commit(); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
}
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}
8 changes: 4 additions & 4 deletions services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ func TestGroupChangeProcessorsTestSuiteLedger(t *testing.T) {
func (s *GroupChangeProcessorsTestSuiteLedger) SetupTest() {
s.processorA = &mockHorizonChangeProcessor{}
s.processorB = &mockHorizonChangeProcessor{}
s.processors = &groupChangeProcessors{
s.processors = newGroupChangeProcessors([]horizonChangeProcessor{
s.processorA,
s.processorB,
}
})
}

func (s *GroupChangeProcessorsTestSuiteLedger) TearDownTest() {
Expand Down Expand Up @@ -127,10 +127,10 @@ func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) {
func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() {
s.processorA = &mockHorizonTransactionProcessor{}
s.processorB = &mockHorizonTransactionProcessor{}
s.processors = &groupTransactionProcessors{
s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{
s.processorA,
s.processorB,
}
})
}

func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerR
s.historyQ.On("GetLatestLedger").Return(uint32(99), nil).Once()

s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, errors.New("my error")).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
errors.New("my error"),
).Once()

next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
s.Assert().Error(err)
Expand Down Expand Up @@ -175,7 +179,12 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccess() {

s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once()
for i := 100; i <= 200; i++ {
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).
Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()
}

s.historyQ.On("Commit").Return(nil).Once()
Expand All @@ -191,7 +200,11 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccessOneLedger() {
s.historyQ.On("GetLatestLedger").Return(uint32(99), nil).Once()

s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(true, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()

s.historyQ.On("Commit").Return(nil).Once()

Expand Down Expand Up @@ -333,7 +346,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedge
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).
Return(io.StatsLedgerTransactionProcessorResults{}, errors.New("my error")).Once()
Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
errors.New("my error"),
).Once()
s.historyQ.On("Rollback").Return(nil).Once()

err := s.system.ReingestRange(100, 200, false)
Expand All @@ -353,7 +370,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestCommitFails() {
"DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()

s.historyQ.On("Commit").Return(errors.New("my error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()
Expand All @@ -377,7 +398,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() {
"DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", i).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", i).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()

s.historyQ.On("Commit").Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
Expand All @@ -397,7 +422,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() {
"DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()
s.historyQ.On("Commit").Return(nil).Once()

// Recreate mock in this single test to remove previous assertion.
Expand Down Expand Up @@ -427,7 +456,11 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() {
).Return(nil).Once()

for i := 100; i <= 200; i++ {
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(
io.StatsLedgerTransactionProcessorResults{},
processorsRunDurations{},
nil,
).Once()
}

s.historyQ.On("Commit").Return(nil).Once()
Expand Down
11 changes: 11 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Metrics struct {

// LedgerStatsCounter exposes ledger stats counters (like number of ops/changes).
LedgerStatsCounter *prometheus.CounterVec

// ProcessorsRunDuration exposes processors run durations.
ProcessorsRunDuration *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -267,6 +270,14 @@ func (s *system) initMetrics() {
},
[]string{"type"},
)

s.metrics.ProcessorsRunDuration = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "processor_run_duration_seconds_total",
Help: "run durations of ingestion processors",
},
[]string{"name"},
)
}

func (s *system) Metrics() Metrics {
Expand Down
24 changes: 19 additions & 5 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,30 @@ func (m *mockProcessorsRunner) RunHistoryArchiveIngestion(checkpointLedger uint3
return args.Get(0).(io.StatsChangeProcessorResults), args.Error(1)
}

func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(sequence uint32) (io.StatsChangeProcessorResults, io.StatsLedgerTransactionProcessorResults, error) {
func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(sequence uint32) (
io.StatsChangeProcessorResults,
processorsRunDurations,
io.StatsLedgerTransactionProcessorResults,
processorsRunDurations,
error,
) {
args := m.Called(sequence)
return args.Get(0).(io.StatsChangeProcessorResults),
args.Get(1).(io.StatsLedgerTransactionProcessorResults),
args.Error(2)
args.Get(1).(processorsRunDurations),
args.Get(2).(io.StatsLedgerTransactionProcessorResults),
args.Get(3).(processorsRunDurations),
args.Error(4)
}

func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(sequence uint32) (io.StatsLedgerTransactionProcessorResults, error) {
func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(sequence uint32) (
io.StatsLedgerTransactionProcessorResults,
processorsRunDurations,
error,
) {
args := m.Called(sequence)
return args.Get(0).(io.StatsLedgerTransactionProcessorResults), args.Error(1)
return args.Get(0).(io.StatsLedgerTransactionProcessorResults),
args.Get(1).(processorsRunDurations),
args.Error(2)
}

var _ ProcessorRunnerInterface = (*mockProcessorsRunner)(nil)
Expand Down
Loading