Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingestion filtering should use OR logic for rules rather than AND #5303

Merged
merged 13 commits into from
May 10, 2024
Merged
20 changes: 12 additions & 8 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,26 @@ func NewAccountFilter() AccountFilter {
}
}

func (filter *accountFilter) Name() string {
func (f *accountFilter) Name() string {
return "filters.accountFilter"
}

func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
// only need to re-initialize the filter config state(rules) if its cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
if filterConfig.LastModified > f.lastModified {
logger.Infof("New Account Filter config detected, reloading new config %v ", *filterConfig)

filter.enabled = filterConfig.Enabled
filter.whitelistedAccountsSet = listToSet(filterConfig.Whitelist)
filter.lastModified = filterConfig.LastModified
f.enabled = filterConfig.Enabled
f.whitelistedAccountsSet = listToSet(filterConfig.Whitelist)
f.lastModified = filterConfig.LastModified
}

return nil
}

func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// filtering is disabled if the whitelist is empty for now, as that is the only filter rule
if len(f.whitelistedAccountsSet) == 0 || !f.enabled {
if !f.IsEnabled() {
return true, nil
}

Expand All @@ -64,3 +63,8 @@ func (f *accountFilter) FilterTransaction(ctx context.Context, transaction inges
}
return false, nil
}

func (f accountFilter) IsEnabled() bool {
// filtering is disabled if the whitelist is empty for now, as that is the only filter rule
return len(f.whitelistedAccountsSet) >= 1 && f.enabled
}
20 changes: 12 additions & 8 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,25 @@ func NewAssetFilter() AssetFilter {
}
}

func (filter *assetFilter) Name() string {
func (f *assetFilter) Name() string {
return "filters.assetFilter"
}

func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
if filterConfig.LastModified > f.lastModified {
logger.Infof("New Asset Filter config detected, reloading new config %v ", *filterConfig)
filter.enabled = filterConfig.Enabled
filter.canonicalAssetsLookup = listToSet(filterConfig.Whitelist)
filter.lastModified = filterConfig.LastModified
f.enabled = filterConfig.Enabled
f.canonicalAssetsLookup = listToSet(filterConfig.Whitelist)
f.lastModified = filterConfig.LastModified
}

return nil
}

func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// filtering is disabled if the whitelist is empty for now as that is the only filter rule
if len(f.canonicalAssetsLookup) < 1 || !f.enabled {
if !f.IsEnabled() {
return true, nil
}

Expand Down Expand Up @@ -144,3 +143,8 @@ func listToSet(list []string) set.Set[string] {
}
return set
}

func (f assetFilter) IsEnabled() bool {
// filtering is disabled if the whitelist is empty for now as that is the only filter rule
return len(f.canonicalAssetsLookup) >= 1 && f.enabled
}
22 changes: 18 additions & 4 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,36 @@ func (g *groupTransactionFilterers) Name() string {
}

func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) {
filtersEnabled := false

for _, f := range g.filterers {
if !f.IsEnabled() {
continue
}

filtersEnabled = true
startTime := time.Now()
include, err := f.FilterTransaction(ctx, tx)
if err != nil {
return false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
}
g.AddRunDuration(f.Name(), startTime)
if !include {
// filter out, we can return early
g.droppedTransactions++
return false, nil
if include {
return true, nil
}
}

if filtersEnabled {
g.droppedTransactions++
return false, nil
}
return true, nil
}

func (g *groupTransactionFilterers) IsEnabled() bool {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
return len(g.filterers) < 1
}

func (g *groupTransactionFilterers) ResetStats() {
g.droppedTransactions = 0
g.runDurations = make(map[string]time.Duration)
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/processors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type LedgerTransactionProcessor interface {
type LedgerTransactionFilterer interface {
Name() string
FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error)
IsEnabled() bool
}

func StreamLedgerTransactions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/clients/horizonclient"
hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/services/horizon/internal/test/integration"
"github.com/stellar/go/txnbuild"
"github.com/stretchr/testify/assert"
)

func TestFilteringWithNoFilters(t *testing.T) {
Expand Down Expand Up @@ -168,3 +169,67 @@ func TestFilteringAssetWhiteList(t *testing.T) {
_, err = itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}

func TestFilteringAssetAndAccountFilters(t *testing.T) {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

whitelistedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
nonWhitelistedAsset := txnbuild.CreditAsset{Code: "SEK", Issuer: nonWhitelistedAccountKey.Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, nonWhitelistedAsset)

// Setup whitelisted account and asset rule, force refresh of filter configs to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)

expectedAccountFilter := hProtocol.AccountFilterConfig{
Whitelist: []string{whitelistedAccount.GetAccountID()},
Enabled: &enabled,
}
err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
tt.NoError(err)
accountFilter, err := itest.AdminClient().GetIngestionAccountFilter()
tt.NoError(err)
tt.ElementsMatch(expectedAccountFilter.Whitelist, accountFilter.Whitelist)
tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled)

asset, err := whitelistedAsset.ToXDR()
tt.NoError(err)
expectedAssetFilter := hProtocol.AssetFilterConfig{
Whitelist: []string{asset.StringCanonical()},
Enabled: &enabled,
}
err = itest.AdminClient().SetIngestionAssetFilter(expectedAssetFilter)
tt.NoError(err)
assetFilter, err := itest.AdminClient().GetIngestionAssetFilter()
tt.NoError(err)

tt.ElementsMatch(expectedAssetFilter.Whitelist, assetFilter.Whitelist)
tt.Equal(expectedAssetFilter.Enabled, assetFilter.Enabled)

// Ensure the latest filter configs are reloaded by the ingestion state machine processor
time.Sleep(filters.GetFilterConfigCheckIntervalSeconds() * time.Second)

// Use a non-whitelisted account to submit a non-whitelisted asset to a whitelisted account.
// The transaction should be stored.
txResp := itest.MustSubmitOperations(nonWhitelistedAccount, nonWhitelistedAccountKey,
&txnbuild.Payment{
Destination: whitelistedAccount.GetAccountID(),
Amount: "10",
Asset: nonWhitelistedAsset,
},
)
_, err = itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}
Loading