Skip to content

Commit

Permalink
services/horizon/internal/actions: Wrap SSE handlers in repeatable re…
Browse files Browse the repository at this point in the history
…ad tx (#2344)

* Execute SSE handlers in repeatable read transactions
* Obtain account info from stellar core db using repeatable read transaction
  • Loading branch information
tamirms authored Mar 6, 2020
1 parent 19254e9 commit e2fa4c7
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 5 deletions.
12 changes: 11 additions & 1 deletion services/horizon/internal/actions/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actions

import (
"context"
"database/sql"
"net/http"
"strings"

Expand All @@ -25,7 +26,16 @@ func accountFromCoreDB(ctx context.Context, cq *core.Q, addr string) (*protocol.
resource protocol.Account
)

err := cq.AccountByAddress(&coreRecord, addr)
err := cq.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return nil, errors.Wrap(err, "starting repeatable read transaction")
}
defer cq.Rollback()

err = cq.AccountByAddress(&coreRecord, addr)
if err != nil {
return nil, errors.Wrap(err, "getting core account record")
}
Expand Down
36 changes: 32 additions & 4 deletions services/horizon/internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
"encoding/json"
"net/http"
"strings"
"time"

"github.com/stellar/go/services/horizon/internal/actions"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/hchi"
"github.com/stellar/go/services/horizon/internal/ledger"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/stellar/go/services/horizon/internal/render/sse"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/strkey"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
"github.com/stellar/go/support/render/httpjson"
Expand Down Expand Up @@ -413,6 +416,31 @@ func (handler streamableObjectActionHandler) ServeHTTP(
problem.Render(r.Context(), w, hProblem.NotAcceptable)
}

func repeatableReadStream(
r *http.Request,
generateEvents sse.GenerateEventsFunc,
) sse.GenerateEventsFunc {
var session db.SessionInterface
if val := r.Context().Value(&horizonContext.SessionContextKey); val != nil {
session = val.(db.SessionInterface)
}

return func() ([]sse.Event, error) {
if session != nil {
err := session.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return nil, errors.Wrap(err, "Error starting repeatable read transaction")
}
defer session.Rollback()
}

return generateEvents()
}
}

func (handler streamableObjectActionHandler) renderStream(
w http.ResponseWriter,
r *http.Request,
Expand All @@ -423,7 +451,7 @@ func (handler streamableObjectActionHandler) renderStream(
w,
r,
singleObjectStreamLimit,
func() ([]sse.Event, error) {
repeatableReadStream(r, func() ([]sse.Event, error) {
response, err := handler.action.GetResource(w, r)
if err != nil {
return nil, err
Expand All @@ -434,7 +462,7 @@ func (handler streamableObjectActionHandler) renderStream(
return []sse.Event{sse.Event{Data: response}}, nil
}
return []sse.Event{}, nil
},
}),
)
}

Expand Down Expand Up @@ -495,7 +523,7 @@ func (handler pageActionHandler) renderStream(w http.ResponseWriter, r *http.Req
w,
r,
int(pq.Limit),
func() ([]sse.Event, error) {
repeatableReadStream(r, func() ([]sse.Event, error) {
records, err := handler.action.GetResourcePage(w, r)
if err != nil {
return nil, err
Expand All @@ -515,7 +543,7 @@ func (handler pageActionHandler) renderStream(w http.ResponseWriter, r *http.Req
}

return events, nil
},
}),
)
}

Expand Down
79 changes: 79 additions & 0 deletions services/horizon/internal/stream_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package horizon

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -14,8 +15,10 @@ import (

"github.com/go-chi/chi"
"github.com/stellar/go/services/horizon/internal/actions"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/render/sse"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/render/hal"
)

Expand Down Expand Up @@ -462,3 +465,79 @@ func TestObjectStream(t *testing.T) {
st.Wait(true)
})
}

func TestRepeatableReadStream(t *testing.T) {
t.Run("page stream creates repeatable read tx", func(t *testing.T) {
action := &testPageAction{
objects: map[uint32][]string{
3: []string{"a", "b", "c"},
4: []string{"a", "b", "c", "d", "e"},
},
}

session := &db.MockSession{}
session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

request := streamRequest(t, "")
request = request.WithContext(context.WithValue(
request.Context(),
&horizonContext.SessionContextKey,
session,
))

st := NewStreamablePageTest(
action,
3,
request,
expectResponse(t, unmarashalPage, []string{"a", "b", "c", "d", "e"}),
)
st.Wait(false)
})

t.Run("object stream creates repeatable read tx", func(t *testing.T) {
action := &testObjectAction{
objects: map[uint32]stringObject{
3: "a",
4: "b",
},
}

session := &db.MockSession{}
session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

request := streamRequest(t, "")
request = request.WithContext(context.WithValue(
request.Context(),
&horizonContext.SessionContextKey,
session,
))

st := NewstreamableObjectTest(
action,
3,
request,
expectResponse(t, unmarashalString, []string{"a", "b"}),
)
st.Wait(false)
})
}
1 change: 1 addition & 0 deletions support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Session struct {
}

type SessionInterface interface {
BeginTx(opts *sql.TxOptions) error
Begin() error
Rollback() error
TruncateTables(tables []string) error
Expand Down
5 changes: 5 additions & 0 deletions support/db/mock_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func (m *MockSession) Begin() error {
return args.Error(0)
}

func (m *MockSession) BeginTx(opts *sql.TxOptions) error {
args := m.Called(opts)
return args.Error(0)
}

func (m *MockSession) Rollback() error {
args := m.Called()
return args.Error(0)
Expand Down

0 comments on commit e2fa4c7

Please sign in to comment.