Skip to content

Commit

Permalink
stellar#4909: run ledger processor regardless of whether a ledger has…
Browse files Browse the repository at this point in the history
… 0 or more transactions
  • Loading branch information
sreuland committed Oct 19, 2023
1 parent 30ed273 commit 1efb7a3
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 15 deletions.
2 changes: 1 addition & 1 deletion services/horizon/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
horizon-postgres:
image: postgres:9.6.17-alpine
image: postgres:postgres:12-bullseye
restart: on-failure
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
Expand Down
19 changes: 12 additions & 7 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func buildChangeProcessor(
func (s *ProcessorRunner) buildTransactionProcessor(
ledgerTransactionStats *processors.StatsLedgerTransactionProcessor,
tradeProcessor *processors.TradeProcessor,
ledgersProcessor *processors.LedgersProcessor,
ledger xdr.LedgerCloseMeta,
) *groupTransactionProcessors {
accountLoader := history.NewAccountLoader()
Expand All @@ -151,7 +152,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(
processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder()),
processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)),
ledgersProcessor,
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder()),
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
Expand Down Expand Up @@ -321,21 +322,25 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
transactionReader *ingest.LedgerTransactionReader
)

transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger)
if err != nil {
err = errors.Wrap(err, "Error creating ledger reader")
if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil {
err = errors.Wrap(err, "Error while checking for supported protocol version")
return
}

if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil {
err = errors.Wrap(err, "Error while checking for supported protocol version")
// ensure capture of the ledger to history regardless of whether it has transactions.
ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion))
ledgersProcessor.ProcessLedger(ledger)

transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger)
if err != nil {
err = errors.Wrap(err, "Error creating ledger reader")
return
}

groupTransactionFilterers := s.buildTransactionFilterer()
groupFilteredOutProcessors := s.buildFilteredOutProcessor()
groupTransactionProcessors := s.buildTransactionProcessor(
&ledgerTransactionStats, &tradeProcessor, ledger)
&ledgerTransactionStats, &tradeProcessor, ledgersProcessor, ledger)
err = processors.StreamLedgerTransactions(s.ctx,
groupTransactionFilterers,
groupFilteredOutProcessors,
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,10 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) {
LedgerHeader: xdr.LedgerHeaderHistoryEntry{},
},
}
processor := runner.buildTransactionProcessor(stats, trades, ledger)
assert.IsType(t, &groupTransactionProcessors{}, processor)
ledgersProcessor := &processors.LedgersProcessor{}

processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor, ledger)
assert.IsType(t, &groupTransactionProcessors{}, processor)
assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0])
assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1])
assert.IsType(t, &processors.LedgersProcessor{}, processor.processors[2])
Expand Down
14 changes: 9 additions & 5 deletions services/horizon/internal/ingest/processors/ledgers_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stellar/go/xdr"
)

type ledgerInfo struct {
type LedgerInfo struct {
header xdr.LedgerHeaderHistoryEntry
successTxCount int
failedTxCount int
Expand All @@ -20,26 +20,30 @@ type ledgerInfo struct {

type LedgersProcessor struct {
batch history.LedgerBatchInsertBuilder
ledgers map[uint32]*ledgerInfo
ledgers map[uint32]*LedgerInfo
ingestVersion int
}

func NewLedgerProcessor(batch history.LedgerBatchInsertBuilder, ingestVersion int) *LedgersProcessor {
return &LedgersProcessor{
batch: batch,
ledgers: map[uint32]*ledgerInfo{},
ledgers: map[uint32]*LedgerInfo{},
ingestVersion: ingestVersion,
}
}

func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
func (p *LedgersProcessor) ProcessLedger(lcm xdr.LedgerCloseMeta) *LedgerInfo {
sequence := lcm.LedgerSequence()
entry, ok := p.ledgers[sequence]
if !ok {
entry = &ledgerInfo{header: lcm.LedgerHeaderHistoryEntry()}
entry = &LedgerInfo{header: lcm.LedgerHeaderHistoryEntry()}
p.ledgers[sequence] = entry
}
return entry
}

func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
entry := p.ProcessLedger(lcm)
opCount := len(transaction.Envelope.Operations())
entry.txSetOpCount += opCount
if transaction.Result.Successful() {
Expand Down

0 comments on commit 1efb7a3

Please sign in to comment.