Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to hide ingestion enabled flag from --help output #4955

Merged
merged 19 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var ingestVerifyRangeCmd = &cobra.Command{
}

if ingestVerifyState && !mngr.IsCheckpoint(ingestVerifyTo) {
return fmt.Errorf("`--to` must be a checkpoint ledger when `--verify-state` is set.")
return fmt.Errorf("`--to` must be a checkpoint ledger when `--verify-state` is set")
}

ingestConfig := ingest.Config{
Expand Down Expand Up @@ -276,7 +276,7 @@ var ingestTriggerStateRebuildCmd = &cobra.Command{
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{horizonSession}
historyQ := &history.Q{SessionInterface: horizonSession}
if err := historyQ.UpdateIngestVersion(ctx, 0); err != nil {
return fmt.Errorf("cannot trigger state rebuild: %v", err)
}
Expand All @@ -300,7 +300,7 @@ var ingestInitGenesisStateCmd = &cobra.Command{
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{horizonSession}
historyQ := &history.Q{SessionInterface: horizonSession}

lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(ctx)
if err != nil {
Expand Down Expand Up @@ -372,7 +372,7 @@ var ingestBuildStateCmd = &cobra.Command{
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

historyQ := &history.Q{horizonSession}
historyQ := &history.Q{SessionInterface: horizonSession}

lastIngestedLedger, err := historyQ.GetLastLedgerIngestNonBlocking(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (e ErrUsage) Error() string {
return e.cmd.UsageString()
}

// Indicates we want to exit with a specific error code without printing an error.
// ErrExitCode Indicates we want to exit with a specific error code without printing an error.
type ErrExitCode int

func (e ErrExitCode) Error() string {
Expand Down
27 changes: 23 additions & 4 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ const (
// HistoryArchiveURLsFlagName is the command line flag for specifying the history archive URLs
HistoryArchiveURLsFlagName = "history-archive-urls"
// NetworkFlagName is the command line flag for specifying the "network"
NetworkFlagName = "network"
NetworkFlagName = "network"
EnableIngestionFilteringFlag = "exp-enable-ingestion-filtering"

captiveCoreMigrationHint = "If you are migrating from Horizon 1.x.y, start with the Migration Guide here: https://developers.stellar.org/docs/run-api-server/migrating/"
// StellarPubnet is a constant representing the Stellar public network
Expand Down Expand Up @@ -206,12 +207,28 @@ func Flags() (*Config, support.ConfigOptions) {
ConfigKey: &config.EnableCaptiveCoreIngestion,
},
&support.ConfigOption{
Name: "exp-enable-ingestion-filtering",
Name: EnableIngestionFilteringFlag,
OptType: types.Bool,
FlagDefault: false,
FlagDefault: true,
Required: false,
Usage: "causes Horizon to enable the experimental Ingestion Filtering and the ingestion admin HTTP endpoint at /ingestion/filter",
ConfigKey: &config.EnableIngestionFiltering,
CustomSetValue: func(opt *support.ConfigOption) error {

// Always enable ingestion filtering by default.
config.EnableIngestionFiltering = true

if val := viper.GetString(opt.Name); val != "" {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
stdLog.Printf(
"DEPRECATED - No ingestion filter rules are defined by default, which equates to no filtering " +
"of historical data. If you have never added filter rules to this deployment, then nothing further needed. " +
"If you have defined ingestion filter rules prior but disabled filtering overall by setting this flag " +
"disabled with --exp-enable-ingestion-filtering=false, then you should now delete the filter rules using " +
"the Horizon Admin API to achieve the same no-filtering result. Remove usage of this flag in all cases.",
)
}
return nil
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
},
Hidden: true,
},
&support.ConfigOption{
Name: "captive-core-http-port",
Expand Down Expand Up @@ -799,6 +816,8 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption
config.Ingest = true
}

config.EnableIngestionFiltering = true
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved

if config.Ingest {
// Migrations should be checked as early as possible. Apply and check
// only on ingesting instances which are required to have write-access
Expand Down
102 changes: 102 additions & 0 deletions services/horizon/internal/integration/command_line_args_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package integration

import (
"bytes"
"fmt"
"github.com/spf13/cobra"

Check failure on line 6 in services/horizon/internal/integration/command_line_args_test.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed with -local github.com/golangci/golangci-lint (goimports)
horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stretchr/testify/assert"
"io"
stdLog "log"
"os"
"sync"
"testing"
"time"
)

func TestIngestionFilteringAlwaysDefaultingToTrue(t *testing.T) {
t.Run("ingestion filtering flag set to default value", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{})
err := test.StartHorizon()
assert.NoError(t, err)
assert.Equal(t, test.HorizonIngest().Config().EnableIngestionFiltering, true)
test.Shutdown()
})
t.Run("ingestion filtering flag set to false", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{"exp-enable-ingestion-filtering": "false"})
err := test.StartHorizon()
assert.NoError(t, err)
assert.Equal(t, test.HorizonIngest().Config().EnableIngestionFiltering, true)
test.Shutdown()
})
}

func TestDeprecatedOutputForIngestionFilteringFlag(t *testing.T) {
originalStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stderr = w
stdLog.SetOutput(os.Stderr)

test := NewParameterTest(t, map[string]string{"exp-enable-ingestion-filtering": "false"})
if innerErr := test.StartHorizon(); innerErr != nil {
t.Fatalf("Failed to start Horizon: %v", innerErr)
}

// Use a wait group to wait for the goroutine to finish before proceeding
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := w.Close(); err != nil {
t.Errorf("Failed to close Stdout")
return
}
}()

// Give some time for the goroutine to start
time.Sleep(time.Millisecond)

outputBytes, _ := io.ReadAll(r)
wg.Wait() // Wait for the goroutine to finish before proceeding
_ = r.Close()
os.Stderr = originalStderr

assert.Contains(t, string(outputBytes), "DEPRECATED - No ingestion filter rules are defined by default, which equates to "+
"no filtering of historical data. If you have never added filter rules to this deployment, then nothing further needed. "+
"If you have defined ingestion filter rules prior but disabled filtering overall by setting this flag disabled with "+
"--exp-enable-ingestion-filtering=false, then you should now delete the filter rules using the Horizon Admin API to achieve "+
"the same no-filtering result. Remove usage of this flag in all cases.")
}

func TestHelpOutputForNoIngestionFilteringFlag(t *testing.T) {
config, flags := horizon.Flags()

horizonCmd := &cobra.Command{
Use: "horizon",
Short: "Client-facing api server for the Stellar network",
SilenceErrors: true,
SilenceUsage: true,
Long: "Client-facing API server for the Stellar network.",
RunE: func(cmd *cobra.Command, args []string) error {
_, err := horizon.NewAppFromFlags(config, flags)
if err != nil {
return err
}
return nil
},
}

var writer io.Writer = &bytes.Buffer{}
horizonCmd.SetOutput(writer)

horizonCmd.SetArgs([]string{"-h"})
if err := flags.Init(horizonCmd); err != nil {
fmt.Println(err)
}
if err := horizonCmd.Execute(); err != nil {
fmt.Println(err)
}

output := writer.(*bytes.Buffer).String()
assert.NotContains(t, output, "--exp-enable-ingestion-filtering")
}
55 changes: 29 additions & 26 deletions services/horizon/internal/integration/ingestion_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,55 @@ import (
"github.com/stretchr/testify/assert"
)

func TestFilteringAccountWhiteList(t *testing.T) {
func TestFilteringWithNoFilters(t *testing.T) {
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
"exp-enable-ingestion-filtering": "true",
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

// all assets are allowed by default because the asset filter config is empty.
defaultAllowedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, defaultAllowedAsset)
itest.MustEstablishTrustline(nonWhitelistedAccountKey, nonWhitelistedAccount, defaultAllowedAsset)

// assert that by system default, filters with no rules yet, allow all first
// Assert that by default, the system allows all the accounts.
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: nonWhitelistedAccount.GetAccountID(),
Amount: "10",
Asset: defaultAllowedAsset,
},
)

txResp, err := itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}

func TestFilteringAccountWhiteList(t *testing.T) {
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

// all assets are allowed by default because the asset filter config is empty.
defaultAllowedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, defaultAllowedAsset)
itest.MustEstablishTrustline(nonWhitelistedAccountKey, nonWhitelistedAccount, defaultAllowedAsset)

// Setup a whitelisted account rule, force refresh of filter configs to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)
Expand All @@ -54,7 +70,7 @@ func TestFilteringAccountWhiteList(t *testing.T) {
Whitelist: []string{whitelistedAccount.GetAccountID()},
Enabled: &enabled,
}
err = itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
tt.NoError(err)

accountFilter, err := itest.AdminClient().GetIngestionAccountFilter()
Expand All @@ -67,7 +83,7 @@ func TestFilteringAccountWhiteList(t *testing.T) {
time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second)

// Make sure that when using a non-whitelisted account, the transaction is not stored
txResp = itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: nonWhitelistedAccount.GetAccountID(),
Amount: "10",
Expand All @@ -94,8 +110,7 @@ func TestFilteringAssetWhiteList(t *testing.T) {
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
"exp-enable-ingestion-filtering": "true",
"admin-port": strconv.Itoa(int(adminPort)),
},
})

Expand All @@ -110,18 +125,6 @@ func TestFilteringAssetWhiteList(t *testing.T) {
itest.MustEstablishTrustline(defaultAllowedAccountKey, defaultAllowedAccount, nonWhitelistedAsset)
enabled := true

// assert that by system default, filters with no rules yet, allow all first
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: defaultAllowedAccount.GetAccountID(),
Amount: "10",
Asset: nonWhitelistedAsset,
},
)

_, err := itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)

// Setup a whitelisted asset rule, force refresh of filters to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)

Expand All @@ -144,7 +147,7 @@ func TestFilteringAssetWhiteList(t *testing.T) {
time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second)

// Make sure that when using a non-whitelisted asset, the transaction is not stored
txResp = itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
txResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(),
&txnbuild.Payment{
Destination: defaultAllowedAccount.GetAccountID(),
Amount: "10",
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/integration/parameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ type FatalTestCase struct {
suite.Suite
}

func (t *FatalTestCase) Exits(subprocess func()) {
testName := t.T().Name()
func (suite *FatalTestCase) Exits(subprocess func()) {
testName := suite.T().Name()
if os.Getenv("ASSERT_EXISTS_"+testName) == "1" {
subprocess()
return
Expand All @@ -234,12 +234,12 @@ func (t *FatalTestCase) Exits(subprocess func()) {
cmd.Env = append(os.Environ(), "ASSERT_EXISTS_"+testName+"=1")
err := cmd.Run()

t.T().Log("Result:", err)
suite.T().Log("Result:", err)
if e, ok := err.(*exec.ExitError); ok && !e.Success() {
return
}

t.Fail("expecting unsuccessful exit, got", err)
suite.Fail("expecting unsuccessful exit, got", err)
}

// validateNoBucketDirPath ensures the Stellar Core auto-generated configuration
Expand Down
15 changes: 0 additions & 15 deletions services/horizon/internal/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,6 @@ type Test struct {
passPhrase string
}

func NewTestForRemoteHorizon(t *testing.T, horizonURL string, passPhrase string, masterKey *keypair.Full) *Test {
adminClient, err := sdk.NewAdminClient(0, "", 0)
if err != nil {
t.Fatal(err)
}

return &Test{
t: t,
horizonClient: &sdk.Client{HorizonURL: horizonURL},
horizonAdminClient: adminClient,
masterKey: masterKey,
passPhrase: passPhrase,
}
}

// NewTest starts a new environment for integration test at a given
// protocol version and blocks until Horizon starts ingesting.
//
Expand Down
Loading
Loading