diff --git a/services/horizon/internal/actions_root.go b/services/horizon/internal/actions_root.go index 6ddb043081..0ab53a9455 100644 --- a/services/horizon/internal/actions_root.go +++ b/services/horizon/internal/actions_root.go @@ -22,7 +22,7 @@ func (action *RootAction) JSON() { ledger.CurrentState(), action.App.horizonVersion, action.App.coreVersion, - action.App.networkPassphrase, + action.App.config.NetworkPassphrase, action.App.protocolVersion, action.App.config.FriendbotURL, ) diff --git a/services/horizon/internal/actions_root_test.go b/services/horizon/internal/actions_root_test.go index 504b26929f..2c6be04644 100644 --- a/services/horizon/internal/actions_root_test.go +++ b/services/horizon/internal/actions_root_test.go @@ -4,8 +4,8 @@ import ( "encoding/json" "testing" - "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/services/horizon/internal/test" ) func TestRootAction(t *testing.T) { @@ -23,6 +23,7 @@ func TestRootAction(t *testing.T) { ht.App.horizonVersion = "test-horizon" ht.App.config.StellarCoreURL = server.URL + ht.App.config.NetworkPassphrase = "test" ht.App.UpdateStellarCoreInfo() w := ht.Get("/") diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index c813a0ad92..67c2d27f65 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -5,13 +5,13 @@ import ( "database/sql" "fmt" "net/http" + "os" "runtime" "sync" "time" "github.com/gomodule/redigo/redis" "github.com/rcrowley/go-metrics" - "github.com/stellar/go/build" "github.com/stellar/go/clients/stellarcore" horizonContext "github.com/stellar/go/services/horizon/internal/context" "github.com/stellar/go/services/horizon/internal/db2/core" @@ -32,22 +32,21 @@ import ( // App represents the root of the state of a horizon instance. 
type App struct { - config Config - web *Web - historyQ *history.Q - coreQ *core.Q - ctx context.Context - cancel func() - redis *redis.Pool - coreVersion string - horizonVersion string - networkPassphrase string - protocolVersion int32 - submitter *txsub.System - paths paths.Finder - ingester *ingest.System - reaper *reap.System - ticks *time.Ticker + config Config + web *Web + historyQ *history.Q + coreQ *core.Q + ctx context.Context + cancel func() + redis *redis.Pool + coreVersion string + horizonVersion string + protocolVersion int32 + submitter *txsub.System + paths paths.Finder + ingester *ingest.System + reaper *reap.System + ticks *time.Ticker // metrics metrics metrics.Registry @@ -64,7 +63,6 @@ func NewApp(config Config) (*App, error) { result := &App{config: config} result.horizonVersion = app.Version() - result.networkPassphrase = build.TestNetwork.Passphrase result.ticks = time.NewTicker(1 * time.Second) result.init() return result, nil @@ -270,8 +268,18 @@ func (a *App) UpdateStellarCoreInfo() { return } + // Check if NetworkPassphrase is different, if so exit Horizon as it can break the + // state of the application. + if resp.Info.Network != a.config.NetworkPassphrase { + log.Errorf( + "Network passphrase of stellar-core (%s) does not match Horizon configuration (%s). 
Exiting...", + resp.Info.Network, + a.config.NetworkPassphrase, + ) + os.Exit(1) + } + a.coreVersion = resp.Info.Build - a.networkPassphrase = resp.Info.Network a.protocolVersion = int32(resp.Info.ProtocolVersion) } diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index bbff804403..aefe06c6d4 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -21,6 +21,7 @@ type Config struct { RateLimit *throttled.RateQuota RateLimitRedisKey string RedisURL string + NetworkPassphrase string FriendbotURL *url.URL LogLevel logrus.Level LogFile string diff --git a/services/horizon/internal/helpers_test.go b/services/horizon/internal/helpers_test.go index 245c915a19..5f6356aeb9 100644 --- a/services/horizon/internal/helpers_test.go +++ b/services/horizon/internal/helpers_test.go @@ -7,6 +7,7 @@ import ( "log" "time" + "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal/test" supportLog "github.com/stellar/go/support/log" "github.com/throttled/throttled" @@ -32,6 +33,7 @@ func NewTestConfig() Config { }, ConnectionTimeout: 55 * time.Second, // Default LogLevel: supportLog.InfoLevel, + NetworkPassphrase: network.TestNetworkPassphrase, } } diff --git a/services/horizon/internal/init_ingester.go b/services/horizon/internal/init_ingester.go index 7e6c0a16a6..2d33d987c1 100644 --- a/services/horizon/internal/init_ingester.go +++ b/services/horizon/internal/init_ingester.go @@ -11,12 +11,12 @@ func initIngester(app *App) { return } - if app.networkPassphrase == "" { + if app.config.NetworkPassphrase == "" { log.Fatal("Cannot start ingestion without network passphrase. 
Please confirm connectivity with stellar-core.") } app.ingester = ingest.New( - app.networkPassphrase, + app.config.NetworkPassphrase, app.config.StellarCoreURL, app.CoreSession(nil), app.HorizonSession(nil), diff --git a/services/horizon/internal/init_txsub.go b/services/horizon/internal/init_txsub.go index 68963b0d61..cff314c56b 100644 --- a/services/horizon/internal/init_txsub.go +++ b/services/horizon/internal/init_txsub.go @@ -22,7 +22,7 @@ func initSubmissionSystem(app *App) { History: &history.Q{Session: app.HorizonSession(nil)}, }, Sequences: cq.SequenceProvider(), - NetworkPassphrase: app.networkPassphrase, + NetworkPassphrase: app.config.NetworkPassphrase, } } diff --git a/services/horizon/internal/txsub/open_submission_list.go b/services/horizon/internal/txsub/open_submission_list.go index b38a5a6bab..43c066e942 100644 --- a/services/horizon/internal/txsub/open_submission_list.go +++ b/services/horizon/internal/txsub/open_submission_list.go @@ -29,7 +29,7 @@ type openSubmission struct { type submissionList struct { sync.Mutex - submissions map[string]*openSubmission + submissions map[string]*openSubmission // hash => `*openSubmission` log *log.Entry } diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 4a56e7cbc9..181a49f088 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -15,9 +15,10 @@ import ( // Its methods tie together the various pieces used to reliably submit transactions // to a stellar-core instance. 
type System struct { - initializer sync.Once - tickInProgress bool + initializer sync.Once + tickMutex sync.Mutex + tickInProgress bool Pending OpenSubmissionList Results ResultProvider @@ -73,12 +74,20 @@ func (sys *System) Submit(ctx context.Context, env string) (result <-chan Result // check the configured result provider for an existing result r := sys.Results.ResultByHash(ctx, info.Hash) - if r.Err != ErrNoResults { + if r.Err == nil { sys.Log.Ctx(ctx).WithField("hash", info.Hash).Info("Found submission result in a DB") sys.finish(ctx, response, r) return } + if r.Err != ErrNoResults { + sys.Log.Ctx(ctx).WithField("hash", info.Hash).Info("Error getting submission result from a DB") + sys.finish(ctx, response, r) + return + } + + // From now: r.Err == ErrNoResults + curSeq, err := sys.Sequences.Get([]string{info.SourceAddress}) if err != nil { sys.finish(ctx, response, Result{Err: err, EnvelopeXDR: env}) @@ -170,25 +179,40 @@ func (sys *System) submitOnce(ctx context.Context, env string) SubmissionResult return sr } +// setTickInProgress sets `tickInProgress` to `true` if it's not +// already `true`. Returns `true` if `tickInProgress` has been switched +// to `true` inside this method and `Tick()` should continue. +func (sys *System) setTickInProgress(ctx context.Context) bool { + sys.tickMutex.Lock() + defer sys.tickMutex.Unlock() + + if sys.tickInProgress { + logger := log.Ctx(ctx) + logger.Info("ticking in progress") + return false + } + + sys.tickInProgress = true + return true +} + +func (sys *System) unsetTickInProgress() { + sys.tickMutex.Lock() + defer sys.tickMutex.Unlock() + sys.tickInProgress = false +} + // Tick triggers the system to update itself with any new data available. 
func (sys *System) Tick(ctx context.Context) { sys.Init() logger := log.Ctx(ctx) // Make sure Tick is not run concurrently - sys.tickMutex.Lock() - if sys.tickInProgress { - logger.Debug("ticking in progress") + if !sys.setTickInProgress(ctx) { return } - sys.tickInProgress = true - sys.tickMutex.Unlock() - defer func() { - sys.tickMutex.Lock() - sys.tickInProgress = false - sys.tickMutex.Unlock() - }() + defer sys.unsetTickInProgress() logger. WithField("queued", sys.SubmissionQueue.String()). @@ -199,6 +223,7 @@ func (sys *System) Tick(ctx context.Context) { curSeq, err := sys.Sequences.Get(addys) if err != nil { logger.WithStack(err).Error(err) + return } else { sys.SubmissionQueue.Update(curSeq) } @@ -229,6 +254,7 @@ func (sys *System) Tick(ctx context.Context) { stillOpen, err := sys.Pending.Clean(ctx, sys.SubmissionTimeout) if err != nil { logger.WithStack(err).Error(err) + return } sys.Metrics.OpenSubmissionsGauge.Update(int64(stillOpen)) diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 5257e15402..3276a6a78f 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -3,6 +3,7 @@ package txsub import ( "context" "errors" + "fmt" "testing" "time" @@ -52,9 +53,9 @@ func (suite *SystemTestSuite) SetupTest() { Err: ErrBadSequence, } - suite.sequences.Results = map[string]uint64{ - "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H": 0, - } + suite.sequences.On("Get", []string{"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"}). + Return(map[string]uint64{"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H": 0}, nil). + Once() } // Returns the result provided by the ResultProvider. 
@@ -117,6 +118,48 @@ func (suite *SystemTestSuite) TestTick_Noop() { suite.system.Tick(suite.ctx) } +// TestTick_Deadlock is a regression test for a deadlock in Tick(): it occurred +// when one call to Tick() took long enough that another Tick() was called. +// This test starts two goroutines, both calling Tick(), with the call to +// `sys.Sequences.Get(addys)` delayed by 1 second. This simulates two +// calls to `Tick()` executed at the same time. +func (suite *SystemTestSuite) TestTick_Deadlock() { + secondDone := make(chan bool, 1) + testDone := make(chan bool) + + go func() { + select { + case <-secondDone: + // OK! + case <-time.After(5 * time.Second): + assert.Fail(suite.T(), "Timeout, likely a deadlock in Tick()") + } + + testDone <- true + }() + + // Start first Tick + suite.system.SubmissionQueue.Push("address", 0) + // Configure suite.sequences to return after 1 second in a first call + suite.sequences.On("Get", []string{"address"}).After(time.Second).Return(map[string]uint64{}, nil) + + go func() { + fmt.Println("Starting first Tick()") + suite.system.Tick(suite.ctx) + fmt.Println("Finished first Tick()") + }() + + go func() { + // Start second Tick - should be deadlocked if mutex is not Unlock()'ed. 
+ fmt.Println("Starting second Tick()") + suite.system.Tick(suite.ctx) + fmt.Println("Finished second Tick()") + secondDone <- true + }() + + <-testDone +} + // Test that Tick finishes any available transactions, func (suite *SystemTestSuite) TestTick_FinishesTransactions() { l := make(chan Result, 1) diff --git a/services/horizon/internal/txsub/test_helpers.go b/services/horizon/internal/txsub/test_helpers.go index 00b39791ac..2f866b3e1f 100644 --- a/services/horizon/internal/txsub/test_helpers.go +++ b/services/horizon/internal/txsub/test_helpers.go @@ -8,6 +8,8 @@ package txsub import ( "context" + + "github.com/stretchr/testify/mock" ) // MockSubmitter is a test helper that simplements the Submitter interface @@ -43,11 +45,11 @@ func (results *MockResultProvider) ResultByHash(ctx context.Context, hash string // MockSequenceProvider is a test helper that simplements the SequenceProvider // interface type MockSequenceProvider struct { - Results map[string]uint64 - Err error + mock.Mock } // Get implements `txsub.SequenceProvider` -func (results *MockSequenceProvider) Get(addresses []string) (map[string]uint64, error) { - return results.Results, results.Err +func (o *MockSequenceProvider) Get(addresses []string) (map[string]uint64, error) { + args := o.Called(addresses) + return args.Get(0).(map[string]uint64), args.Error(1) } diff --git a/services/horizon/main.go b/services/horizon/main.go index 258e78fcc3..022e2ab809 100644 --- a/services/horizon/main.go +++ b/services/horizon/main.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal" "github.com/stellar/go/support/log" "github.com/throttled/throttled" @@ -180,7 +181,7 @@ func init() { rootCmd.PersistentFlags().String( "network-passphrase", - "", + network.TestNetworkPassphrase, "Override the network passphrase", ) @@ -239,6 +240,10 @@ func initConfig() { stdLog.Fatal("Invalid 
config: stellar-core-url is blank. Please specify --stellar-core-url on the command line or set the STELLAR_CORE_URL environment variable.") } + if viper.GetString("network-passphrase") == "" { + stdLog.Fatal("Invalid config: network-passphrase is blank. Please specify --network-passphrase on the command line or set the NETWORK_PASSPHRASE environment variable.") + } + ll, err := logrus.ParseLevel(viper.GetString("log-level")) if err != nil { @@ -299,6 +304,7 @@ func initConfig() { LogLevel: ll, LogFile: lf, MaxPathLength: uint(viper.GetInt("max-path-length")), + NetworkPassphrase: viper.GetString("network-passphrase"), SentryDSN: viper.GetString("sentry-dsn"), LogglyToken: viper.GetString("loggly-token"), LogglyTag: viper.GetString("loggly-tag"),