Skip to content

Commit

Permalink
Merge pull request #4955 from aditya1702/remove-ingestion-filtering-flag
Browse files Browse the repository at this point in the history
Add functionality to hide ingestion enabled flag from --help output
  • Loading branch information
aditya1702 authored Jul 24, 2023
2 parents 95c2976 + 075f00a commit c83e4be
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 56 deletions.
8 changes: 4 additions & 4 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var ingestVerifyRangeCmd = &cobra.Command{
}

if ingestVerifyState && !mngr.IsCheckpoint(ingestVerifyTo) {
return fmt.Errorf("`--to` must be a checkpoint ledger when `--verify-state` is set.")
return fmt.Errorf("`--to` must be a checkpoint ledger when `--verify-state` is set")
}

ingestConfig := ingest.Config{
Expand Down Expand Up @@ -276,7 +276,7 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{horizonSession}
historyQ := &history.Q{SessionInterface: horizonSession}
if err := historyQ.UpdateIngestVersion(ctx, 0); err != nil {
return fmt.Errorf("cannot trigger state rebuild: %v", err)
}
Expand All @@ -300,7 +300,7 @@ var ingestInitGenesisStateCmd = &cobra.Command{
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{horizonSession}
historyQ := &history.Q{SessionInterface: horizonSession}

lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(ctx)
if err != nil {
Expand Down Expand Up @@ -372,7 +372,7 @@ var ingestBuildStateCmd = &cobra.Command{
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{horizonSession}
historyQ := &history.Q{SessionInterface: horizonSession}

lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (e ErrUsage) Error() string {
return e.cmd.UsageString()
}

// Indicates we want to exit with a specific error code without printing an error.
// ErrExitCode Indicates we want to exit with a specific error code without printing an error.
type ErrExitCode int

func (e ErrExitCode) Error() string {
Expand Down
27 changes: 23 additions & 4 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ const (
// HistoryArchiveURLsFlagName is the command line flag for specifying the history archive URLs
HistoryArchiveURLsFlagName = "history-archive-urls"
// NetworkFlagName is the command line flag for specifying the "network"
NetworkFlagName = "network"
NetworkFlagName = "network"
EnableIngestionFilteringFlag = "exp-enable-ingestion-filtering"

captiveCoreMigrationHint = "If you are migrating from Horizon 1.x.y, start with the Migration Guide here: https://developers.stellar.org/docs/run-api-server/migrating/"
// StellarPubnet is a constant representing the Stellar public network
Expand Down Expand Up @@ -206,12 +207,28 @@ func Flags() (*Config, support.ConfigOptions) {
ConfigKey: &config.EnableCaptiveCoreIngestion,
},
&support.ConfigOption{
Name: "exp-enable-ingestion-filtering",
Name: EnableIngestionFilteringFlag,
OptType: types.Bool,
FlagDefault: false,
FlagDefault: true,
Required: false,
Usage: "causes Horizon to enable the experimental Ingestion Filtering and the ingestion admin HTTP endpoint at /ingestion/filter",
ConfigKey: &config.EnableIngestionFiltering,
CustomSetValue: func(opt *support.ConfigOption) error {

// Always enable ingestion filtering by default.
config.EnableIngestionFiltering = true

if val := viper.GetString(opt.Name); val != "" {
stdLog.Printf(
"DEPRECATED - No ingestion filter rules are defined by default, which equates to no filtering " +
"of historical data. If you have never added filter rules to this deployment, then nothing further needed. " +
"If you have defined ingestion filter rules prior but disabled filtering overall by setting this flag " +
"disabled with --exp-enable-ingestion-filtering=false, then you should now delete the filter rules using " +
"the Horizon Admin API to achieve the same no-filtering result. Remove usage of this flag in all cases.",
)
}
return nil
},
Hidden: true,
},
&support.ConfigOption{
Name: "captive-core-http-port",
Expand Down Expand Up @@ -799,6 +816,8 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption
config.Ingest = true
}

config.EnableIngestionFiltering = true

if config.Ingest {
// Migrations should be checked as early as possible. Apply and check
// only on ingesting instances which are required to have write-access
Expand Down
102 changes: 102 additions & 0 deletions services/horizon/internal/integration/command_line_args_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package integration

import (
"bytes"
"fmt"
"github.com/spf13/cobra"
horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stretchr/testify/assert"
"io"
stdLog "log"
"os"
"sync"
"testing"
"time"
)

func TestIngestionFilteringAlwaysDefaultingToTrue(t *testing.T) {
t.Run("ingestion filtering flag set to default value", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{})
err := test.StartHorizon()
assert.NoError(t, err)
assert.Equal(t, test.HorizonIngest().Config().EnableIngestionFiltering, true)
test.Shutdown()
})
t.Run("ingestion filtering flag set to false", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{"exp-enable-ingestion-filtering": "false"})
err := test.StartHorizon()
assert.NoError(t, err)
assert.Equal(t, test.HorizonIngest().Config().EnableIngestionFiltering, true)
test.Shutdown()
})
}

func TestDeprecatedOutputForIngestionFilteringFlag(t *testing.T) {
originalStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
stdLog.SetOutput(os.Stderr)

test := NewParameterTest(t, map[string]string{"exp-enable-ingestion-filtering": "false"})
if innerErr := test.StartHorizon(); innerErr != nil {
t.Fatalf("Failed to start Horizon: %v", innerErr)
}

// Use a wait group to wait for the goroutine to finish before proceeding
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := w.Close(); err != nil {
t.Errorf("Failed to close Stdout")
return
}
}()

// Give some time for the goroutine to start
time.Sleep(time.Millisecond)

outputBytes, _ := io.ReadAll(r)
wg.Wait() // Wait for the goroutine to finish before proceeding
_ = r.Close()
os.Stderr = originalStderr

assert.Contains(t, string(outputBytes), "DEPRECATED - No ingestion filter rules are defined by default, which equates to "+
"no filtering of historical data. If you have never added filter rules to this deployment, then nothing further needed. "+
"If you have defined ingestion filter rules prior but disabled filtering overall by setting this flag disabled with "+
"--exp-enable-ingestion-filtering=false, then you should now delete the filter rules using the Horizon Admin API to achieve "+
"the same no-filtering result. Remove usage of this flag in all cases.")
}

func TestHelpOutputForNoIngestionFilteringFlag(t *testing.T) {
config, flags := horizon.Flags()

horizonCmd := &cobra.Command{
Use: "horizon",
Short: "Client-facing api server for the Stellar network",
SilenceErrors: true,
SilenceUsage: true,
Long: "Client-facing API server for the Stellar network.",
RunE: func(cmd *cobra.Command, args []string) error {
_, err := horizon.NewAppFromFlags(config, flags)
if err != nil {
return err
}
return nil
},
}

var writer io.Writer = &bytes.Buffer{}
horizonCmd.SetOutput(writer)

horizonCmd.SetArgs([]string{"-h"})
if err := flags.Init(horizonCmd); err != nil {
fmt.Println(err)
}
if err := horizonCmd.Execute(); err != nil {
fmt.Println(err)
}

output := writer.(*bytes.Buffer).String()
assert.NotContains(t, output, "--exp-enable-ingestion-filtering")
}
55 changes: 29 additions & 26 deletions services/horizon/internal/integration/ingestion_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,55 @@ import (
"github.com/stretchr/testify/assert"
)

func TestFilteringAccountWhiteList(t *testing.T) {
func TestFilteringWithNoFilters(t *testing.T) {
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
"exp-enable-ingestion-filtering": "true",
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

// all assets are allowed by default because the asset filter config is empty.
defaultAllowedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, defaultAllowedAsset)
itest.MustEstablishTrustline(nonWhitelistedAccountKey, nonWhitelistedAccount, defaultAllowedAsset)

// assert that by system default, filters with no rules yet, allow all first
// Assert that by default, the system allows all the accounts.
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: nonWhitelistedAccount.GetAccountID(),
Amount: "10",
Asset: defaultAllowedAsset,
},
)

txResp, err := itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}

func TestFilteringAccountWhiteList(t *testing.T) {
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

// all assets are allowed by default because the asset filter config is empty.
defaultAllowedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, defaultAllowedAsset)
itest.MustEstablishTrustline(nonWhitelistedAccountKey, nonWhitelistedAccount, defaultAllowedAsset)

// Setup a whitelisted account rule, force refresh of filter configs to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)
Expand All @@ -54,7 +70,7 @@ func TestFilteringAccountWhiteList(t *testing.T) {
Whitelist: []string{whitelistedAccount.GetAccountID()},
Enabled: &enabled,
}
err = itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
tt.NoError(err)

accountFilter, err := itest.AdminClient().GetIngestionAccountFilter()
Expand All @@ -67,7 +83,7 @@ func TestFilteringAccountWhiteList(t *testing.T) {
time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second)

// Make sure that when using a non-whitelisted account, the transaction is not stored
txResp = itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: nonWhitelistedAccount.GetAccountID(),
Amount: "10",
Expand All @@ -94,8 +110,7 @@ func TestFilteringAssetWhiteList(t *testing.T) {
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
"exp-enable-ingestion-filtering": "true",
"admin-port": strconv.Itoa(int(adminPort)),
},
})

Expand All @@ -110,18 +125,6 @@ func TestFilteringAssetWhiteList(t *testing.T) {
itest.MustEstablishTrustline(defaultAllowedAccountKey, defaultAllowedAccount, nonWhitelistedAsset)
enabled := true

// assert that by system default, filters with no rules yet, allow all first
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: defaultAllowedAccount.GetAccountID(),
Amount: "10",
Asset: nonWhitelistedAsset,
},
)

_, err := itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)

// Setup a whitelisted asset rule, force refresh of filters to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)

Expand All @@ -144,7 +147,7 @@ func TestFilteringAssetWhiteList(t *testing.T) {
time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second)

// Make sure that when using a non-whitelisted asset, the transaction is not stored
txResp = itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: defaultAllowedAccount.GetAccountID(),
Amount: "10",
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/integration/parameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ type FatalTestCase struct {
suite.Suite
}

func (t *FatalTestCase) Exits(subprocess func()) {
testName := t.T().Name()
func (suite *FatalTestCase) Exits(subprocess func()) {
testName := suite.T().Name()
if os.Getenv("ASSERT_EXISTS_"+testName) == "1" {
subprocess()
return
Expand All @@ -234,12 +234,12 @@ func (t *FatalTestCase) Exits(subprocess func()) {
cmd.Env = append(os.Environ(), "ASSERT_EXISTS_"+testName+"=1")
err := cmd.Run()

t.T().Log("Result:", err)
suite.T().Log("Result:", err)
if e, ok := err.(*exec.ExitError); ok && !e.Success() {
return
}

t.Fail("expecting unsuccessful exit, got", err)
suite.Fail("expecting unsuccessful exit, got", err)
}

// validateNoBucketDirPath ensures the Stellar Core auto-generated configuration
Expand Down
15 changes: 0 additions & 15 deletions services/horizon/internal/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,6 @@ type Test struct {
passPhrase string
}

func NewTestForRemoteHorizon(t *testing.T, horizonURL string, passPhrase string, masterKey *keypair.Full) *Test {
adminClient, err := sdk.NewAdminClient(0, "", 0)
if err != nil {
t.Fatal(err)
}

return &Test{
t: t,
horizonClient: &sdk.Client{HorizonURL: horizonURL},
horizonAdminClient: adminClient,
masterKey: masterKey,
passPhrase: passPhrase,
}
}

// NewTest starts a new environment for integration test at a given
// protocol version and blocks until Horizon starts ingesting.
//
Expand Down
Loading

0 comments on commit c83e4be

Please sign in to comment.