Skip to content

Commit

Permalink
Exclude Soroban meta in the process runner before passing it down to …
Browse files Browse the repository at this point in the history
…processors to prevent making changes in the individual processors.
  • Loading branch information
urvisavla committed Jan 23, 2024
1 parent 66342bb commit e5db60b
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 77 deletions.
11 changes: 1 addition & 10 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,16 @@ func (d processorsRunDurations) AddRunDuration(name string, startTime time.Time)
type groupChangeProcessors struct {
processors []horizonChangeProcessor
processorsRunDurations
skipEntryType []xdr.LedgerEntryType
}

func newGroupChangeProcessors(processors []horizonChangeProcessor, skipEntryType []xdr.LedgerEntryType) *groupChangeProcessors {
func newGroupChangeProcessors(processors []horizonChangeProcessor) *groupChangeProcessors {
return &groupChangeProcessors{
processors: processors,
processorsRunDurations: make(map[string]time.Duration),
skipEntryType: skipEntryType,
}
}

func (g groupChangeProcessors) ProcessChange(ctx context.Context, change ingest.Change) error {

for _, op := range g.skipEntryType {
if op == change.Type {
return nil
}
}

for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessChange(ctx, change); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *GroupChangeProcessorsTestSuiteLedger) SetupTest() {
s.processors = newGroupChangeProcessors([]horizonChangeProcessor{
s.processorA,
s.processorB,
}, []xdr.LedgerEntryType{})
})
}

func (s *GroupChangeProcessorsTestSuiteLedger) TearDownTest() {
Expand Down
47 changes: 19 additions & 28 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,11 @@ func buildChangeProcessor(
source ingestionSource,
ledgerSequence uint32,
networkPassphrase string,
skipSorobanIngestion bool,
) *groupChangeProcessors {
statsChangeProcessor := &statsChangeProcessor{
StatsChangeProcessor: changeStats,
}

var skipEntryType []xdr.LedgerEntryType

if skipSorobanIngestion {
skipEntryType = []xdr.LedgerEntryType{
xdr.LedgerEntryTypeContractData,
xdr.LedgerEntryTypeContractCode,
xdr.LedgerEntryTypeTtl,
xdr.LedgerEntryTypeConfigSetting,
}
}

useLedgerCache := source == ledgerSource
return newGroupChangeProcessors([]horizonChangeProcessor{
statsChangeProcessor,
Expand All @@ -139,7 +127,7 @@ func buildChangeProcessor(
processors.NewTrustLinesProcessor(historyQ),
processors.NewClaimableBalancesChangeProcessor(historyQ),
processors.NewLiquidityPoolsChangeProcessor(historyQ, ledgerSequence),
}, skipEntryType)
})
}

func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) *groupTransactionProcessors {
Expand All @@ -154,22 +142,11 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors
tradeProcessor := processors.NewTradeProcessor(accountLoader,
lpLoader, assetLoader, s.historyQ.NewTradeBatchInsertBuilder())

var skipOperationType []xdr.OperationType

if s.config.SkipSorobanIngestion {
skipOperationType = []xdr.OperationType{
xdr.OperationTypeInvokeHostFunction,
xdr.OperationTypeExtendFootprintTtl,
xdr.OperationTypeRestoreFootprint,
xdr.OperationTypeExtendFootprintTtl,
}
}

processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase, skipOperationType),
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase),
ledgersProcessor,
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase, skipOperationType),
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase),
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
Expand Down Expand Up @@ -258,7 +235,6 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
historyArchiveSource,
checkpointLedger,
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)

if checkpointLedger == 1 {
Expand Down Expand Up @@ -504,6 +480,22 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
stats ledgerStats,
err error,
) {
if s.config.SkipSorobanIngestion {
for txIndex := 0; txIndex < ledger.CountTransactions(); txIndex++ {
txMeta := &ledger.MustV1().TxProcessing[txIndex].TxApplyProcessing
if v3Meta, ok := txMeta.GetV3(); ok && v3Meta.SorobanMeta != nil {
// it's soroban, elide the tx meta, force empty meta into the xdr instead
txMeta.V3 = &xdr.TransactionMetaV3{
Ext: xdr.ExtensionPoint{},
TxChangesBefore: xdr.LedgerEntryChanges{},
Operations: []xdr.OperationMeta{},
TxChangesAfter: xdr.LedgerEntryChanges{},
SorobanMeta: nil,
}
}
}
}

changeStatsProcessor := ingest.StatsChangeProcessor{}

if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil {
Expand All @@ -517,7 +509,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledgerSource,
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)
err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
}

stats := &ingest.StatsChangeProcessor{}
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "", false)
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand All @@ -201,7 +201,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
filters: &MockFilters{},
}

processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "", false)
processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand Down Expand Up @@ -271,6 +271,7 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
EnableIngestionFiltering: true,
SkipSorobanIngestion: false,
}

q := &mockDBQ{}
Expand Down
23 changes: 6 additions & 17 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,20 @@ import (

// EffectProcessor process effects
type EffectProcessor struct {
accountLoader *history.AccountLoader
batch history.EffectBatchInsertBuilder
network string
skipOperationType []xdr.OperationType
accountLoader *history.AccountLoader
batch history.EffectBatchInsertBuilder
network string
}

func NewEffectProcessor(
accountLoader *history.AccountLoader,
batch history.EffectBatchInsertBuilder,
network string,
skipOperationType []xdr.OperationType,
) *EffectProcessor {
return &EffectProcessor{
accountLoader: accountLoader,
batch: batch,
network: network,
skipOperationType: skipOperationType,
accountLoader: accountLoader,
batch: batch,
network: network,
}
}

Expand All @@ -53,7 +50,6 @@ func (p *EffectProcessor) ProcessTransaction(
return nil
}

OUTER:
for opi, op := range transaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(opi),
Expand All @@ -62,13 +58,6 @@ OUTER:
ledgerSequence: uint32(lcm.LedgerSequence()),
network: p.network,
}

for _, op := range p.skipOperationType {
if op == operation.OperationType() {
continue OUTER
}
}

if err := operation.ingestEffects(p.accountLoader, p.batch); err != nil {
return errors.Wrapf(err, "reading operation %v effects", operation.ID())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() {
s.accountLoader,
s.mockBatchInsertBuilder,
networkPassphrase,
[]xdr.OperationType{},
)

s.txs = []ingest.LedgerTransaction{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,19 @@ import (

// OperationProcessor operations processor
type OperationProcessor struct {
batch history.OperationBatchInsertBuilder
network string
skipOperationType []xdr.OperationType
batch history.OperationBatchInsertBuilder
network string
}

func NewOperationProcessor(batch history.OperationBatchInsertBuilder, network string, skipOperationType []xdr.OperationType) *OperationProcessor {
func NewOperationProcessor(batch history.OperationBatchInsertBuilder, network string) *OperationProcessor {
return &OperationProcessor{
batch: batch,
network: network,
skipOperationType: skipOperationType,
batch: batch,
network: network,
}
}

// ProcessTransaction process the given transaction
func (p *OperationProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {

OUTER:
for i, op := range transaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(i),
Expand All @@ -46,12 +42,6 @@ OUTER:
ledgerSequence: lcm.LedgerSequence(),
network: p.network,
}
for _, op := range p.skipOperationType {
if op == operation.OperationType() {
continue OUTER
}
}

details, err := operation.Details()
if err != nil {
return errors.Wrapf(err, "Error obtaining details for operation %v", operation.ID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (s *OperationsProcessorTestSuiteLedger) SetupTest() {
s.processor = NewOperationProcessor(
s.mockBatchInsertBuilder,
"test network",
[]xdr.OperationType{},
)
}

Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func TestStateVerifierLockBusy(t *testing.T) {
tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))

checkpointLedger := uint32(63)
changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "", false)
changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "")

gen := randxdr.NewGenerator()
var changes []xdr.LedgerEntryChange
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestStateVerifier(t *testing.T) {

ledger := rand.Int31()
checkpointLedger := uint32(ledger - (ledger % 64) - 1)
changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "", false)
changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "")
mockChangeReader := &ingest.MockChangeReader{}

gen := randxdr.NewGenerator()
Expand Down

0 comments on commit e5db60b

Please sign in to comment.