Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port 2.28.3 back to mainline #5215

Merged
merged 5 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## 2.28.3

### Fixed
- Fix claimable_balance_claimants subquery in GetClaimableBalances() [5207](https://github.com/stellar/go/pull/5207)

### Added
- New optional config `SKIP_TXMETA` ([5189](https://github.com/stellar/go/issues/5189)). Defaults to `FALSE`, when `TRUE` the following will occur:
* history_transactions.tx_meta column will have serialized xdr that equates to empty for any protocol version, such as for `xdr.TransactionMeta.V3`, `Operations`, `TxChangesAfter`, `TxChangesBefore` will be empty arrays and `SorobanMeta` will be nil.

### Breaking Changes
- Removed `DISABLE_SOROBAN_INGEST` configuration parameter, use the new `SKIP_TXMETA` parameter instead.

## 2.28.2

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
SkipSorobanIngestion: config.SkipSorobanIngestion,
SkipTxmeta: config.SkipTxmeta,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ type Config struct {
Network string
// DisableTxSub disables transaction submission functionality for Horizon.
DisableTxSub bool
// SkipSorobanIngestion skips Soroban related ingestion processing.
SkipSorobanIngestion bool
// SkipTxmeta, when enabled, will not store meta xdr in history transaction table
SkipTxmeta bool
}
32 changes: 21 additions & 11 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -57,7 +58,7 @@
// ApplyCursor applies cursor to the given sql. For performance reason the limit
// is not applied here. This allows us to hint the planner later to use the right
// indexes.
func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rCursor string, order string) (sq.SelectBuilder, error) {
func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, tableName string, lCursor int64, rCursor string, order string) (sq.SelectBuilder, error) {

Check failure on line 61 in services/horizon/internal/db2/history/claimable_balances.go

View workflow job for this annotation

GitHub Actions / golangci

line is 153 characters (lll)
hasPagedLimit := false
if lCursor > 0 && rCursor != "" {
hasPagedLimit = true
Expand All @@ -67,17 +68,26 @@
case db2.OrderAscending:
if hasPagedLimit {
sql = sql.
Where(sq.Expr("(cb.last_modified_ledger, cb.id) > (?, ?)", lCursor, rCursor))

Where(
sq.Expr(
fmt.Sprintf("(%s.last_modified_ledger, %s.id) > (?, ?)", tableName, tableName),
lCursor, rCursor,
),
)
}
sql = sql.OrderBy("cb.last_modified_ledger asc, cb.id asc")
sql = sql.OrderBy(fmt.Sprintf("%s.last_modified_ledger asc, %s.id asc", tableName, tableName))
case db2.OrderDescending:
if hasPagedLimit {
sql = sql.
Where(sq.Expr("(cb.last_modified_ledger, cb.id) < (?, ?)", lCursor, rCursor))
Where(
sq.Expr(
fmt.Sprintf("(%s.last_modified_ledger, %s.id) < (?, ?)", tableName, tableName),
lCursor,
rCursor,
),
)
}

sql = sql.OrderBy("cb.last_modified_ledger desc, cb.id desc")
sql = sql.OrderBy(fmt.Sprintf("%s.last_modified_ledger desc, %s.id desc", tableName, tableName))
default:
return sql, errors.Errorf("invalid order: %s", order)
}
Expand Down Expand Up @@ -216,7 +226,7 @@
return nil, errors.Wrap(err, "error getting cursor")
}

sql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalances, l, r, query.PageQuery.Order)
sql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalances, "cb", l, r, query.PageQuery.Order)
if err != nil {
return nil, errors.Wrap(err, "could not apply query to page")
}
Expand All @@ -242,10 +252,10 @@
// does not perform efficiently. Instead, use a subquery (with LIMIT) to retrieve claimable balances based on
// the claimant's address.

var selectClaimableBalanceClaimants = sq.Select("id").From("claimable_balance_claimants").
Where("destination = ?", query.Claimant.Address()).Limit(query.PageQuery.Limit)
var selectClaimableBalanceClaimants = sq.Select("claimable_balance_claimants.id").From("claimable_balance_claimants").
Where("claimable_balance_claimants.destination = ?", query.Claimant.Address()).Limit(query.PageQuery.Limit)

subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, l, r, query.PageQuery.Order)
subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, "claimable_balance_claimants", l, r, query.PageQuery.Order)

Check failure on line 258 in services/horizon/internal/db2/history/claimable_balances.go

View workflow job for this annotation

GitHub Actions / golangci

line is 145 characters (lll)
if err != nil {
return nil, errors.Wrap(err, "could not apply subquery to page")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -203,7 +204,7 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)

// this validates the cb query with claimant and cb.id/ledger cursor parameters
query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, "", 10)
query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, "asc", 10)
query.Claimant = xdr.MustAddressPtr(dest1)
cbs, err = q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
Expand All @@ -212,7 +213,7 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {

// this validates the cb query with no claimant parameter,
// should still produce working sql, as it triggers different LIMIT position in sql.
query.PageQuery = db2.MustPageQuery("", false, "", 1)
query.PageQuery = db2.MustPageQuery("", false, "desc", 1)
query.Claimant = nil
cbs, err = q.GetClaimableBalances(tt.Ctx, query)
tt.Assert.NoError(err)
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ const (
EnableIngestionFilteringFlagName = "exp-enable-ingestion-filtering"
// DisableTxSubFlagName is the command line flag for disabling transaction submission feature of Horizon
DisableTxSubFlagName = "disable-tx-sub"
// SkipSorobanIngestionFlagName is the command line flag for disabling Soroban related ingestion processing
SkipSorobanIngestionFlagName = "disable-soroban-ingest"
// SkipTxmeta is the command line flag for disabling persistence of tx meta in history transaction table
SkipTxmeta = "skip-txmeta"

// StellarPubnet is a constant representing the Stellar public network
StellarPubnet = "pubnet"
Expand Down Expand Up @@ -740,12 +740,12 @@ func Flags() (*Config, support.ConfigOptions) {
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: SkipSorobanIngestionFlagName,
ConfigKey: &config.SkipSorobanIngestion,
Name: SkipTxmeta,
ConfigKey: &config.SkipTxmeta,
OptType: types.Bool,
FlagDefault: false,
Required: false,
Usage: "excludes Soroban data during ingestion processing",
Usage: "excludes tx meta from persistence on transaction history",
UsedInCommands: IngestionCommands,
},
}
Expand Down
3 changes: 1 addition & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ type Config struct {

EnableIngestionFiltering bool
MaxLedgerPerFlush uint32

SkipSorobanIngestion bool
SkipTxmeta bool
}

const (
Expand Down
14 changes: 4 additions & 10 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func buildChangeProcessor(
source ingestionSource,
ledgerSequence uint32,
networkPassphrase string,
skipSorobanIngestion bool,
) *groupChangeProcessors {
statsChangeProcessor := &statsChangeProcessor{
StatsChangeProcessor: changeStats,
Expand Down Expand Up @@ -145,13 +144,13 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors

processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase),
ledgersProcessor,
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase),
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipSorobanIngestion),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipTxmeta),
processors.NewClaimableBalancesTransactionProcessor(cbLoader,
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
Expand All @@ -173,10 +172,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(
s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(),
s.config.SkipSorobanIngestion,
)
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta)
p = append(p, txSubProc)
}

Expand Down Expand Up @@ -239,7 +235,6 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
historyArchiveSource,
checkpointLedger,
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)

if checkpointLedger == 1 {
Expand Down Expand Up @@ -498,7 +493,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledgerSource,
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)
err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
}

stats := &ingest.StatsChangeProcessor{}
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "", false)
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand All @@ -201,7 +201,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
filters: &MockFilters{},
}

processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "", false)
processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand Down Expand Up @@ -271,7 +271,6 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
EnableIngestionFiltering: true,
SkipSorobanIngestion: false,
}

q := &mockDBQ{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) TestEmptyClaimabl

func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInserts(balanceID xdr.ClaimableBalanceId, body xdr.OperationBody, change xdr.LedgerEntryChange) {
// Setup the transaction
txn := createTransaction(true, 1)
txn := createTransaction(true, 1, 2)
txn.Envelope.Operations()[0].Body = body
txn.UnsafeMeta.V = 2
txn.UnsafeMeta.V2.Operations = []xdr.OperationMeta{
Expand Down
22 changes: 2 additions & 20 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,17 @@ type EffectProcessor struct {
accountLoader *history.AccountLoader
batch history.EffectBatchInsertBuilder
network string
skipSoroban bool
}

func NewEffectProcessor(
accountLoader *history.AccountLoader,
batch history.EffectBatchInsertBuilder,
network string,
skipSoroban bool,
) *EffectProcessor {
return &EffectProcessor{
accountLoader: accountLoader,
batch: batch,
network: network,
skipSoroban: skipSoroban,
}
}

Expand All @@ -53,29 +50,14 @@ func (p *EffectProcessor) ProcessTransaction(
return nil
}

elidedTransaction := transaction

if p.skipSoroban &&
elidedTransaction.UnsafeMeta.V == 3 &&
elidedTransaction.UnsafeMeta.V3.SorobanMeta != nil {
elidedTransaction.UnsafeMeta.V3 = &xdr.TransactionMetaV3{
Ext: xdr.ExtensionPoint{},
TxChangesBefore: xdr.LedgerEntryChanges{},
Operations: []xdr.OperationMeta{},
TxChangesAfter: xdr.LedgerEntryChanges{},
SorobanMeta: nil,
}
}

for opi, op := range elidedTransaction.Envelope.Operations() {
for opi, op := range transaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(opi),
transaction: elidedTransaction,
transaction: transaction,
operation: op,
ledgerSequence: uint32(lcm.LedgerSequence()),
network: p.network,
}

if err := operation.ingestEffects(p.accountLoader, p.batch); err != nil {
return errors.Wrapf(err, "reading operation %v effects", operation.ID())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() {
s.accountLoader,
s.mockBatchInsertBuilder,
networkPassphrase,
false,
)

s.txs = []ingest.LedgerTransaction{
Expand Down
Loading
Loading