Skip to content

Commit

Permalink
Port 2.28.3 back to mainline, #5215
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Feb 21, 2024
2 parents 8e18724 + d356615 commit 84c21d0
Show file tree
Hide file tree
Showing 26 changed files with 661 additions and 638 deletions.
12 changes: 12 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## 2.28.3

### Fixed
- Fix claimable_balance_claimants subquery in GetClaimableBalances() [5207](https://github.com/stellar/go/pull/5207)

### Added
- New optional config `SKIP_TXMETA` ([5189](https://github.com/stellar/go/issues/5189)). Defaults to `FALSE`, when `TRUE` the following will occur:
* history_transactions.tx_meta column will have serialized xdr that equates to empty for any protocol version, such as for `xdr.TransactionMeta.V3`, `Operations`, `TxChangesAfter`, `TxChangesBefore` will be empty arrays and `SorobanMeta` will be nil.

### Breaking Changes
- Removed `DISABLE_SOROBAN_INGEST` configuration parameter, use the new `SKIP_TXMETA` parameter instead.

## 2.28.2

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
SkipSorobanIngestion: config.SkipSorobanIngestion,
SkipTxmeta: config.SkipTxmeta,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ type Config struct {
Network string
// DisableTxSub disables transaction submission functionality for Horizon.
DisableTxSub bool
// SkipSorobanIngestion skips Soroban related ingestion processing.
SkipSorobanIngestion bool
// SkipTxmeta, when enabled, will not store meta xdr in history transaction table
SkipTxmeta bool
}
32 changes: 21 additions & 11 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (cbq ClaimableBalancesQuery) Cursor() (int64, string, error) {
// ApplyCursor applies cursor to the given sql. For performance reason the limit
// is not applied here. This allows us to hint the planner later to use the right
// indexes.
func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rCursor string, order string) (sq.SelectBuilder, error) {
func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, tableName string, lCursor int64, rCursor string, order string) (sq.SelectBuilder, error) {
hasPagedLimit := false
if lCursor > 0 && rCursor != "" {
hasPagedLimit = true
Expand All @@ -67,17 +68,26 @@ func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rC
case db2.OrderAscending:
if hasPagedLimit {
sql = sql.
Where(sq.Expr("(cb.last_modified_ledger, cb.id) > (?, ?)", lCursor, rCursor))

Where(
sq.Expr(
fmt.Sprintf("(%s.last_modified_ledger, %s.id) > (?, ?)", tableName, tableName),
lCursor, rCursor,
),
)
}
sql = sql.OrderBy("cb.last_modified_ledger asc, cb.id asc")
sql = sql.OrderBy(fmt.Sprintf("%s.last_modified_ledger asc, %s.id asc", tableName, tableName))
case db2.OrderDescending:
if hasPagedLimit {
sql = sql.
Where(sq.Expr("(cb.last_modified_ledger, cb.id) < (?, ?)", lCursor, rCursor))
Where(
sq.Expr(
fmt.Sprintf("(%s.last_modified_ledger, %s.id) < (?, ?)", tableName, tableName),
lCursor,
rCursor,
),
)
}

sql = sql.OrderBy("cb.last_modified_ledger desc, cb.id desc")
sql = sql.OrderBy(fmt.Sprintf("%s.last_modified_ledger desc, %s.id desc", tableName, tableName))
default:
return sql, errors.Errorf("invalid order: %s", order)
}
Expand Down Expand Up @@ -216,7 +226,7 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
return nil, errors.Wrap(err, "error getting cursor")
}

sql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalances, l, r, query.PageQuery.Order)
sql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalances, "cb", l, r, query.PageQuery.Order)
if err != nil {
return nil, errors.Wrap(err, "could not apply query to page")
}
Expand All @@ -242,10 +252,10 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
// does not perform efficiently. Instead, use a subquery (with LIMIT) to retrieve claimable balances based on
// the claimant's address.

var selectClaimableBalanceClaimants = sq.Select("id").From("claimable_balance_claimants").
Where("destination = ?", query.Claimant.Address()).Limit(query.PageQuery.Limit)
var selectClaimableBalanceClaimants = sq.Select("claimable_balance_claimants.id").From("claimable_balance_claimants").
Where("claimable_balance_claimants.destination = ?", query.Claimant.Address()).Limit(query.PageQuery.Limit)

subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, l, r, query.PageQuery.Order)
subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, "claimable_balance_claimants", l, r, query.PageQuery.Order)
if err != nil {
return nil, errors.Wrap(err, "could not apply subquery to page")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -203,7 +204,7 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)

// this validates the cb query with claimant and cb.id/ledger cursor parameters
query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, "", 10)
query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, "asc", 10)
query.Claimant = xdr.MustAddressPtr(dest1)
cbs, err = q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
Expand All @@ -212,7 +213,7 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {

// this validates the cb query with no claimant parameter,
// should still produce working sql, as it triggers different LIMIT position in sql.
query.PageQuery = db2.MustPageQuery("", false, "", 1)
query.PageQuery = db2.MustPageQuery("", false, "desc", 1)
query.Claimant = nil
cbs, err = q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ const (
EnableIngestionFilteringFlagName = "exp-enable-ingestion-filtering"
// DisableTxSubFlagName is the command line flag for disabling transaction submission feature of Horizon
DisableTxSubFlagName = "disable-tx-sub"
// SkipSorobanIngestionFlagName is the command line flag for disabling Soroban related ingestion processing
SkipSorobanIngestionFlagName = "disable-soroban-ingest"
// SkipTxmeta is the command line flag for disabling persistence of tx meta in history transaction table
SkipTxmeta = "skip-txmeta"

// StellarPubnet is a constant representing the Stellar public network
StellarPubnet = "pubnet"
Expand Down Expand Up @@ -740,12 +740,12 @@ func Flags() (*Config, support.ConfigOptions) {
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: SkipSorobanIngestionFlagName,
ConfigKey: &config.SkipSorobanIngestion,
Name: SkipTxmeta,
ConfigKey: &config.SkipTxmeta,
OptType: types.Bool,
FlagDefault: false,
Required: false,
Usage: "excludes Soroban data during ingestion processing",
Usage: "excludes tx meta from persistence on transaction history",
UsedInCommands: IngestionCommands,
},
}
Expand Down
3 changes: 1 addition & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ type Config struct {

EnableIngestionFiltering bool
MaxLedgerPerFlush uint32

SkipSorobanIngestion bool
SkipTxmeta bool
}

const (
Expand Down
14 changes: 4 additions & 10 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func buildChangeProcessor(
source ingestionSource,
ledgerSequence uint32,
networkPassphrase string,
skipSorobanIngestion bool,
) *groupChangeProcessors {
statsChangeProcessor := &statsChangeProcessor{
StatsChangeProcessor: changeStats,
Expand Down Expand Up @@ -145,13 +144,13 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors

processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase),
ledgersProcessor,
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase),
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipSorobanIngestion),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipTxmeta),
processors.NewClaimableBalancesTransactionProcessor(cbLoader,
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
Expand All @@ -173,10 +172,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(
s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(),
s.config.SkipSorobanIngestion,
)
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta)
p = append(p, txSubProc)
}

Expand Down Expand Up @@ -239,7 +235,6 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
historyArchiveSource,
checkpointLedger,
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)

if checkpointLedger == 1 {
Expand Down Expand Up @@ -498,7 +493,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledgerSource,
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)
err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
}

stats := &ingest.StatsChangeProcessor{}
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "", false)
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand All @@ -201,7 +201,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
filters: &MockFilters{},
}

processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "", false)
processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand Down Expand Up @@ -271,7 +271,6 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
EnableIngestionFiltering: true,
SkipSorobanIngestion: false,
}

q := &mockDBQ{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) TestEmptyClaimabl

func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInserts(balanceID xdr.ClaimableBalanceId, body xdr.OperationBody, change xdr.LedgerEntryChange) {
// Setup the transaction
txn := createTransaction(true, 1)
txn := createTransaction(true, 1, 2)
txn.Envelope.Operations()[0].Body = body
txn.UnsafeMeta.V = 2
txn.UnsafeMeta.V2.Operations = []xdr.OperationMeta{
Expand Down
22 changes: 2 additions & 20 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,17 @@ type EffectProcessor struct {
accountLoader *history.AccountLoader
batch history.EffectBatchInsertBuilder
network string
skipSoroban bool
}

func NewEffectProcessor(
accountLoader *history.AccountLoader,
batch history.EffectBatchInsertBuilder,
network string,
skipSoroban bool,
) *EffectProcessor {
return &EffectProcessor{
accountLoader: accountLoader,
batch: batch,
network: network,
skipSoroban: skipSoroban,
}
}

Expand All @@ -53,29 +50,14 @@ func (p *EffectProcessor) ProcessTransaction(
return nil
}

elidedTransaction := transaction

if p.skipSoroban &&
elidedTransaction.UnsafeMeta.V == 3 &&
elidedTransaction.UnsafeMeta.V3.SorobanMeta != nil {
elidedTransaction.UnsafeMeta.V3 = &xdr.TransactionMetaV3{
Ext: xdr.ExtensionPoint{},
TxChangesBefore: xdr.LedgerEntryChanges{},
Operations: []xdr.OperationMeta{},
TxChangesAfter: xdr.LedgerEntryChanges{},
SorobanMeta: nil,
}
}

for opi, op := range elidedTransaction.Envelope.Operations() {
for opi, op := range transaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(opi),
transaction: elidedTransaction,
transaction: transaction,
operation: op,
ledgerSequence: uint32(lcm.LedgerSequence()),
network: p.network,
}

if err := operation.ingestEffects(p.accountLoader, p.batch); err != nil {
return errors.Wrapf(err, "reading operation %v effects", operation.ID())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() {
s.accountLoader,
s.mockBatchInsertBuilder,
networkPassphrase,
false,
)

s.txs = []ingest.LedgerTransaction{
Expand Down
Loading

0 comments on commit 84c21d0

Please sign in to comment.