Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/actions: Remove inappropriate use of context in ingestion endpoints #1861

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Remove inappropriate use of context in ingestion endpoints
Commit 9161997 used a request context to ensure
that all queries to the horizon database from ingestion endpoints were wrapped
in a repeatable read transaction.

This commit refactors the ingestion endpoints to acheive the same effect without
using a context to store a request scoped database session.
  • Loading branch information
tamirms committed Oct 18, 2019
commit 8d8cfc5e4958832881bd7ed878fa22d247aae539
15 changes: 12 additions & 3 deletions services/horizon/internal/actions/account.go
Original file line number Diff line number Diff line change
@@ -54,18 +54,27 @@ func AccountInfo(ctx context.Context, cq *core.Q, addr string) (*protocol.Accoun
return &resource, errors.Wrap(err, "populating account")
}

// GetAccountsHandler is the action handler for the /accounts endpoint
type GetAccountsHandler struct {
type getAccountsHandler struct {
HistoryQ *history.Q
}

// NewAccounts returns a PageHandler for the `/accounts` endpoint
func NewAccounts(historyQ *history.Q) PageHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tamirms shall we use the same convention as we were using before so it's clear that we are creating a handler? so instead of NewAccounts we use NewAccountsHandler -- it's more explicit about what we are getting.

return repeatableReadPageHandler{
historyQ: historyQ,
withQ: func(q *history.Q) PageHandler {
return getAccountsHandler{q}
},
}
}
leighmcculloch marked this conversation as resolved.
Show resolved Hide resolved

// GetResourcePage returns a page containing the account records that have
// `signer` as a signer. This doesn't return full account details resource
// because of the limitations of existing ingestion architecture. In a future,
// when the new ingestion system is fully integrated, this endpoint can be used
// to find accounts for signer but also accounts for assets, home domain,
// inflation_dest etc.
func (handler GetAccountsHandler) GetResourcePage(
func (handler getAccountsHandler) GetResourcePage(
w HeaderWriter,
r *http.Request,
) ([]hal.Pageable, error) {
7 changes: 2 additions & 5 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ func TestGetAccountsHandlerPageNoResults(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)

q := &history.Q{tt.HorizonSession()}
handler := &GetAccountsHandler{HistoryQ: q}
handler := NewAccounts(q)
records, err := handler.GetResourcePage(
httptest.NewRecorder(),
makeRequest(
@@ -43,7 +43,6 @@ func TestGetAccountsHandlerPageNoResults(t *testing.T) {
"signer": "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU",
},
map[string]string{},
q.Session,
),
)
tt.Assert.NoError(err)
@@ -56,7 +55,7 @@ func TestGetAccountsHandlerPageResults(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)

q := &history.Q{tt.HorizonSession()}
handler := &GetAccountsHandler{HistoryQ: q}
handler := NewAccounts(q)

rows := []history.AccountSigner{
history.AccountSigner{
@@ -88,7 +87,6 @@ func TestGetAccountsHandlerPageResults(t *testing.T) {
"signer": "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU",
},
map[string]string{},
q.Session,
),
)

@@ -111,7 +109,6 @@ func TestGetAccountsHandlerPageResults(t *testing.T) {
"cursor": "GABGMPEKKDWR2WFH5AJOZV5PDKLJEHGCR3Q24ALETWR5H3A7GI3YTS7V",
},
map[string]string{},
q.Session,
),
)

32 changes: 18 additions & 14 deletions services/horizon/internal/actions/asset.go
Original file line number Diff line number Diff line change
@@ -15,11 +15,21 @@ import (
"github.com/stellar/go/xdr"
)

// AssetStatsHandler is the action handler for the /asset endpoint
type AssetStatsHandler struct {
type assetStatsHandler struct {
historyQ *history.Q
}

func (handler AssetStatsHandler) validateAssetParams(code, issuer string, pq db2.PageQuery) error {
// NewAssetStats returns a PageHandler for the `/assets` endpoint
func NewAssetStats(historyQ *history.Q) PageHandler {
return repeatableReadPageHandler{
historyQ: historyQ,
withQ: func(q *history.Q) PageHandler {
return assetStatsHandler{q}
},
}
}

func (handler assetStatsHandler) validateAssetParams(code, issuer string, pq db2.PageQuery) error {
if code != "" {
if !validAssetCode.MatchString(code) {
return problem.MakeInvalidFieldProblem(
@@ -66,8 +76,7 @@ func (handler AssetStatsHandler) validateAssetParams(code, issuer string, pq db2
return nil
}

func (handler AssetStatsHandler) findIssuersForAssets(
historyQ *history.Q,
func (handler assetStatsHandler) findIssuersForAssets(
assetStats []history.ExpAssetStat,
) (map[string]history.AccountEntry, error) {
issuerSet := map[string]bool{}
@@ -81,7 +90,7 @@ func (handler AssetStatsHandler) findIssuersForAssets(
}

accountsByID := map[string]history.AccountEntry{}
accounts, err := historyQ.GetAccountsByIDs(issuers)
accounts, err := handler.historyQ.GetAccountsByIDs(issuers)
if err != nil {
return nil, err
}
@@ -103,7 +112,7 @@ func (handler AssetStatsHandler) findIssuersForAssets(
}

// GetResourcePage returns a page of offers.
func (handler AssetStatsHandler) GetResourcePage(
func (handler assetStatsHandler) GetResourcePage(
w HeaderWriter,
r *http.Request,
) ([]hal.Pageable, error) {
@@ -128,17 +137,12 @@ func (handler AssetStatsHandler) GetResourcePage(
return nil, err
}

historyQ, err := historyQFromRequest(r)
if err != nil {
return nil, err
}

assetStats, err := historyQ.GetAssetStats(code, issuer, pq)
assetStats, err := handler.historyQ.GetAssetStats(code, issuer, pq)
if err != nil {
return nil, err
}

issuerAccounts, err := handler.findIssuersForAssets(historyQ, assetStats)
issuerAccounts, err := handler.findIssuersForAssets(assetStats)
if err != nil {
return nil, err
}
12 changes: 6 additions & 6 deletions services/horizon/internal/actions/asset_test.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ import (
)

func TestAssetStatsValidation(t *testing.T) {
handler := AssetStatsHandler{}
handler := assetStatsHandler{}

for _, testCase := range []struct {
name string
@@ -65,7 +65,7 @@ func TestAssetStatsValidation(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
r := makeRequest(t, testCase.queryParams, map[string]string{}, nil)
r := makeRequest(t, testCase.queryParams, map[string]string{})
_, err := handler.GetResourcePage(httptest.NewRecorder(), r)
if err == nil {
t.Fatalf("expected error %v but got %v", testCase.expectedError, err)
@@ -93,7 +93,7 @@ func TestAssetStats(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}
handler := AssetStatsHandler{}
handler := NewAssetStats(q)

issuer := history.AccountEntry{
AccountID: "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
@@ -292,7 +292,7 @@ func TestAssetStats(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
r := makeRequest(t, testCase.queryParams, map[string]string{}, q.Session)
r := makeRequest(t, testCase.queryParams, map[string]string{})
results, err := handler.GetResourcePage(httptest.NewRecorder(), r)
if err != nil {
t.Fatalf("unexpected error %v", err)
@@ -321,7 +321,7 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}
handler := AssetStatsHandler{}
handler := NewAssetStats(q)

usdAssetStat := history.ExpAssetStat{
AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4,
@@ -334,7 +334,7 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Equal(numChanged, int64(1))

r := makeRequest(t, map[string]string{}, map[string]string{}, q.Session)
r := makeRequest(t, map[string]string{}, map[string]string{})
_, err = handler.GetResourcePage(httptest.NewRecorder(), r)
if err == nil {
t.Fatal("error but got not nil")
122 changes: 122 additions & 0 deletions services/horizon/internal/actions/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package actions

import (
"database/sql"
"net/http"
"strconv"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
)

// LastLedgerHeaderName is the header which is set on all experimental ingestion endpoints
const LastLedgerHeaderName = "Latest-Ledger"

// HeaderWriter is an interface for setting HTTP response headers
type HeaderWriter interface {
Header() http.Header
}

// StreamableObjectResponse is an interface for objects returned by streamable object endpoints
// A streamable object endpoint is an SSE endpoint which returns a single JSON object response
// instead of a page of items.
type StreamableObjectResponse interface {
Equals(other StreamableObjectResponse) bool
}

// PageHandler represents a handler for a horizon endpoint which produces a response body
// consisting of a list of `hal.Pageable` items
type PageHandler interface {
GetResourcePage(w HeaderWriter, r *http.Request) ([]hal.Pageable, error)
}

// StreamableObjectHandler represents a handler for a horizon endpoint which produces a
// response body consisting of a single `StreamableObjectResponse` instance
type StreamableObjectHandler interface {
GetResource(
w HeaderWriter,
r *http.Request,
) (StreamableObjectResponse, error)
}

// ObjectHandler represents a handler for a horizon endpoint which produces a
// response body consisting of a single `hal.Pageable` instance
type ObjectHandler interface {
GetResource(
w HeaderWriter,
r *http.Request,
) (hal.Pageable, error)
}

func repeatableReadSession(source *history.Q, r *http.Request) (*history.Q, error) {
repeatableReadSession := source.Clone()
repeatableReadSession.Ctx = r.Context()
err := repeatableReadSession.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return nil, errors.Wrap(err, "could not begin repeatable read transaction")
}
return &history.Q{repeatableReadSession}, nil
}

// SetLastLedgerHeader sets the Latest-Ledger header
func SetLastLedgerHeader(w HeaderWriter, lastIngestedLedger uint32) {
w.Header().Set(LastLedgerHeaderName, strconv.FormatUint(uint64(lastIngestedLedger), 10))
}

func fetchAndSetLastLedgerHeader(w HeaderWriter, historyQ *history.Q) error {
lastIngestedLedger, err := historyQ.GetLastLedgerExpIngestNonBlocking()
if err != nil {
return errors.Wrap(err, "could not determine last ingested ledger")
}

SetLastLedgerHeader(w, lastIngestedLedger)
return nil
}

type repeatableReadPageHandler struct {
historyQ *history.Q
withQ func(*history.Q) PageHandler
}

func (handler repeatableReadPageHandler) GetResourcePage(
w HeaderWriter,
r *http.Request,
) ([]hal.Pageable, error) {
historyQ, err := repeatableReadSession(handler.historyQ, r)
if err != nil {
return nil, err
}
defer historyQ.Rollback()

if err = fetchAndSetLastLedgerHeader(w, historyQ); err != nil {
return nil, err
}

return handler.withQ(historyQ).GetResourcePage(w, r)
}

type repeatableReadObjectHandler struct {
historyQ *history.Q
withQ func(*history.Q) ObjectHandler
}

func (handler repeatableReadObjectHandler) GetResource(
w HeaderWriter,
r *http.Request,
) (hal.Pageable, error) {
historyQ, err := repeatableReadSession(handler.historyQ, r)
if err != nil {
return nil, err
}
defer historyQ.Rollback()

if err = fetchAndSetLastLedgerHeader(w, historyQ); err != nil {
return nil, err
}

return handler.withQ(historyQ).GetResource(w, r)
}
Loading