From 1ea7b1155a9f4e8f75b7b16e601e1765077beda5 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 7 May 2024 15:04:49 -0400 Subject: [PATCH 1/7] Refactor ingestion filtering rules for OR rather than AND --- .../internal/ingest/filters/account.go | 22 +++--- .../horizon/internal/ingest/filters/asset.go | 22 +++--- .../internal/ingest/group_processors.go | 32 +++++++-- .../internal/ingest/processors/main.go | 1 + .../integration/ingestion_filtering_test.go | 67 ++++++++++++++++++- 5 files changed, 114 insertions(+), 30 deletions(-) diff --git a/services/horizon/internal/ingest/filters/account.go b/services/horizon/internal/ingest/filters/account.go index 08def6a2b7..325d3c5d29 100644 --- a/services/horizon/internal/ingest/filters/account.go +++ b/services/horizon/internal/ingest/filters/account.go @@ -26,30 +26,25 @@ func NewAccountFilter() AccountFilter { } } -func (filter *accountFilter) Name() string { +func (f *accountFilter) Name() string { return "filters.accountFilter" } -func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error { +func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error { // only need to re-initialize the filter config state(rules) if its cached version(in memory) // is older than the incoming config version based on lastModified epoch timestamp - if filterConfig.LastModified > filter.lastModified { + if filterConfig.LastModified > f.lastModified { logger.Infof("New Account Filter config detected, reloading new config %v ", *filterConfig) - filter.enabled = filterConfig.Enabled - filter.whitelistedAccountsSet = listToSet(filterConfig.Whitelist) - filter.lastModified = filterConfig.LastModified + f.enabled = filterConfig.Enabled + f.whitelistedAccountsSet = listToSet(filterConfig.Whitelist) + f.lastModified = filterConfig.LastModified } return nil } func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - // filtering is disabled if the whitelist is empty for now, as that is the only filter rule - if len(f.whitelistedAccountsSet) == 0 || !f.enabled { - return true, nil - } - participants, err := processors.ParticipantsForTransaction(0, transaction) if err != nil { return false, err @@ -64,3 +59,8 @@ func (f *accountFilter) FilterTransaction(ctx context.Context, transaction inges } return false, nil } + +func (f accountFilter) IsEmpty(ctx context.Context) bool { + // filtering is disabled if the whitelist is empty for now, as that is the only filter rule + return len(f.whitelistedAccountsSet) == 0 || !f.enabled +} diff --git a/services/horizon/internal/ingest/filters/asset.go b/services/horizon/internal/ingest/filters/asset.go index 0a3a4ca6c7..2e301e9db5 100644 --- a/services/horizon/internal/ingest/filters/asset.go +++ b/services/horizon/internal/ingest/filters/asset.go @@ -34,29 +34,24 @@ func NewAssetFilter() AssetFilter { } } -func (filter *assetFilter) Name() string { +func (f *assetFilter) Name() string { return "filters.assetFilter" } -func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error { +func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error { // only need to re-initialize the filter config state(rules) if it's cached version(in memory) // is older than the incoming config version based on lastModified epoch timestamp - if filterConfig.LastModified > filter.lastModified { + if filterConfig.LastModified > f.lastModified { logger.Infof("New Asset Filter config detected, reloading new config %v ", *filterConfig) - filter.enabled = filterConfig.Enabled - filter.canonicalAssetsLookup = listToSet(filterConfig.Whitelist) - filter.lastModified = filterConfig.LastModified + f.enabled = filterConfig.Enabled + f.canonicalAssetsLookup = listToSet(filterConfig.Whitelist) + f.lastModified = filterConfig.LastModified } return nil } func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - // filtering is disabled if the whitelist is empty for now as that is the only filter rule - if len(f.canonicalAssetsLookup) < 1 || !f.enabled { - return true, nil - } - var operations []xdr.Operation if txv1, v1Exists := transaction.Envelope.GetV1(); v1Exists { @@ -144,3 +139,8 @@ func listToSet(list []string) set.Set[string] { } return set } + +func (f assetFilter) IsEmpty(ctx context.Context) bool { + // filtering is disabled if the whitelist is empty for now as that is the only filter rule + return len(f.canonicalAssetsLookup) < 1 || !f.enabled +} diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index 00f8adde8a..6a17082582 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -179,20 +179,38 @@ func (g *groupTransactionFilterers) Name() string { } func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) { + include := false + noFiltersDefined := true + for _, f := range g.filterers { + if f.IsEmpty(ctx) { + continue + } + + // This means atleast one filter is defined. + noFiltersDefined = false + startTime := time.Now() - include, err := f.FilterTransaction(ctx, tx) + inc, err := f.FilterTransaction(ctx, tx) if err != nil { return false, errors.Wrapf(err, "error in %T.FilterTransaction", f) } g.AddRunDuration(f.Name(), startTime) - if !include { - // filter out, we can return early - g.droppedTransactions++ - return false, nil - } + include = include || inc } - return true, nil + + // Transaction is stored only if there are no filtering rules or atleast one of the rules + // whitelists the transaction. + if noFiltersDefined || include { + return true, nil + } + + g.droppedTransactions++ + return false, nil +} + +func (g *groupTransactionFilterers) IsEmpty(ctx context.Context) bool { + return len(g.filterers) < 1 } func (g *groupTransactionFilterers) ResetStats() { diff --git a/services/horizon/internal/ingest/processors/main.go b/services/horizon/internal/ingest/processors/main.go index 2b6c1cc8fb..de9744aa6c 100644 --- a/services/horizon/internal/ingest/processors/main.go +++ b/services/horizon/internal/ingest/processors/main.go @@ -29,6 +29,7 @@ type LedgerTransactionProcessor interface { type LedgerTransactionFilterer interface { Name() string FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) + IsEmpty(ctx context.Context) bool } func StreamLedgerTransactions( diff --git a/services/horizon/internal/integration/ingestion_filtering_test.go b/services/horizon/internal/integration/ingestion_filtering_test.go index 6d0bcee69e..f35e67cada 100644 --- a/services/horizon/internal/integration/ingestion_filtering_test.go +++ b/services/horizon/internal/integration/ingestion_filtering_test.go @@ -5,12 +5,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/clients/horizonclient" hProtocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/services/horizon/internal/ingest/filters" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/txnbuild" - "github.com/stretchr/testify/assert" ) func TestFilteringWithNoFilters(t *testing.T) { @@ -168,3 +169,67 @@ func TestFilteringAssetWhiteList(t *testing.T) { _, err = itest.Client().TransactionDetail(txResp.Hash) tt.NoError(err) } + +func TestFilteringAssetAndAccountFilters(t *testing.T) { + tt := assert.New(t) + const adminPort uint16 = 6000 + itest := integration.NewTest(t, integration.Config{ + HorizonIngestParameters: map[string]string{ + "admin-port": strconv.Itoa(int(adminPort)), + }, + }) + + fullKeys, accounts := itest.CreateAccounts(2, "10000") + whitelistedAccount := accounts[0] + whitelistedAccountKey := fullKeys[0] + nonWhitelistedAccount := accounts[1] + nonWhitelistedAccountKey := fullKeys[1] + enabled := true + + whitelistedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()} + nonWhitelistedAsset := txnbuild.CreditAsset{Code: "SEK", Issuer: nonWhitelistedAccountKey.Address()} + itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, nonWhitelistedAsset) + + // Setup whitelisted account and asset rule, force refresh of filter configs to be quick + filters.SetFilterConfigCheckIntervalSeconds(1) + + expectedAccountFilter := hProtocol.AccountFilterConfig{ + Whitelist: []string{whitelistedAccount.GetAccountID()}, + Enabled: &enabled, + } + err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter) + tt.NoError(err) + accountFilter, err := itest.AdminClient().GetIngestionAccountFilter() + tt.NoError(err) + tt.ElementsMatch(expectedAccountFilter.Whitelist, accountFilter.Whitelist) + tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled) + + asset, err := whitelistedAsset.ToXDR() + tt.NoError(err) + expectedAssetFilter := hProtocol.AssetFilterConfig{ + Whitelist: []string{asset.StringCanonical()}, + Enabled: &enabled, + } + err = itest.AdminClient().SetIngestionAssetFilter(expectedAssetFilter) + tt.NoError(err) + assetFilter, err := itest.AdminClient().GetIngestionAssetFilter() + tt.NoError(err) + + tt.ElementsMatch(expectedAssetFilter.Whitelist, assetFilter.Whitelist) + tt.Equal(expectedAssetFilter.Enabled, assetFilter.Enabled) + + // Ensure the latest filter configs are reloaded by the ingestion state machine processor + time.Sleep(filters.GetFilterConfigCheckIntervalSeconds() * time.Second) + + // Use a non-whitelisted account to submit a non-whitelisted asset to a whitelisted account. + // The transaction should be stored. + txResp := itest.MustSubmitOperations(nonWhitelistedAccount, nonWhitelistedAccountKey, + &txnbuild.Payment{ + Destination: whitelistedAccount.GetAccountID(), + Amount: "10", + Asset: nonWhitelistedAsset, + }, + ) + _, err = itest.Client().TransactionDetail(txResp.Hash) + tt.NoError(err) +} From 4a9961fde3d45cb2a4e5f54cc4d335dc129b5785 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 7 May 2024 15:21:08 -0400 Subject: [PATCH 2/7] Fix failing tests --- services/horizon/internal/ingest/filters/account.go | 4 ++++ services/horizon/internal/ingest/filters/asset.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/services/horizon/internal/ingest/filters/account.go b/services/horizon/internal/ingest/filters/account.go index 325d3c5d29..3b645c6709 100644 --- a/services/horizon/internal/ingest/filters/account.go +++ b/services/horizon/internal/ingest/filters/account.go @@ -45,6 +45,10 @@ func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilter } func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { + if f.IsEmpty(ctx) { + return true, nil + } + participants, err := processors.ParticipantsForTransaction(0, transaction) if err != nil { return false, err diff --git a/services/horizon/internal/ingest/filters/asset.go b/services/horizon/internal/ingest/filters/asset.go index 2e301e9db5..4155f18ffd 100644 --- a/services/horizon/internal/ingest/filters/asset.go +++ b/services/horizon/internal/ingest/filters/asset.go @@ -52,6 +52,10 @@ func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig } func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { + if f.IsEmpty(ctx) { + return true, nil + } + var operations []xdr.Operation if txv1, v1Exists := transaction.Envelope.GetV1(); v1Exists { From 4abef7c8066bcba22506ff9697bdcd45f7215e80 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 7 May 2024 15:48:17 -0400 Subject: [PATCH 3/7] Rename variable --- services/horizon/internal/ingest/group_processors.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index 6a17082582..fcbc83b48d 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -179,7 +179,7 @@ func (g *groupTransactionFilterers) Name() string { } func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) { - include := false + matchAtleastOneFilter := false noFiltersDefined := true for _, f := range g.filterers { @@ -196,12 +196,12 @@ func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx in return false, errors.Wrapf(err, "error in %T.FilterTransaction", f) } g.AddRunDuration(f.Name(), startTime) - include = include || inc + matchAtleastOneFilter = matchAtleastOneFilter || inc } // Transaction is stored only if there are no filtering rules or atleast one of the rules // whitelists the transaction. - if noFiltersDefined || include { + if noFiltersDefined || matchAtleastOneFilter { return true, nil } From 9d13a6eb87764a5b470346b34dbf4dacdb7b3021 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 9 May 2024 16:42:49 -0400 Subject: [PATCH 4/7] rename func name to IsEnabled --- .../internal/ingest/filters/account.go | 6 ++-- .../horizon/internal/ingest/filters/asset.go | 6 ++-- .../internal/ingest/group_processors.go | 28 ++++++++----------- .../internal/ingest/processors/main.go | 2 +- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/services/horizon/internal/ingest/filters/account.go b/services/horizon/internal/ingest/filters/account.go index 3b645c6709..3ca1caa139 100644 --- a/services/horizon/internal/ingest/filters/account.go +++ b/services/horizon/internal/ingest/filters/account.go @@ -45,7 +45,7 @@ func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilter } func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - if f.IsEmpty(ctx) { + if !f.IsEnabled() { return true, nil } @@ -64,7 +64,7 @@ func (f *accountFilter) FilterTransaction(ctx context.Context, transaction inges return false, nil } -func (f accountFilter) IsEmpty(ctx context.Context) bool { +func (f accountFilter) IsEnabled() bool { // filtering is disabled if the whitelist is empty for now, as that is the only filter rule - return len(f.whitelistedAccountsSet) == 0 || !f.enabled + return len(f.whitelistedAccountsSet) >= 1 && f.enabled } diff --git a/services/horizon/internal/ingest/filters/asset.go b/services/horizon/internal/ingest/filters/asset.go index 4155f18ffd..95b353a676 100644 --- a/services/horizon/internal/ingest/filters/asset.go +++ b/services/horizon/internal/ingest/filters/asset.go @@ -52,7 +52,7 @@ func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig } func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - if f.IsEmpty(ctx) { + if !f.IsEnabled() { return true, nil } @@ -144,7 +144,7 @@ func listToSet(list []string) set.Set[string] { return set } -func (f assetFilter) IsEmpty(ctx context.Context) bool { +func (f assetFilter) IsEnabled() bool { // filtering is disabled if the whitelist is empty for now as that is the only filter rule - return len(f.canonicalAssetsLookup) < 1 || !f.enabled + return len(f.canonicalAssetsLookup) >= 1 && f.enabled } diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index fcbc83b48d..f8e76dba3b 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -179,37 +179,33 @@ func (g *groupTransactionFilterers) Name() string { } func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) { - matchAtleastOneFilter := false - noFiltersDefined := true + filtersEnabled := false for _, f := range g.filterers { - if f.IsEmpty(ctx) { + if !f.IsEnabled() { continue } - // This means atleast one filter is defined. - noFiltersDefined = false - + filtersEnabled = true startTime := time.Now() - inc, err := f.FilterTransaction(ctx, tx) + include, err := f.FilterTransaction(ctx, tx) if err != nil { return false, errors.Wrapf(err, "error in %T.FilterTransaction", f) } g.AddRunDuration(f.Name(), startTime) - matchAtleastOneFilter = matchAtleastOneFilter || inc + if include { + return true, nil + } } - // Transaction is stored only if there are no filtering rules or atleast one of the rules - // whitelists the transaction. - if noFiltersDefined || matchAtleastOneFilter { - return true, nil + if filtersEnabled { + g.droppedTransactions++ + return false, nil } - - g.droppedTransactions++ - return false, nil + return true, nil } -func (g *groupTransactionFilterers) IsEmpty(ctx context.Context) bool { +func (g *groupTransactionFilterers) IsEnabled() bool { return len(g.filterers) < 1 } diff --git a/services/horizon/internal/ingest/processors/main.go b/services/horizon/internal/ingest/processors/main.go index de9744aa6c..f46cfefddf 100644 --- a/services/horizon/internal/ingest/processors/main.go +++ b/services/horizon/internal/ingest/processors/main.go @@ -29,7 +29,7 @@ type LedgerTransactionProcessor interface { type LedgerTransactionFilterer interface { Name() string FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) - IsEmpty(ctx context.Context) bool + IsEnabled() bool } func StreamLedgerTransactions( From 944377a7b6a21d9f0ddf6abc9073a237481b2d87 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 May 2024 13:09:18 -0400 Subject: [PATCH 5/7] Refactor to remove interface dependency --- .../internal/ingest/filters/account.go | 14 ++++++------- .../internal/ingest/filters/account_test.go | 12 +++++++---- .../horizon/internal/ingest/filters/asset.go | 12 +++++------ .../internal/ingest/filters/asset_test.go | 14 ++++++------- .../internal/ingest/group_processors.go | 20 ++++++++----------- .../internal/ingest/processors/main.go | 5 ++--- 6 files changed, 38 insertions(+), 39 deletions(-) diff --git a/services/horizon/internal/ingest/filters/account.go b/services/horizon/internal/ingest/filters/account.go index 3ca1caa139..1601314c75 100644 --- a/services/horizon/internal/ingest/filters/account.go +++ b/services/horizon/internal/ingest/filters/account.go @@ -44,27 +44,27 @@ func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilter return nil } -func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - if !f.IsEnabled() { - return true, nil +func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) { + if !f.isEnabled() { + return false, true, nil } participants, err := processors.ParticipantsForTransaction(0, transaction) if err != nil { - return false, err + return true, false, err } // NOTE: this assumes that the participant list has a small memory footprint // otherwise, we should be doing the filtering on the DB side for _, p := range participants { if f.whitelistedAccountsSet.Contains(p.Address()) { - return true, nil + return true, true, nil } } - return false, nil + return true, false, nil } -func (f accountFilter) IsEnabled() bool { +func (f accountFilter) isEnabled() bool { // filtering is disabled if the whitelist is empty for now, as that is the only filter rule return len(f.whitelistedAccountsSet) >= 1 && f.enabled } diff --git a/services/horizon/internal/ingest/filters/account_test.go b/services/horizon/internal/ingest/filters/account_test.go index 1831a6a6e5..17f290a460 100644 --- a/services/horizon/internal/ingest/filters/account_test.go +++ b/services/horizon/internal/ingest/filters/account_test.go @@ -26,11 +26,12 @@ func TestAccountFilterAllowsWhenMatch(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, true) } @@ -47,13 +48,14 @@ func TestAccountFilterAllowsWhenDisabled(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) // there is no match on filter rule, but since filter is disabled, it should allow all + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -70,11 +72,12 @@ func TestAccountFilterAllowsWhenEmptyWhitelist(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -92,11 +95,12 @@ func TestAccountFilterDoesNotAllowWhenNoMatch(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, false) } diff --git a/services/horizon/internal/ingest/filters/asset.go b/services/horizon/internal/ingest/filters/asset.go index 95b353a676..4aa6ebc9b3 100644 --- a/services/horizon/internal/ingest/filters/asset.go +++ b/services/horizon/internal/ingest/filters/asset.go @@ -51,9 +51,9 @@ func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig return nil } -func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - if !f.IsEnabled() { - return true, nil +func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) { + if !f.isEnabled() { + return false, true, nil } var operations []xdr.Operation @@ -67,11 +67,11 @@ func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest. } if f.filterOperationsMatchedOnRules(operations) { - return true, nil + return true, true, nil } logger.Debugf("No match, dropped tx with seq %v ", transaction.Envelope.SeqNum()) - return false, nil + return true, false, nil } func (f assetFilter) filterOperationsMatchedOnRules(operations []xdr.Operation) bool { @@ -144,7 +144,7 @@ func listToSet(list []string) set.Set[string] { return set } -func (f assetFilter) IsEnabled() bool { +func (f assetFilter) isEnabled() bool { // filtering is disabled if the whitelist is empty for now as that is the only filter rule return len(f.canonicalAssetsLookup) >= 1 && f.enabled } diff --git a/services/horizon/internal/ingest/filters/asset_test.go b/services/horizon/internal/ingest/filters/asset_test.go index 3da23bc440..b2b886b976 100644 --- a/services/horizon/internal/ingest/filters/asset_test.go +++ b/services/horizon/internal/ingest/filters/asset_test.go @@ -25,11 +25,11 @@ func TestAssetFilterAllowsOnMatch(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) tt.Equal(result, true) - result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) tt.Equal(result, true) } @@ -47,11 +47,11 @@ func TestAssetFilterAllowsWhenEmptyWhitelist(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) tt.Equal(result, true) - result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) tt.Equal(result, true) } @@ -69,7 +69,7 @@ func TestAssetFilterAllowsWhenDisabled(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) // there was no match on filter rules, but since filter was disabled also, it should allow all tt.Equal(result, true) @@ -89,11 +89,11 @@ func TestAssetFilterDoesNotAllowV1WhenNoMatch(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) tt.Equal(result, false) - result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + _, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) tt.Equal(result, false) } diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index f8e76dba3b..4a6c2e69af 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -178,35 +178,31 @@ func (g *groupTransactionFilterers) Name() string { return "groupTransactionFilterers" } -func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) { +func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, bool, error) { filtersEnabled := false for _, f := range g.filterers { - if !f.IsEnabled() { + startTime := time.Now() + filterEnabled, include, err := f.FilterTransaction(ctx, tx) + if !filterEnabled { continue } filtersEnabled = true - startTime := time.Now() - include, err := f.FilterTransaction(ctx, tx) if err != nil { - return false, errors.Wrapf(err, "error in %T.FilterTransaction", f) + return true, false, errors.Wrapf(err, "error in %T.FilterTransaction", f) } g.AddRunDuration(f.Name(), startTime) if include { - return true, nil + return true, true, nil } } if filtersEnabled { g.droppedTransactions++ - return false, nil + return true, false, nil } - return true, nil -} - -func (g *groupTransactionFilterers) IsEnabled() bool { - return len(g.filterers) < 1 + return false, true, nil } func (g *groupTransactionFilterers) ResetStats() { diff --git a/services/horizon/internal/ingest/processors/main.go b/services/horizon/internal/ingest/processors/main.go index f46cfefddf..5db09d9ef0 100644 --- a/services/horizon/internal/ingest/processors/main.go +++ b/services/horizon/internal/ingest/processors/main.go @@ -28,8 +28,7 @@ type LedgerTransactionProcessor interface { type LedgerTransactionFilterer interface { Name() string - FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) - IsEnabled() bool + FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) } func StreamLedgerTransactions( @@ -48,7 +47,7 @@ func StreamLedgerTransactions( if err != nil { return errors.Wrap(err, "could not read transaction") } - include, err := txFilterer.FilterTransaction(ctx, tx) + _, include, err := txFilterer.FilterTransaction(ctx, tx) if err != nil { return errors.Wrapf( err, From 9f505dc86c3db9a60c139480f2f028999726f0ed Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 May 2024 13:51:39 -0400 Subject: [PATCH 6/7] Add extra assertion to asset_test --- .../internal/ingest/filters/asset_test.go | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/services/horizon/internal/ingest/filters/asset_test.go b/services/horizon/internal/ingest/filters/asset_test.go index b2b886b976..e0deed7771 100644 --- a/services/horizon/internal/ingest/filters/asset_test.go +++ b/services/horizon/internal/ingest/filters/asset_test.go @@ -25,12 +25,14 @@ func TestAssetFilterAllowsOnMatch(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, true) - _, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, true) } @@ -47,12 +49,14 @@ func TestAssetFilterAllowsWhenEmptyWhitelist(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, false) tt.Equal(result, true) - _, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -69,9 +73,10 @@ func TestAssetFilterAllowsWhenDisabled(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) // there was no match on filter rules, but since filter was disabled also, it should allow all + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -89,12 +94,14 @@ func TestAssetFilterDoesNotAllowV1WhenNoMatch(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - _, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, false) - _, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, false) } From c754e9775684fad9324343805b4189634d3032da Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 May 2024 15:22:24 -0400 Subject: [PATCH 7/7] Add changelog entry --- services/horizon/CHANGELOG.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 1d44eb5ec0..7a89844062 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -4,7 +4,12 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased - + +### Breaking Changes + +- Change ingestion filtering logic to store transactions if any filter matches on it. ([5303](https://github.com/stellar/go/pull/5303)) + - The previous behaviour was to store a tx only if both asset and account filters match together. So even if a tx matched an account filter but failed to match an asset filter, it would not be stored by Horizon. + ## 2.30.0 **This release adds support for Protocol 21**