Skip to content

Commit

Permalink
services/horizon: Split App.UpdateLedgerState into Core and Horizon l…
Browse files Browse the repository at this point in the history
…edger update methods (#3900)

This commit splits `UpdateLedgerState` into `UpdateCoreLedgerState` and
`UpdateHorizonLedgerState` which update Core and Horizon ledger state
separately. It also changes `Tick` frequency to 5 seconds.

If Stellar-Core is unresponsive the call to Stellar-Core `/info` endpoint  will
be cancelled after 10 seconds but it will also return entire `UpdateLedgerState`
method. This will make Horizon ledger stats out of date which can affect
streaming (because streams are updated only when Horizon ledger in
`ledger.Status` is incremented).

Known limitations: with this change it's possible that Horizon root can report
Core ledger sequence behind Horizon.
  • Loading branch information
bartekn authored Sep 9, 2021
1 parent ea9fc73 commit e011c91
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 21 deletions.
2 changes: 1 addition & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ var dbReapCmd = &cobra.Command{
return err
}
ctx := context.Background()
app.UpdateLedgerState(ctx)
app.UpdateHorizonLedgerState(ctx)
return app.DeleteUnretainedHistory(ctx)
},
}
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/actions_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func TestRootAction(t *testing.T) {
ht.App.config.StellarCoreURL = server.URL
ht.App.config.NetworkPassphrase = "test"
assert.NoError(t, ht.App.UpdateStellarCoreInfo(ht.Ctx))
ht.App.UpdateLedgerState(ht.Ctx)
ht.App.UpdateCoreLedgerState(ht.Ctx)
ht.App.UpdateHorizonLedgerState(ht.Ctx)

w := ht.Get("/")

Expand Down Expand Up @@ -95,7 +96,7 @@ func TestRootCoreClientInfoErrored(t *testing.T) {
defer server.Close()

ht.App.config.StellarCoreURL = server.URL
ht.App.UpdateLedgerState(ht.Ctx)
ht.App.UpdateCoreLedgerState(ht.Ctx)

w := ht.Get("/")

Expand Down
31 changes: 23 additions & 8 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (a *App) GetCoreState() corestate.State {
}

const tickerMaxFrequency = 1 * time.Second
const tickerMaxDuration = 10 * time.Second
const tickerMaxDuration = 5 * time.Second

// NewApp constructs an new App instance from the provided config.
func NewApp(config Config) (*App, error) {
Expand Down Expand Up @@ -208,10 +208,11 @@ func (a *App) HorizonSession() db.SessionInterface {
return a.historyQ.SessionInterface.Clone()
}

// UpdateLedgerState triggers a refresh of several metrics gauges, such as open
// db connections and ledger state
func (a *App) UpdateLedgerState(ctx context.Context) {
var next ledger.Status
// UpdateCoreLedgerState triggers a refresh of Stellar-Core ledger state.
// This is done separately from Horizon ledger state update to prevent issues
// in case Stellar-Core query timeout.
func (a *App) UpdateCoreLedgerState(ctx context.Context) {
var next ledger.CoreStatus

logErr := func(err error, msg string) {
log.WithStack(err).WithField("err", err.Error()).Error(msg)
Expand All @@ -228,7 +229,20 @@ func (a *App) UpdateLedgerState(ctx context.Context) {
return
}
next.CoreLatest = int32(coreInfo.Info.Ledger.Num)
a.ledgerState.SetCoreStatus(next)
}

// UpdateHorizonLedgerState triggers a refresh of Horizon ledger state.
// This is done separately from Core ledger state update to prevent issues
// in case Stellar-Core query timeout.
func (a *App) UpdateHorizonLedgerState(ctx context.Context) {
var next ledger.HorizonStatus

logErr := func(err error, msg string) {
log.WithStack(err).WithField("err", err.Error()).Error(msg)
}

var err error
next.HistoryLatest, next.HistoryLatestClosedAt, err =
a.HistoryQ().LatestLedgerSequenceClosedAt(ctx)
if err != nil {
Expand All @@ -248,7 +262,7 @@ func (a *App) UpdateLedgerState(ctx context.Context) {
return
}

a.ledgerState.SetStatus(next)
a.ledgerState.SetHorizonStatus(next)
}

// UpdateFeeStatsState triggers a refresh of several operation fee metrics.
Expand Down Expand Up @@ -419,9 +433,10 @@ func (a *App) Tick(ctx context.Context) error {
log.Debug("ticking app")

// update ledger state, operation fee state, and stellar-core info in parallel
wg.Add(3)
wg.Add(4)
var err error
go func() { a.UpdateLedgerState(ctx); wg.Done() }()
go func() { a.UpdateCoreLedgerState(ctx); wg.Done() }()
go func() { a.UpdateHorizonLedgerState(ctx); wg.Done() }()
go func() { a.UpdateFeeStatsState(ctx); wg.Done() }()
go func() { err = a.UpdateStellarCoreInfo(ctx); wg.Done() }()
wg.Wait()
Expand Down
6 changes: 4 additions & 2 deletions services/horizon/internal/httpt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func startHTTPTest(t *testing.T, scenario string) *HTTPT {
}`)

ret.App.config.StellarCoreURL = ret.coreServer.URL
ret.App.UpdateLedgerState(context.Background())
ret.App.UpdateCoreLedgerState(context.Background())
ret.App.UpdateHorizonLedgerState(context.Background())

return ret
}
Expand Down Expand Up @@ -101,5 +102,6 @@ func (ht *HTTPT) ReapHistory(retention uint) {
ht.App.reaper.RetentionCount = retention
err := ht.App.DeleteUnretainedHistory(context.Background())
ht.Require.NoError(err)
ht.App.UpdateLedgerState(context.Background())
ht.App.UpdateCoreLedgerState(context.Background())
ht.App.UpdateHorizonLedgerState(context.Background())
}
12 changes: 10 additions & 2 deletions services/horizon/internal/ledger/ledger_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import (
func Test_HistoryDBLedgerSourceCurrentLedger(t *testing.T) {
state := &State{
RWMutex: sync.RWMutex{},
current: Status{ExpHistoryLatest: 3},
current: Status{
HorizonStatus: HorizonStatus{
ExpHistoryLatest: 3,
},
},
}

ledgerSource := HistoryDBSource{
Expand All @@ -25,7 +29,11 @@ func Test_HistoryDBLedgerSourceCurrentLedger(t *testing.T) {
func Test_HistoryDBLedgerSourceNextLedger(t *testing.T) {
state := &State{
RWMutex: sync.RWMutex{},
current: Status{ExpHistoryLatest: 3},
current: Status{
HorizonStatus: HorizonStatus{
ExpHistoryLatest: 3,
},
},
}

ledgerSource := HistoryDBSource{
Expand Down
24 changes: 23 additions & 1 deletion services/horizon/internal/ledger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ import (
// Status represents a snapshot of both horizon's and stellar-core's view of the
// ledger.
type Status struct {
CoreLatest int32 `db:"core_latest"`
CoreStatus
HorizonStatus
}

type CoreStatus struct {
CoreLatest int32 `db:"core_latest"`
}

type HorizonStatus struct {
HistoryLatest int32 `db:"history_latest"`
HistoryLatestClosedAt time.Time `db:"history_latest_closed_at"`
HistoryElder int32 `db:"history_elder"`
Expand Down Expand Up @@ -41,3 +49,17 @@ func (c *State) SetStatus(next Status) {
defer c.Unlock()
c.current = next
}

// SetCoreStatus updates the cached snapshot of the ledger state of Stellar-Core
func (c *State) SetCoreStatus(next CoreStatus) {
c.Lock()
defer c.Unlock()
c.current.CoreStatus = next
}

// SetHorizonStatus updates the cached snapshot of the ledger state of Horizon
func (c *State) SetHorizonStatus(next HorizonStatus) {
c.Lock()
defer c.Unlock()
c.current.HorizonStatus = next
}
8 changes: 6 additions & 2 deletions services/horizon/internal/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,12 @@ func TestCheckHistoryStaleMiddleware(t *testing.T) {
} {
t.Run(testCase.name, func(t *testing.T) {
state := ledger.Status{
CoreLatest: testCase.coreLatest,
HistoryLatest: testCase.historyLatest,
CoreStatus: ledger.CoreStatus{
CoreLatest: testCase.coreLatest,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: testCase.historyLatest,
},
}
ledgerState := &ledger.State{}
ledgerState.SetStatus(state)
Expand Down
27 changes: 24 additions & 3 deletions services/horizon/internal/resourceadapter/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ func TestPopulateRoot(t *testing.T) {

PopulateRoot(context.Background(),
res,
ledger.Status{CoreLatest: 1, HistoryLatest: 3, HistoryElder: 2},
ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 1,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 3, HistoryElder: 2,
},
},
"hVersion",
"cVersion",
"passphrase",
Expand All @@ -44,7 +51,14 @@ func TestPopulateRoot(t *testing.T) {
res = &horizon.Root{}
PopulateRoot(context.Background(),
res,
ledger.Status{CoreLatest: 1, HistoryLatest: 3, HistoryElder: 2},
ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 1,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 3, HistoryElder: 2,
},
},
"hVersion",
"cVersion",
"passphrase",
Expand All @@ -65,7 +79,14 @@ func TestPopulateRoot(t *testing.T) {
res = &horizon.Root{}
PopulateRoot(context.Background(),
res,
ledger.Status{CoreLatest: 1, HistoryLatest: 3, HistoryElder: 2},
ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 1,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 3, HistoryElder: 2,
},
},
"hVersion",
"cVersion",
"passphrase",
Expand Down

0 comments on commit e011c91

Please sign in to comment.