From 34aac574852a36c04b5cee8f00bc577a340bd322 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 5 May 2020 16:13:55 +0200 Subject: [PATCH] services/horizon/ledger: Fix memory leak in HistoryDBSource (#2548) This commit fixes a memory leak in `HistoryDBSource.NextLedger` method. The method is usually used like in `stream_handler.go`: ``` select { case currentLedgerSequence = <-handler.LedgerSource.NextLedger(currentLedgerSequence): continue case <-ctx.Done(): stream.Done() return } ``` When streaming connection is closed but there are no new ledgers (ex. due to database or network issues) the goroutines started in `HistoryDBSource.NextLedger` will run indefinitely. To fix this `ledger.Source` interface was updated with `Close()` method that should be called when a ledger source is no longer used and it exits internal goroutine. The code was refactored to create a new `HistoryDBSource` for each streaming request (so it can be closed when handler execution ends). This should have a really minimal impact on memory usage. When there are no new ledgers, the goroutines started in `HistoryDBSource.NextLedger` will leak and will eventually consume all available memory, like in https://github.com/stellar/go/issues/2535. --- services/horizon/CHANGELOG.md | 1 + .../horizon/internal/ledger/ledger_source.go | 32 ++++++++++++++++--- .../internal/render/sse/stream_handler.go | 15 ++++++--- .../render/sse/stream_handler_test.go | 12 +++++-- .../horizon/internal/stream_handler_test.go | 12 +++++-- services/horizon/internal/web.go | 12 +++++-- 6 files changed, 68 insertions(+), 16 deletions(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 3ea348825b..588dd0ad10 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -7,6 +7,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * Add `last_modified_time` to account responses. `last_modified_time` is the closing time of the most recent ledger in which the account was modified. +* Fix a memory leak in the code responsible for streaming [#2548](https://github.com/stellar/go/pull/2548). ## v1.2.1 diff --git a/services/horizon/internal/ledger/ledger_source.go b/services/horizon/internal/ledger/ledger_source.go index e1bcecd57e..7800ab46a6 100644 --- a/services/horizon/internal/ledger/ledger_source.go +++ b/services/horizon/internal/ledger/ledger_source.go @@ -6,10 +6,12 @@ import ( ) // Source exposes two helpers methods to help you find out the current -// ledger and yield every time there is a new ledger. +// ledger and yield every time there is a new ledger. Call `Close` when +// source is no longer used. type Source interface { CurrentLedger() uint32 NextLedger(currentSequence uint32) chan uint32 + Close() } type currentStateFunc func() State @@ -19,23 +21,27 @@ type currentStateFunc func() State type HistoryDBSource struct { updateFrequency time.Duration currentState currentStateFunc + + closedLock sync.Mutex + closed bool } // NewHistoryDBSource constructs a new instance of HistoryDBSource -func NewHistoryDBSource(updateFrequency time.Duration) HistoryDBSource { - return HistoryDBSource{ +func NewHistoryDBSource(updateFrequency time.Duration) *HistoryDBSource { + return &HistoryDBSource{ updateFrequency: updateFrequency, currentState: CurrentState, + closedLock: sync.Mutex{}, } } // CurrentLedger returns the current ledger. -func (source HistoryDBSource) CurrentLedger() uint32 { +func (source *HistoryDBSource) CurrentLedger() uint32 { return source.currentState().ExpHistoryLatest } // NextLedger returns a channel which yields every time there is a new ledger with a sequence number larger than currentSequence. -func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 { +func (source *HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 { // Make sure this is buffered channel of size 1. Otherwise, the go routine below // will never return if `newLedgers` channel is not read. From Effective Go: // > If the channel is unbuffered, the sender blocks until the receiver has received the value. @@ -46,6 +52,13 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 { time.Sleep(source.updateFrequency) } + source.closedLock.Lock() + closed := source.closed + source.closedLock.Unlock() + if closed { + return + } + currentLedgerState := source.currentState() if currentLedgerState.ExpHistoryLatest > currentSequence { newLedgers <- currentLedgerState.ExpHistoryLatest @@ -57,6 +70,13 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 { return newLedgers } +// Close closes the internal go routines. +func (source *HistoryDBSource) Close() { + source.closedLock.Lock() + defer source.closedLock.Unlock() + source.closed = true +} + // TestingSource is helper struct which implements the LedgerSource // interface. type TestingSource struct { @@ -106,3 +126,5 @@ func (source *TestingSource) NextLedger(currentSequence uint32) chan uint32 { return response } + +func (source *TestingSource) Close() {} diff --git a/services/horizon/internal/render/sse/stream_handler.go b/services/horizon/internal/render/sse/stream_handler.go index 1a25eef85f..81d898088a 100644 --- a/services/horizon/internal/render/sse/stream_handler.go +++ b/services/horizon/internal/render/sse/stream_handler.go @@ -8,10 +8,14 @@ import ( "github.com/stellar/throttled" ) +type LedgerSourceFactory interface { + Get() ledger.Source +} + // StreamHandler represents a stream handling action type StreamHandler struct { - RateLimiter *throttled.HTTPRateLimiter - LedgerSource ledger.Source + RateLimiter *throttled.HTTPRateLimiter + LedgerSourceFactory LedgerSourceFactory } // GenerateEventsFunc generates a slice of sse.Event which are sent via @@ -30,7 +34,10 @@ func (handler StreamHandler) ServeStream( stream := NewStream(ctx, w) stream.SetLimit(limit) - currentLedgerSequence := handler.LedgerSource.CurrentLedger() + ledgerSource := handler.LedgerSourceFactory.Get() + defer ledgerSource.Close() + + currentLedgerSequence := ledgerSource.CurrentLedger() for { // Rate limit the request if it's a call to stream since it queries the DB every second. See // https://github.com/stellar/go/issues/715 for more details. @@ -71,7 +78,7 @@ func (handler StreamHandler) ServeStream( stream.Init() select { - case currentLedgerSequence = <-handler.LedgerSource.NextLedger(currentLedgerSequence): + case currentLedgerSequence = <-ledgerSource.NextLedger(currentLedgerSequence): continue case <-ctx.Done(): stream.Done() diff --git a/services/horizon/internal/render/sse/stream_handler_test.go b/services/horizon/internal/render/sse/stream_handler_test.go index b6b2f5ef77..5d2ca61a87 100644 --- a/services/horizon/internal/render/sse/stream_handler_test.go +++ b/services/horizon/internal/render/sse/stream_handler_test.go @@ -9,11 +9,17 @@ import ( "github.com/stellar/go/services/horizon/internal/ledger" ) +type testingFactory struct { + ledgerSource ledger.Source +} + +func (f *testingFactory) Get() ledger.Source { + return f.ledgerSource +} + func TestSendByeByeOnContextDone(t *testing.T) { ledgerSource := ledger.NewTestingSource(1) - handler := StreamHandler{ - LedgerSource: ledgerSource, - } + handler := StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}} r, err := http.NewRequest("GET", "http://localhost", nil) if err != nil { diff --git a/services/horizon/internal/stream_handler_test.go b/services/horizon/internal/stream_handler_test.go index 074704a8e0..6670ff096b 100644 --- a/services/horizon/internal/stream_handler_test.go +++ b/services/horizon/internal/stream_handler_test.go @@ -21,6 +21,14 @@ import ( "github.com/stellar/go/support/render/hal" ) +type testingFactory struct { + ledgerSource ledger.Source +} + +func (f *testingFactory) Get() ledger.Source { + return f.ledgerSource +} + // StreamTest utility struct to wrap SSE related tests. type StreamTest struct { ledgerSource *ledger.TestingSource @@ -64,7 +72,7 @@ func NewStreamablePageTest( ) *StreamTest { ledgerSource := ledger.NewTestingSource(currentLedger) action.ledgerSource = ledgerSource - streamHandler := sse.StreamHandler{LedgerSource: ledgerSource} + streamHandler := sse.StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}} handler := streamablePageHandler(action, streamHandler) return newStreamTest( @@ -85,7 +93,7 @@ func NewStreamableObjectTest( ) *StreamTest { ledgerSource := ledger.NewTestingSource(currentLedger) action.ledgerSource = ledgerSource - streamHandler := sse.StreamHandler{LedgerSource: ledgerSource} + streamHandler := sse.StreamHandler{LedgerSourceFactory: &testingFactory{ledgerSource}} handler := streamableObjectActionHandler{action: action, limit: limit, streamHandler: streamHandler} return newStreamTest( diff --git a/services/horizon/internal/web.go b/services/horizon/internal/web.go index a3db6b0d04..3dc99fedb1 100644 --- a/services/horizon/internal/web.go +++ b/services/horizon/internal/web.go @@ -131,6 +131,14 @@ func (w *web) mustInstallMiddlewares(app *App, connTimeout time.Duration) { w.internalRouter.Use(loggerMiddleware) } +type historyLedgerSourceFactory struct { + updateFrequency time.Duration +} + +func (f historyLedgerSourceFactory) Get() ledger.Source { + return ledger.NewHistoryDBSource(f.updateFrequency) +} + // mustInstallActions installs the routing configuration of horizon onto the // provided app. All route registration should be implemented here. func (w *web) mustInstallActions( @@ -151,8 +159,8 @@ func (w *web) mustInstallActions( r.Get("/", RootAction{}.Handle) streamHandler := sse.StreamHandler{ - RateLimiter: w.rateLimiter, - LedgerSource: ledger.NewHistoryDBSource(w.sseUpdateFrequency), + RateLimiter: w.rateLimiter, + LedgerSourceFactory: historyLedgerSourceFactory{updateFrequency: w.sseUpdateFrequency}, } // State endpoints behind stateMiddleware