Skip to content

Commit

Permalink
Fix deadlock in txsub.System.Tick() and tx_bad_seq errors (#815)
Browse files Browse the repository at this point in the history
This commit fixes two issues in the `txsub` package. The first issue was a
deadlock in the `Tick()` function:

```go
sys.tickMutex.Lock()
if sys.tickInProgress {
  logger.Debug("ticking in progress")
  return
}
sys.tickInProgress = true
sys.tickMutex.Unlock()
```

When two `Tick()` calls ran simultaneously, the one that found
`tickInProgress` already set returned early without calling `Unlock()`,
leaving the mutex locked forever. This could basically break the `txsub`
system, as sequence numbers could no longer be updated.

The second issue was a problem with `App` initialization.
`App.networkPassphrase` was set to the test network passphrase by default.
If the `txsub` system was initialized before the network passphrase was
updated from the connected `stellar-core`, the system could not find the
transaction result in the database, because the hash it calculated
differed from the actual hash of the transaction.
  • Loading branch information
bartekn authored Jan 17, 2019
1 parent 81f17f7 commit 1a536e3
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 46 deletions.
2 changes: 1 addition & 1 deletion services/horizon/internal/actions_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (action *RootAction) JSON() {
ledger.CurrentState(),
action.App.horizonVersion,
action.App.coreVersion,
action.App.networkPassphrase,
action.App.config.NetworkPassphrase,
action.App.protocolVersion,
action.App.config.FriendbotURL,
)
Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/actions_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"encoding/json"
"testing"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestRootAction(t *testing.T) {
Expand All @@ -23,6 +23,7 @@ func TestRootAction(t *testing.T) {

ht.App.horizonVersion = "test-horizon"
ht.App.config.StellarCoreURL = server.URL
ht.App.config.NetworkPassphrase = "test"
ht.App.UpdateStellarCoreInfo()

w := ht.Get("/")
Expand Down
46 changes: 27 additions & 19 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"database/sql"
"fmt"
"net/http"
"os"
"runtime"
"sync"
"time"

"github.com/gomodule/redigo/redis"
"github.com/rcrowley/go-metrics"
"github.com/stellar/go/build"
"github.com/stellar/go/clients/stellarcore"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2/core"
Expand All @@ -32,22 +32,21 @@ import (

// App represents the root of the state of a horizon instance.
type App struct {
config Config
web *Web
historyQ *history.Q
coreQ *core.Q
ctx context.Context
cancel func()
redis *redis.Pool
coreVersion string
horizonVersion string
networkPassphrase string
protocolVersion int32
submitter *txsub.System
paths paths.Finder
ingester *ingest.System
reaper *reap.System
ticks *time.Ticker
config Config
web *Web
historyQ *history.Q
coreQ *core.Q
ctx context.Context
cancel func()
redis *redis.Pool
coreVersion string
horizonVersion string
protocolVersion int32
submitter *txsub.System
paths paths.Finder
ingester *ingest.System
reaper *reap.System
ticks *time.Ticker

// metrics
metrics metrics.Registry
Expand All @@ -64,7 +63,6 @@ func NewApp(config Config) (*App, error) {

result := &App{config: config}
result.horizonVersion = app.Version()
result.networkPassphrase = build.TestNetwork.Passphrase
result.ticks = time.NewTicker(1 * time.Second)
result.init()
return result, nil
Expand Down Expand Up @@ -270,8 +268,18 @@ func (a *App) UpdateStellarCoreInfo() {
return
}

// Check if NetworkPassphrase is different, if so exit Horizon as it can break the
// state of the application.
if resp.Info.Network != a.config.NetworkPassphrase {
log.Errorf(
"Network passphrase of stellar-core (%s) does not match Horizon configuration (%s). Exiting...",
resp.Info.Network,
a.config.NetworkPassphrase,
)
os.Exit(1)
}

a.coreVersion = resp.Info.Build
a.networkPassphrase = resp.Info.Network
a.protocolVersion = int32(resp.Info.ProtocolVersion)
}

Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
RateLimit *throttled.RateQuota
RateLimitRedisKey string
RedisURL string
NetworkPassphrase string
FriendbotURL *url.URL
LogLevel logrus.Level
LogFile string
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"time"

"github.com/stellar/go/network"
"github.com/stellar/go/services/horizon/internal/test"
supportLog "github.com/stellar/go/support/log"
"github.com/throttled/throttled"
Expand All @@ -32,6 +33,7 @@ func NewTestConfig() Config {
},
ConnectionTimeout: 55 * time.Second, // Default
LogLevel: supportLog.InfoLevel,
NetworkPassphrase: network.TestNetworkPassphrase,
}
}

Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/init_ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ func initIngester(app *App) {
return
}

if app.networkPassphrase == "" {
if app.config.NetworkPassphrase == "" {
log.Fatal("Cannot start ingestion without network passphrase. Please confirm connectivity with stellar-core.")
}

app.ingester = ingest.New(
app.networkPassphrase,
app.config.NetworkPassphrase,
app.config.StellarCoreURL,
app.CoreSession(nil),
app.HorizonSession(nil),
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/init_txsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func initSubmissionSystem(app *App) {
History: &history.Q{Session: app.HorizonSession(nil)},
},
Sequences: cq.SequenceProvider(),
NetworkPassphrase: app.networkPassphrase,
NetworkPassphrase: app.config.NetworkPassphrase,
}
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/open_submission_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type openSubmission struct {

type submissionList struct {
sync.Mutex
submissions map[string]*openSubmission
submissions map[string]*openSubmission // hash => `*openSubmission`
log *log.Entry
}

Expand Down
52 changes: 39 additions & 13 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
// Its methods tie together the various pieces used to reliably submit transactions
// to a stellar-core instance.
type System struct {
initializer sync.Once
tickInProgress bool
initializer sync.Once

tickMutex sync.Mutex
tickInProgress bool

Pending OpenSubmissionList
Results ResultProvider
Expand Down Expand Up @@ -73,12 +74,20 @@ func (sys *System) Submit(ctx context.Context, env string) (result <-chan Result
// check the configured result provider for an existing result
r := sys.Results.ResultByHash(ctx, info.Hash)

if r.Err != ErrNoResults {
if r.Err == nil {
sys.Log.Ctx(ctx).WithField("hash", info.Hash).Info("Found submission result in a DB")
sys.finish(ctx, response, r)
return
}

if r.Err != ErrNoResults {
sys.Log.Ctx(ctx).WithField("hash", info.Hash).Info("Error getting submission result from a DB")
sys.finish(ctx, response, r)
return
}

// From now: r.Err == ErrNoResults

curSeq, err := sys.Sequences.Get([]string{info.SourceAddress})
if err != nil {
sys.finish(ctx, response, Result{Err: err, EnvelopeXDR: env})
Expand Down Expand Up @@ -170,25 +179,40 @@ func (sys *System) submitOnce(ctx context.Context, env string) SubmissionResult
return sr
}

// setTickInProgress claims the "tick in progress" flag under `tickMutex`.
// It returns `true` when the flag was `false` and has just been switched to
// `true` by this call, meaning the caller's `Tick()` should continue. It
// returns `false` (after logging) when the flag was already `true`, meaning
// another tick is running. The mutex is always released before returning.
func (sys *System) setTickInProgress(ctx context.Context) bool {
	sys.tickMutex.Lock()
	defer sys.tickMutex.Unlock()

	if !sys.tickInProgress {
		sys.tickInProgress = true
		return true
	}

	log.Ctx(ctx).Info("ticking in progress")
	return false
}

// unsetTickInProgress clears the "tick in progress" flag while holding
// `tickMutex`, so that a later call to `setTickInProgress` can succeed.
func (sys *System) unsetTickInProgress() {
	sys.tickMutex.Lock()
	sys.tickInProgress = false
	sys.tickMutex.Unlock()
}

// Tick triggers the system to update itself with any new data available.
func (sys *System) Tick(ctx context.Context) {
sys.Init()
logger := log.Ctx(ctx)

// Make sure Tick is not run concurrently
sys.tickMutex.Lock()
if sys.tickInProgress {
logger.Debug("ticking in progress")
if !sys.setTickInProgress(ctx) {
return
}
sys.tickInProgress = true
sys.tickMutex.Unlock()

defer func() {
sys.tickMutex.Lock()
sys.tickInProgress = false
sys.tickMutex.Unlock()
}()
defer sys.unsetTickInProgress()

logger.
WithField("queued", sys.SubmissionQueue.String()).
Expand All @@ -199,6 +223,7 @@ func (sys *System) Tick(ctx context.Context) {
curSeq, err := sys.Sequences.Get(addys)
if err != nil {
logger.WithStack(err).Error(err)
return
} else {
sys.SubmissionQueue.Update(curSeq)
}
Expand Down Expand Up @@ -229,6 +254,7 @@ func (sys *System) Tick(ctx context.Context) {
stillOpen, err := sys.Pending.Clean(ctx, sys.SubmissionTimeout)
if err != nil {
logger.WithStack(err).Error(err)
return
}

sys.Metrics.OpenSubmissionsGauge.Update(int64(stillOpen))
Expand Down
49 changes: 46 additions & 3 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txsub
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -52,9 +53,9 @@ func (suite *SystemTestSuite) SetupTest() {
Err: ErrBadSequence,
}

suite.sequences.Results = map[string]uint64{
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H": 0,
}
suite.sequences.On("Get", []string{"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"}).
Return(map[string]uint64{"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H": 0}, nil).
Once()
}

// Returns the result provided by the ResultProvider.
Expand Down Expand Up @@ -117,6 +118,48 @@ func (suite *SystemTestSuite) TestTick_Noop() {
suite.system.Tick(suite.ctx)
}

// TestTick_Deadlock is a regression test for the Tick() deadlock that could
// occur when one call to Tick() is still running and another Tick() is
// started. Two goroutines each call Tick(), while the mocked
// `sys.Sequences.Get(addys)` call is configured with a 1 second delay so that
// the two calls overlap. A watchdog goroutine fails the test if the goroutine
// labelled "second" has not finished within 5 seconds.
func (suite *SystemTestSuite) TestTick_Deadlock() {
	secondFinished := make(chan bool, 1)
	watchdogDone := make(chan bool)

	// Watchdog: wait for the "second" Tick() to finish or give up after 5s.
	go func() {
		select {
		case <-secondFinished:
			// Second Tick() returned, so no deadlock.
		case <-time.After(5 * time.Second):
			assert.Fail(suite.T(), "Timeout, likely a deadlock in Tick()")
		}

		watchdogDone <- true
	}()

	// Queue a submission so that Tick() has an address to look up.
	suite.system.SubmissionQueue.Push("address", 0)
	// Make the mocked sequence lookup for "address" return after 1 second.
	suite.sequences.
		On("Get", []string{"address"}).
		After(time.Second).
		Return(map[string]uint64{}, nil)

	// "First" Tick().
	go func() {
		fmt.Println("Starting first Tick()")
		suite.system.Tick(suite.ctx)
		fmt.Println("Finished first Tick()")
	}()

	// "Second" Tick(): it would block forever if the mutex were left locked.
	go func() {
		fmt.Println("Starting second Tick()")
		suite.system.Tick(suite.ctx)
		fmt.Println("Finished second Tick()")
		secondFinished <- true
	}()

	<-watchdogDone
}

// Test that Tick finishes any available transactions,
func (suite *SystemTestSuite) TestTick_FinishesTransactions() {
l := make(chan Result, 1)
Expand Down
10 changes: 6 additions & 4 deletions services/horizon/internal/txsub/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package txsub

import (
"context"

"github.com/stretchr/testify/mock"
)

// MockSubmitter is a test helper that implements the Submitter interface
Expand Down Expand Up @@ -43,11 +45,11 @@ func (results *MockResultProvider) ResultByHash(ctx context.Context, hash string
// MockSequenceProvider is a test helper that implements the SequenceProvider
// interface
type MockSequenceProvider struct {
Results map[string]uint64
Err error
mock.Mock
}

// Get implements `txsub.SequenceProvider`
func (results *MockSequenceProvider) Get(addresses []string) (map[string]uint64, error) {
return results.Results, results.Err
func (o *MockSequenceProvider) Get(addresses []string) (map[string]uint64, error) {
args := o.Called(addresses)
return args.Get(0).(map[string]uint64), args.Error(1)
}
8 changes: 7 additions & 1 deletion services/horizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stellar/go/network"
"github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/support/log"
"github.com/throttled/throttled"
Expand Down Expand Up @@ -180,7 +181,7 @@ func init() {

rootCmd.PersistentFlags().String(
"network-passphrase",
"",
network.TestNetworkPassphrase,
"Override the network passphrase",
)

Expand Down Expand Up @@ -239,6 +240,10 @@ func initConfig() {
stdLog.Fatal("Invalid config: stellar-core-url is blank. Please specify --stellar-core-url on the command line or set the STELLAR_CORE_URL environment variable.")
}

if viper.GetString("network-passphrase") == "" {
stdLog.Fatal("Invalid config: network-passphrase is blank. Please specify --network-passphrase on the command line or set the NETWORK_PASSPHRASE environment variable.")
}

ll, err := logrus.ParseLevel(viper.GetString("log-level"))

if err != nil {
Expand Down Expand Up @@ -299,6 +304,7 @@ func initConfig() {
LogLevel: ll,
LogFile: lf,
MaxPathLength: uint(viper.GetInt("max-path-length")),
NetworkPassphrase: viper.GetString("network-passphrase"),
SentryDSN: viper.GetString("sentry-dsn"),
LogglyToken: viper.GetString("loggly-token"),
LogglyTag: viper.GetString("loggly-tag"),
Expand Down

0 comments on commit 1a536e3

Please sign in to comment.