Skip to content

Commit

Permalink
services/horizon/ledger: Fix memory leak in HistoryDBSource (#2548)
Browse files Browse the repository at this point in the history
This commit fixes a memory leak in `HistoryDBSource.NextLedger` method.
The method is usually used like in `stream_handler.go`:

```
 select { 
 case currentLedgerSequence = <-handler.LedgerSource.NextLedger(currentLedgerSequence): 
 	continue 
 case <-ctx.Done(): 
 	stream.Done() 
 	return 
 } 
```

When streaming connection is closed but there are no new ledgers (ex.
due to database or network issues) the goroutines started in
`HistoryDBSource.NextLedger` will run indefinitely.

To fix this `ledger.Source` interface was updated with `Close()` method
that should be called when a ledger source is no longer used and it
exits internal goroutine. The code was refactored to create a new
`HistoryDBSource` for each streaming request (so it can be closed when
handler execution ends). This should have a really minimal impact on
memory usage.

When there are no new ledgers, the goroutines started in
`HistoryDBSource.NextLedger` will leak and will eventually consume all
available memory, like in #2535.
  • Loading branch information
bartekn authored May 5, 2020
1 parent f766413 commit 34aac57
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 16 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Add `last_modified_time` to account responses. `last_modified_time` is the
closing time of the most recent ledger in which the account was modified.
* Fix a memory leak in the code responsible for streaming [#2548](https://github.com/stellar/go/pull/2548).

## v1.2.1

Expand Down
32 changes: 27 additions & 5 deletions services/horizon/internal/ledger/ledger_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
)

// Source exposes two helpers methods to help you find out the current
// ledger and yield every time there is a new ledger.
// ledger and yield every time there is a new ledger. Call `Close` when
// source is no longer used.
type Source interface {
CurrentLedger() uint32
NextLedger(currentSequence uint32) chan uint32
Close()
}

type currentStateFunc func() State
Expand All @@ -19,23 +21,27 @@ type currentStateFunc func() State
type HistoryDBSource struct {
updateFrequency time.Duration
currentState currentStateFunc

closedLock sync.Mutex
closed bool
}

// NewHistoryDBSource constructs a new instance of HistoryDBSource
func NewHistoryDBSource(updateFrequency time.Duration) HistoryDBSource {
return HistoryDBSource{
func NewHistoryDBSource(updateFrequency time.Duration) *HistoryDBSource {
return &HistoryDBSource{
updateFrequency: updateFrequency,
currentState: CurrentState,
closedLock: sync.Mutex{},
}
}

// CurrentLedger returns the current ledger.
func (source HistoryDBSource) CurrentLedger() uint32 {
func (source *HistoryDBSource) CurrentLedger() uint32 {
return source.currentState().ExpHistoryLatest
}

// NextLedger returns a channel which yields every time there is a new ledger with a sequence number larger than currentSequence.
func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
func (source *HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
// Make sure this is buffered channel of size 1. Otherwise, the go routine below
// will never return if `newLedgers` channel is not read. From Effective Go:
// > If the channel is unbuffered, the sender blocks until the receiver has received the value.
Expand All @@ -46,6 +52,13 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
time.Sleep(source.updateFrequency)
}

source.closedLock.Lock()
closed := source.closed
source.closedLock.Unlock()
if closed {
return
}

currentLedgerState := source.currentState()
if currentLedgerState.ExpHistoryLatest > currentSequence {
newLedgers <- currentLedgerState.ExpHistoryLatest
Expand All @@ -57,6 +70,13 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 {
return newLedgers
}

// Close closes the internal go routines.
func (source *HistoryDBSource) Close() {
source.closedLock.Lock()
defer source.closedLock.Unlock()
source.closed = true
}

// TestingSource is helper struct which implements the LedgerSource
// interface.
type TestingSource struct {
Expand Down Expand Up @@ -106,3 +126,5 @@ func (source *TestingSource) NextLedger(currentSequence uint32) chan uint32 {

return response
}

func (source *TestingSource) Close() {}
15 changes: 11 additions & 4 deletions services/horizon/internal/render/sse/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"github.com/stellar/throttled"
)

type LedgerSourceFactory interface {
Get() ledger.Source
}

// StreamHandler represents a stream handling action
type StreamHandler struct {
RateLimiter *throttled.HTTPRateLimiter
LedgerSource ledger.Source
RateLimiter *throttled.HTTPRateLimiter
LedgerSourceFactory LedgerSourceFactory
}

// GenerateEventsFunc generates a slice of sse.Event which are sent via
Expand All @@ -30,7 +34,10 @@ func (handler StreamHandler) ServeStream(
stream := NewStream(ctx, w)
stream.SetLimit(limit)

currentLedgerSequence := handler.LedgerSource.CurrentLedger()
ledgerSource := handler.LedgerSourceFactory.Get()
defer ledgerSource.Close()

currentLedgerSequence := ledgerSource.CurrentLedger()
for {
// Rate limit the request if it's a call to stream since it queries the DB every second. See
// https://github.com/stellar/go/issues/715 for more details.
Expand Down Expand Up @@ -71,7 +78,7 @@ func (handler StreamHandler) ServeStream(
stream.Init()

select {
case currentLedgerSequence = <-handler.LedgerSource.NextLedger(currentLedgerSequence):
case currentLedgerSequence = <-ledgerSource.NextLedger(currentLedgerSequence):
continue
case <-ctx.Done():
stream.Done()
Expand Down
12 changes: 9 additions & 3 deletions services/horizon/internal/render/sse/stream_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@ import (
"github.com/stellar/go/services/horizon/internal/ledger"
)

type testingFactory struct {
ledgerSource ledger.Source
}

func (f *testingFactory) Get() ledger.Source {
return f.ledgerSource
}

func TestSendByeByeOnContextDone(t *testing.T) {
ledgerSource := ledger.NewTestingSource(1)
handler := StreamHandler{
LedgerSource: ledgerSource,
}
handler := StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}}

r, err := http.NewRequest("GET", "http://localhost", nil)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions services/horizon/internal/stream_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (
"github.com/stellar/go/support/render/hal"
)

type testingFactory struct {
ledgerSource ledger.Source
}

func (f *testingFactory) Get() ledger.Source {
return f.ledgerSource
}

// StreamTest utility struct to wrap SSE related tests.
type StreamTest struct {
ledgerSource *ledger.TestingSource
Expand Down Expand Up @@ -64,7 +72,7 @@ func NewStreamablePageTest(
) *StreamTest {
ledgerSource := ledger.NewTestingSource(currentLedger)
action.ledgerSource = ledgerSource
streamHandler := sse.StreamHandler{LedgerSource: ledgerSource}
streamHandler := sse.StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}}
handler := streamablePageHandler(action, streamHandler)

return newStreamTest(
Expand All @@ -85,7 +93,7 @@ func NewStreamableObjectTest(
) *StreamTest {
ledgerSource := ledger.NewTestingSource(currentLedger)
action.ledgerSource = ledgerSource
streamHandler := sse.StreamHandler{LedgerSource: ledgerSource}
streamHandler := sse.StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}}
handler := streamableObjectActionHandler{action: action, limit: limit, streamHandler: streamHandler}

return newStreamTest(
Expand Down
12 changes: 10 additions & 2 deletions services/horizon/internal/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ func (w *web) mustInstallMiddlewares(app *App, connTimeout time.Duration) {
w.internalRouter.Use(loggerMiddleware)
}

type historyLedgerSourceFactory struct {
updateFrequency time.Duration
}

func (f historyLedgerSourceFactory) Get() ledger.Source {
return ledger.NewHistoryDBSource(f.updateFrequency)
}

// mustInstallActions installs the routing configuration of horizon onto the
// provided app. All route registration should be implemented here.
func (w *web) mustInstallActions(
Expand All @@ -151,8 +159,8 @@ func (w *web) mustInstallActions(
r.Get("/", RootAction{}.Handle)

streamHandler := sse.StreamHandler{
RateLimiter: w.rateLimiter,
LedgerSource: ledger.NewHistoryDBSource(w.sseUpdateFrequency),
RateLimiter: w.rateLimiter,
LedgerSourceFactory: historyLedgerSourceFactory{updateFrequency: w.sseUpdateFrequency},
}

// State endpoints behind stateMiddleware
Expand Down

0 comments on commit 34aac57

Please sign in to comment.