From e2fa4c7700601ce37a44e7ce1e179c359e60de44 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 6 Mar 2020 18:08:30 +0100 Subject: [PATCH] services/horizon/internal/actions: Wrap SSE handlers in repeatable read tx (#2344) * Execute SSE handlers in repeatable read transactions * Obtain account info from stellar core db using repeatable read transaction --- services/horizon/internal/actions/account.go | 12 ++- services/horizon/internal/handler.go | 36 ++++++++- .../horizon/internal/stream_handler_test.go | 79 +++++++++++++++++++ support/db/main.go | 1 + support/db/mock_session.go | 5 ++ 5 files changed, 128 insertions(+), 5 deletions(-) diff --git a/services/horizon/internal/actions/account.go b/services/horizon/internal/actions/account.go index ad090af727..782d817c01 100644 --- a/services/horizon/internal/actions/account.go +++ b/services/horizon/internal/actions/account.go @@ -2,6 +2,7 @@ package actions import ( "context" + "database/sql" "net/http" "strings" @@ -25,7 +26,16 @@ func accountFromCoreDB(ctx context.Context, cq *core.Q, addr string) (*protocol. resource protocol.Account ) - err := cq.AccountByAddress(&coreRecord, addr) + err := cq.BeginTx(&sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }) + if err != nil { + return nil, errors.Wrap(err, "starting repeatable read transaction") + } + defer cq.Rollback() + + err = cq.AccountByAddress(&coreRecord, addr) if err != nil { return nil, errors.Wrap(err, "getting core account record") } diff --git a/services/horizon/internal/handler.go b/services/horizon/internal/handler.go index 8423736599..b0bd1fe566 100644 --- a/services/horizon/internal/handler.go +++ b/services/horizon/internal/handler.go @@ -4,12 +4,14 @@ import ( "bytes" "context" "crypto/sha256" + "database/sql" "encoding/json" "net/http" "strings" "time" "github.com/stellar/go/services/horizon/internal/actions" + horizonContext "github.com/stellar/go/services/horizon/internal/context" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/hchi" "github.com/stellar/go/services/horizon/internal/ledger" @@ -18,6 +20,7 @@ import ( "github.com/stellar/go/services/horizon/internal/render/sse" "github.com/stellar/go/services/horizon/internal/toid" "github.com/stellar/go/strkey" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/render/hal" "github.com/stellar/go/support/render/httpjson" @@ -413,6 +416,31 @@ func (handler streamableObjectActionHandler) ServeHTTP( problem.Render(r.Context(), w, hProblem.NotAcceptable) } +func repeatableReadStream( + r *http.Request, + generateEvents sse.GenerateEventsFunc, +) sse.GenerateEventsFunc { + var session db.SessionInterface + if val := r.Context().Value(&horizonContext.SessionContextKey); val != nil { + session = val.(db.SessionInterface) + } + + return func() ([]sse.Event, error) { + if session != nil { + err := session.BeginTx(&sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }) + if err != nil { + return nil, errors.Wrap(err, "Error starting repeatable read transaction") + } + defer session.Rollback() + } + + return generateEvents() + } +} + func (handler streamableObjectActionHandler) renderStream( w http.ResponseWriter, r *http.Request, @@ -423,7 +451,7 @@ func (handler streamableObjectActionHandler) renderStream( w, r, singleObjectStreamLimit, - func() ([]sse.Event, error) { + repeatableReadStream(r, func() ([]sse.Event, error) { response, err := handler.action.GetResource(w, r) if err != nil { return nil, err @@ -434,7 +462,7 @@ func (handler streamableObjectActionHandler) renderStream( return []sse.Event{sse.Event{Data: response}}, nil } return []sse.Event{}, nil - }, + }), ) } @@ -495,7 +523,7 @@ func (handler pageActionHandler) renderStream(w http.ResponseWriter, r *http.Req w, r, int(pq.Limit), - func() ([]sse.Event, error) { + repeatableReadStream(r, func() ([]sse.Event, error) { records, err := handler.action.GetResourcePage(w, r) if err != nil { return nil, err @@ -515,7 +543,7 @@ func (handler pageActionHandler) renderStream(w http.ResponseWriter, r *http.Req } return events, nil - }, + }), ) } diff --git a/services/horizon/internal/stream_handler_test.go b/services/horizon/internal/stream_handler_test.go index 100d712ec7..5de7f7a383 100644 --- a/services/horizon/internal/stream_handler_test.go +++ b/services/horizon/internal/stream_handler_test.go @@ -2,6 +2,7 @@ package horizon import ( "context" + "database/sql" "encoding/json" "fmt" "net/http" @@ -14,8 +15,10 @@ import ( "github.com/go-chi/chi" "github.com/stellar/go/services/horizon/internal/actions" + horizonContext "github.com/stellar/go/services/horizon/internal/context" "github.com/stellar/go/services/horizon/internal/ledger" "github.com/stellar/go/services/horizon/internal/render/sse" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/render/hal" ) @@ -462,3 +465,79 @@ func TestObjectStream(t *testing.T) { st.Wait(true) }) } + +func TestRepeatableReadStream(t *testing.T) { + t.Run("page stream creates repeatable read tx", func(t *testing.T) { + action := &testPageAction{ + objects: map[uint32][]string{ + 3: []string{"a", "b", "c"}, + 4: []string{"a", "b", "c", "d", "e"}, + }, + } + + session := &db.MockSession{} + session.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + session.On("Rollback").Return(nil).Once() + + session.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + session.On("Rollback").Return(nil).Once() + + request := streamRequest(t, "") + request = request.WithContext(context.WithValue( + request.Context(), + &horizonContext.SessionContextKey, + session, + )) + + st := NewStreamablePageTest( + action, + 3, + request, + expectResponse(t, unmarashalPage, []string{"a", "b", "c", "d", "e"}), + ) + st.Wait(false) + }) + + t.Run("object stream creates repeatable read tx", func(t *testing.T) { + action := &testObjectAction{ + objects: map[uint32]stringObject{ + 3: "a", + 4: "b", + }, + } + + session := &db.MockSession{} + session.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + session.On("Rollback").Return(nil).Once() + + session.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + session.On("Rollback").Return(nil).Once() + + request := streamRequest(t, "") + request = request.WithContext(context.WithValue( + request.Context(), + &horizonContext.SessionContextKey, + session, + )) + + st := NewstreamableObjectTest( + action, + 3, + request, + expectResponse(t, unmarashalString, []string{"a", "b"}), + ) + st.Wait(false) + }) +} diff --git a/support/db/main.go b/support/db/main.go index d091b86a00..3e661b6feb 100644 --- a/support/db/main.go +++ b/support/db/main.go @@ -105,6 +105,7 @@ type Session struct { } type SessionInterface interface { + BeginTx(opts *sql.TxOptions) error Begin() error Rollback() error TruncateTables(tables []string) error diff --git a/support/db/mock_session.go b/support/db/mock_session.go index fe96d6226d..b064a6d394 100644 --- a/support/db/mock_session.go +++ b/support/db/mock_session.go @@ -19,6 +19,11 @@ func (m *MockSession) Begin() error { return args.Error(0) } +func (m *MockSession) BeginTx(opts *sql.TxOptions) error { + args := m.Called(opts) + return args.Error(0) +} + func (m *MockSession) Rollback() error { args := m.Called() return args.Error(0)