Skip to content

Commit

Permalink
Remove inappropriate use of context in ingestion endpoints
Browse files Browse the repository at this point in the history
Commit 9161997 used a request context to ensure
that all queries to the horizon database from ingestion endpoints were wrapped
in a repeatable read transaction.

This commit refactors the ingestion endpoints to acheive the same effect without
using a context to store a request scoped database session.
  • Loading branch information
tamirms committed Oct 18, 2019
1 parent 19fde30 commit 8d8cfc5
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 232 deletions.
15 changes: 12 additions & 3 deletions services/horizon/internal/actions/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,27 @@ func AccountInfo(ctx context.Context, cq *core.Q, addr string) (*protocol.Accoun
return &resource, errors.Wrap(err, "populating account")
}

// GetAccountsHandler is the action handler for the /accounts endpoint
type GetAccountsHandler struct {
type getAccountsHandler struct {
HistoryQ *history.Q
}

// NewAccounts returns a PageHandler for the `/accounts` endpoint
func NewAccounts(historyQ *history.Q) PageHandler {
return repeatableReadPageHandler{
historyQ: historyQ,
withQ: func(q *history.Q) PageHandler {
return getAccountsHandler{q}
},
}
}

// GetResourcePage returns a page containing the account records that have
// `signer` as a signer. This doesn't return full account details resource
// because of the limitations of existing ingestion architecture. In a future,
// when the new ingestion system is fully integrated, this endpoint can be used
// to find accounts for signer but also accounts for assets, home domain,
// inflation_dest etc.
func (handler GetAccountsHandler) GetResourcePage(
func (handler getAccountsHandler) GetResourcePage(
w HeaderWriter,
r *http.Request,
) ([]hal.Pageable, error) {
Expand Down
7 changes: 2 additions & 5 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGetAccountsHandlerPageNoResults(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)

q := &history.Q{tt.HorizonSession()}
handler := &GetAccountsHandler{HistoryQ: q}
handler := NewAccounts(q)
records, err := handler.GetResourcePage(
httptest.NewRecorder(),
makeRequest(
Expand All @@ -43,7 +43,6 @@ func TestGetAccountsHandlerPageNoResults(t *testing.T) {
"signer": "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU",
},
map[string]string{},
q.Session,
),
)
tt.Assert.NoError(err)
Expand All @@ -56,7 +55,7 @@ func TestGetAccountsHandlerPageResults(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)

q := &history.Q{tt.HorizonSession()}
handler := &GetAccountsHandler{HistoryQ: q}
handler := NewAccounts(q)

rows := []history.AccountSigner{
history.AccountSigner{
Expand Down Expand Up @@ -88,7 +87,6 @@ func TestGetAccountsHandlerPageResults(t *testing.T) {
"signer": "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU",
},
map[string]string{},
q.Session,
),
)

Expand All @@ -111,7 +109,6 @@ func TestGetAccountsHandlerPageResults(t *testing.T) {
"cursor": "GABGMPEKKDWR2WFH5AJOZV5PDKLJEHGCR3Q24ALETWR5H3A7GI3YTS7V",
},
map[string]string{},
q.Session,
),
)

Expand Down
32 changes: 18 additions & 14 deletions services/horizon/internal/actions/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,21 @@ import (
"github.com/stellar/go/xdr"
)

// AssetStatsHandler is the action handler for the /asset endpoint
type AssetStatsHandler struct {
type assetStatsHandler struct {
historyQ *history.Q
}

func (handler AssetStatsHandler) validateAssetParams(code, issuer string, pq db2.PageQuery) error {
// NewAssetStats returns a PageHandler for the `/assets` endpoint
func NewAssetStats(historyQ *history.Q) PageHandler {
return repeatableReadPageHandler{
historyQ: historyQ,
withQ: func(q *history.Q) PageHandler {
return assetStatsHandler{q}
},
}
}

func (handler assetStatsHandler) validateAssetParams(code, issuer string, pq db2.PageQuery) error {
if code != "" {
if !validAssetCode.MatchString(code) {
return problem.MakeInvalidFieldProblem(
Expand Down Expand Up @@ -66,8 +76,7 @@ func (handler AssetStatsHandler) validateAssetParams(code, issuer string, pq db2
return nil
}

func (handler AssetStatsHandler) findIssuersForAssets(
historyQ *history.Q,
func (handler assetStatsHandler) findIssuersForAssets(
assetStats []history.ExpAssetStat,
) (map[string]history.AccountEntry, error) {
issuerSet := map[string]bool{}
Expand All @@ -81,7 +90,7 @@ func (handler AssetStatsHandler) findIssuersForAssets(
}

accountsByID := map[string]history.AccountEntry{}
accounts, err := historyQ.GetAccountsByIDs(issuers)
accounts, err := handler.historyQ.GetAccountsByIDs(issuers)
if err != nil {
return nil, err
}
Expand All @@ -103,7 +112,7 @@ func (handler AssetStatsHandler) findIssuersForAssets(
}

// GetResourcePage returns a page of offers.
func (handler AssetStatsHandler) GetResourcePage(
func (handler assetStatsHandler) GetResourcePage(
w HeaderWriter,
r *http.Request,
) ([]hal.Pageable, error) {
Expand All @@ -128,17 +137,12 @@ func (handler AssetStatsHandler) GetResourcePage(
return nil, err
}

historyQ, err := historyQFromRequest(r)
if err != nil {
return nil, err
}

assetStats, err := historyQ.GetAssetStats(code, issuer, pq)
assetStats, err := handler.historyQ.GetAssetStats(code, issuer, pq)
if err != nil {
return nil, err
}

issuerAccounts, err := handler.findIssuersForAssets(historyQ, assetStats)
issuerAccounts, err := handler.findIssuersForAssets(assetStats)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions services/horizon/internal/actions/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestAssetStatsValidation(t *testing.T) {
handler := AssetStatsHandler{}
handler := assetStatsHandler{}

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestAssetStatsValidation(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
r := makeRequest(t, testCase.queryParams, map[string]string{}, nil)
r := makeRequest(t, testCase.queryParams, map[string]string{})
_, err := handler.GetResourcePage(httptest.NewRecorder(), r)
if err == nil {
t.Fatalf("expected error %v but got %v", testCase.expectedError, err)
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestAssetStats(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}
handler := AssetStatsHandler{}
handler := NewAssetStats(q)

issuer := history.AccountEntry{
AccountID: "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestAssetStats(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
r := makeRequest(t, testCase.queryParams, map[string]string{}, q.Session)
r := makeRequest(t, testCase.queryParams, map[string]string{})
results, err := handler.GetResourcePage(httptest.NewRecorder(), r)
if err != nil {
t.Fatalf("unexpected error %v", err)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}
handler := AssetStatsHandler{}
handler := NewAssetStats(q)

usdAssetStat := history.ExpAssetStat{
AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4,
Expand All @@ -334,7 +334,7 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Equal(numChanged, int64(1))

r := makeRequest(t, map[string]string{}, map[string]string{}, q.Session)
r := makeRequest(t, map[string]string{}, map[string]string{})
_, err = handler.GetResourcePage(httptest.NewRecorder(), r)
if err == nil {
t.Fatal("error but got not nil")
Expand Down
122 changes: 122 additions & 0 deletions services/horizon/internal/actions/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package actions

import (
"database/sql"
"net/http"
"strconv"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
)

// LastLedgerHeaderName is the header which is set on all experimental ingestion endpoints
const LastLedgerHeaderName = "Latest-Ledger"

// HeaderWriter is an interface for setting HTTP response headers
type HeaderWriter interface {
Header() http.Header
}

// StreamableObjectResponse is an interface for objects returned by streamable object endpoints
// A streamable object endpoint is an SSE endpoint which returns a single JSON object response
// instead of a page of items.
type StreamableObjectResponse interface {
Equals(other StreamableObjectResponse) bool
}

// PageHandler represents a handler for a horizon endpoint which produces a response body
// consisting of a list of `hal.Pageable` items
type PageHandler interface {
GetResourcePage(w HeaderWriter, r *http.Request) ([]hal.Pageable, error)
}

// StreamableObjectHandler represents a handler for a horizon endpoint which produces a
// response body consisting of a single `StreamableObjectResponse` instance
type StreamableObjectHandler interface {
GetResource(
w HeaderWriter,
r *http.Request,
) (StreamableObjectResponse, error)
}

// ObjectHandler represents a handler for a horizon endpoint which produces a
// response body consisting of a single `hal.Pageable` instance
type ObjectHandler interface {
GetResource(
w HeaderWriter,
r *http.Request,
) (hal.Pageable, error)
}

func repeatableReadSession(source *history.Q, r *http.Request) (*history.Q, error) {
repeatableReadSession := source.Clone()
repeatableReadSession.Ctx = r.Context()
err := repeatableReadSession.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return nil, errors.Wrap(err, "could not begin repeatable read transaction")
}
return &history.Q{repeatableReadSession}, nil
}

// SetLastLedgerHeader sets the Latest-Ledger header
func SetLastLedgerHeader(w HeaderWriter, lastIngestedLedger uint32) {
w.Header().Set(LastLedgerHeaderName, strconv.FormatUint(uint64(lastIngestedLedger), 10))
}

func fetchAndSetLastLedgerHeader(w HeaderWriter, historyQ *history.Q) error {
lastIngestedLedger, err := historyQ.GetLastLedgerExpIngestNonBlocking()
if err != nil {
return errors.Wrap(err, "could not determine last ingested ledger")
}

SetLastLedgerHeader(w, lastIngestedLedger)
return nil
}

type repeatableReadPageHandler struct {
historyQ *history.Q
withQ func(*history.Q) PageHandler
}

func (handler repeatableReadPageHandler) GetResourcePage(
w HeaderWriter,
r *http.Request,
) ([]hal.Pageable, error) {
historyQ, err := repeatableReadSession(handler.historyQ, r)
if err != nil {
return nil, err
}
defer historyQ.Rollback()

if err = fetchAndSetLastLedgerHeader(w, historyQ); err != nil {
return nil, err
}

return handler.withQ(historyQ).GetResourcePage(w, r)
}

type repeatableReadObjectHandler struct {
historyQ *history.Q
withQ func(*history.Q) ObjectHandler
}

func (handler repeatableReadObjectHandler) GetResource(
w HeaderWriter,
r *http.Request,
) (hal.Pageable, error) {
historyQ, err := repeatableReadSession(handler.historyQ, r)
if err != nil {
return nil, err
}
defer historyQ.Rollback()

if err = fetchAndSetLastLedgerHeader(w, historyQ); err != nil {
return nil, err
}

return handler.withQ(historyQ).GetResource(w, r)
}
Loading

0 comments on commit 8d8cfc5

Please sign in to comment.