Skip to content

Commit

Permalink
services/horizon: Change ingest/ledgerbackend/LedgerBackend.GetLedger…
Browse files Browse the repository at this point in the history
… to be always blocking (#3576)

* Add ctx to TrustedLedgerHashStore

* Add ctx to LedgerBackend and TrustedHashStore

* Stop using old GetLedger

* Rename GetLedgerBlocking -> GetLedger and remove old from the API

* Clean up the GetLedger implementations

* Set timeouts on captive core server so long-polling works

* Re-use IsPrepared to simplify RemoteCaptiveCore.PrepareRange

* Add explicit 408 timeout to captive core server to ensure long-polling works

* Update changelog

* remove dead code

* startPrepareRange outlives the request context

* Remove Present flag from exp/services/captivecore/internal/api
  • Loading branch information
Paul Bellamy authored May 4, 2021
1 parent 73441a4 commit 36f465c
Show file tree
Hide file tree
Showing 33 changed files with 558 additions and 570 deletions.
29 changes: 17 additions & 12 deletions exp/services/captivecore/internal/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,18 @@ func (c *CaptiveCoreAPI) Shutdown() {
c.core.Close()
}

func (c *CaptiveCoreAPI) startPrepareRange(ledgerRange ledgerbackend.Range) {
func (c *CaptiveCoreAPI) isShutdown() bool {
return c.ctx.Err() != nil
}

func (c *CaptiveCoreAPI) startPrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) {
defer c.wg.Done()

err := c.core.PrepareRange(ledgerRange)
err := c.core.PrepareRange(ctx, ledgerRange)

c.activeRequest.Lock()
defer c.activeRequest.Unlock()
if c.ctx.Err() != nil {
if c.isShutdown() {
return
}

Expand Down Expand Up @@ -100,10 +104,10 @@ func (c *CaptiveCoreAPI) startPrepareRange(ledgerRange ledgerbackend.Range) {
}

// PrepareRange executes the PrepareRange operation on the captive core instance.
func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerbackend.PrepareRangeResponse, error) {
func (c *CaptiveCoreAPI) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) (ledgerbackend.PrepareRangeResponse, error) {
c.activeRequest.Lock()
defer c.activeRequest.Unlock()
if c.ctx.Err() != nil {
if c.isShutdown() {
return ledgerbackend.PrepareRangeResponse{}, errors.New("Cannot prepare range when shut down")
}

Expand All @@ -121,7 +125,7 @@ func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerba
c.activeRequest.valid = true

c.wg.Add(1)
go c.startPrepareRange(ledgerRange)
go c.startPrepareRange(c.ctx, ledgerRange)

return ledgerbackend.PrepareRangeResponse{
LedgerRange: ledgerRange,
Expand All @@ -140,7 +144,7 @@ func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerba
}

// GetLatestLedgerSequence determines the latest ledger sequence available on the captive core instance.
func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSequenceResponse, error) {
func (c *CaptiveCoreAPI) GetLatestLedgerSequence(ctx context.Context) (ledgerbackend.LatestLedgerSequenceResponse, error) {
c.activeRequest.Lock()
defer c.activeRequest.Unlock()

Expand All @@ -151,15 +155,15 @@ func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSe
return ledgerbackend.LatestLedgerSequenceResponse{}, ErrPrepareRangeNotReady
}

seq, err := c.core.GetLatestLedgerSequence()
seq, err := c.core.GetLatestLedgerSequence(ctx)
if err != nil {
c.activeRequest.valid = false
}
return ledgerbackend.LatestLedgerSequenceResponse{Sequence: seq}, err
}

// GetLedger fetches the ledger with the given sequence number from the captive core instance.
func (c *CaptiveCoreAPI) GetLedger(sequence uint32) (ledgerbackend.LedgerResponse, error) {
func (c *CaptiveCoreAPI) GetLedger(ctx context.Context, sequence uint32) (ledgerbackend.LedgerResponse, error) {
c.activeRequest.Lock()
defer c.activeRequest.Unlock()

Expand All @@ -170,12 +174,13 @@ func (c *CaptiveCoreAPI) GetLedger(sequence uint32) (ledgerbackend.LedgerRespons
return ledgerbackend.LedgerResponse{}, ErrPrepareRangeNotReady
}

present, ledger, err := c.core.GetLedger(sequence)
ledger, err := c.core.GetLedger(ctx, sequence)
if err != nil {
c.activeRequest.valid = false
}
// TODO: We are always true here now, so this changes the semantics of this
// call a bit. We need to change the client to long-poll this endpoint.
return ledgerbackend.LedgerResponse{
Present: present,
Ledger: ledgerbackend.Base64Ledger(ledger),
Ledger: ledgerbackend.Base64Ledger(ledger),
}, err
}
47 changes: 25 additions & 22 deletions exp/services/captivecore/internal/api_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package internal

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest/ledgerbackend"
Expand All @@ -18,11 +20,13 @@ func TestAPITestSuite(t *testing.T) {

type APITestSuite struct {
suite.Suite
ctx context.Context
ledgerBackend *ledgerbackend.MockDatabaseBackend
api CaptiveCoreAPI
}

func (s *APITestSuite) SetupTest() {
s.ctx = context.Background()
s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{}
s.api = NewCaptiveCoreAPI(s.ledgerBackend, log.New())
}
Expand All @@ -32,23 +36,23 @@ func (s *APITestSuite) TearDownTest() {
}

func (s *APITestSuite) TestLatestSeqActiveRequestInvalid() {
_, err := s.api.GetLatestLedgerSequence()
_, err := s.api.GetLatestLedgerSequence(s.ctx)
s.Assert().Equal(err, ErrMissingPrepareRange)
}

func (s *APITestSuite) TestGetLedgerActiveRequestInvalid() {
_, err := s.api.GetLedger(64)
_, err := s.api.GetLedger(s.ctx, 64)
s.Assert().Equal(err, ErrMissingPrepareRange)
}

func (s *APITestSuite) runBeforeReady(prepareRangeErr error, f func()) {
waitChan := make(chan time.Time)
ledgerRange := ledgerbackend.UnboundedRange(63)
s.ledgerBackend.On("PrepareRange", ledgerRange).
s.ledgerBackend.On("PrepareRange", mock.Anything, ledgerRange).
WaitUntil(waitChan).
Return(prepareRangeErr).Once()

response, err := s.api.PrepareRange(ledgerRange)
response, err := s.api.PrepareRange(s.ctx, ledgerRange)
s.Assert().NoError(err)
s.Assert().False(response.Ready)
s.Assert().Equal(response.LedgerRange, ledgerRange)
Expand All @@ -61,23 +65,23 @@ func (s *APITestSuite) runBeforeReady(prepareRangeErr error, f func()) {

func (s *APITestSuite) TestLatestSeqActiveRequestNotReady() {
s.runBeforeReady(nil, func() {
_, err := s.api.GetLatestLedgerSequence()
_, err := s.api.GetLatestLedgerSequence(s.ctx)
s.Assert().Equal(err, ErrPrepareRangeNotReady)
})
}

func (s *APITestSuite) TestGetLedgerNotReady() {
s.runBeforeReady(nil, func() {
_, err := s.api.GetLedger(64)
_, err := s.api.GetLedger(s.ctx, 64)
s.Assert().Equal(err, ErrPrepareRangeNotReady)
})
}

func (s *APITestSuite) waitUntilReady(ledgerRange ledgerbackend.Range) {
s.ledgerBackend.On("PrepareRange", ledgerRange).
s.ledgerBackend.On("PrepareRange", mock.Anything, ledgerRange).
Return(nil).Once()

response, err := s.api.PrepareRange(ledgerRange)
response, err := s.api.PrepareRange(s.ctx, ledgerRange)
s.Assert().NoError(err)
s.Assert().False(response.Ready)
s.Assert().Equal(response.LedgerRange, ledgerRange)
Expand All @@ -89,29 +93,29 @@ func (s *APITestSuite) TestLatestSeqError() {
s.waitUntilReady(ledgerbackend.UnboundedRange(63))

expectedErr := fmt.Errorf("test error")
s.ledgerBackend.On("GetLatestLedgerSequence").Return(uint32(0), expectedErr).Once()
s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(0), expectedErr).Once()

_, err := s.api.GetLatestLedgerSequence()
_, err := s.api.GetLatestLedgerSequence(s.ctx)
s.Assert().Equal(err, expectedErr)
}

func (s *APITestSuite) TestGetLedgerError() {
s.waitUntilReady(ledgerbackend.UnboundedRange(63))

expectedErr := fmt.Errorf("test error")
s.ledgerBackend.On("GetLedger", uint32(64)).
Return(false, xdr.LedgerCloseMeta{}, expectedErr).Once()
s.ledgerBackend.On("GetLedger", s.ctx, uint32(64)).
Return(xdr.LedgerCloseMeta{}, expectedErr).Once()

_, err := s.api.GetLedger(64)
_, err := s.api.GetLedger(s.ctx, 64)
s.Assert().Equal(err, expectedErr)
}

func (s *APITestSuite) TestLatestSeqSucceeds() {
s.waitUntilReady(ledgerbackend.UnboundedRange(63))

expectedSeq := uint32(100)
s.ledgerBackend.On("GetLatestLedgerSequence").Return(expectedSeq, nil).Once()
seq, err := s.api.GetLatestLedgerSequence()
s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(expectedSeq, nil).Once()
seq, err := s.api.GetLatestLedgerSequence(s.ctx)
s.Assert().NoError(err)
s.Assert().Equal(seq, ledgerbackend.LatestLedgerSequenceResponse{Sequence: expectedSeq})
}
Expand All @@ -128,21 +132,20 @@ func (s *APITestSuite) TestGetLedgerSucceeds() {
},
},
}
s.ledgerBackend.On("GetLedger", uint32(64)).
Return(true, expectedLedger, nil).Once()
seq, err := s.api.GetLedger(64)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(64)).
Return(expectedLedger, nil).Once()
seq, err := s.api.GetLedger(s.ctx, 64)

s.Assert().NoError(err)
s.Assert().Equal(seq, ledgerbackend.LedgerResponse{
Present: true,
Ledger: ledgerbackend.Base64Ledger(expectedLedger),
Ledger: ledgerbackend.Base64Ledger(expectedLedger),
})
}

func (s *APITestSuite) TestShutDownBeforePrepareRange() {
s.ledgerBackend.On("Close").Return(nil).Once()
s.api.Shutdown()
_, err := s.api.PrepareRange(ledgerbackend.UnboundedRange(63))
_, err := s.api.PrepareRange(s.ctx, ledgerbackend.UnboundedRange(63))
s.Assert().EqualError(err, "Cannot prepare range when shut down")
}

Expand Down Expand Up @@ -226,7 +229,7 @@ func (s *APITestSuite) TestRangeAlreadyPrepared() {
ledgerbackend.UnboundedRange(100),
ledgerbackend.BoundedRange(63, 70),
} {
response, err := s.api.PrepareRange(ledgerRange)
response, err := s.api.PrepareRange(s.ctx, ledgerRange)
s.Assert().NoError(err)
s.Assert().True(response.Ready)
s.Assert().Equal(superSetRange, response.LedgerRange)
Expand Down
26 changes: 22 additions & 4 deletions exp/services/captivecore/internal/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package internal

import (
"context"
"encoding/json"
"net/http"
"time"

"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
Expand Down Expand Up @@ -39,7 +41,7 @@ func Handler(api CaptiveCoreAPI) http.Handler {
mux := supporthttp.NewMux(api.log)

mux.Get("/latest-sequence", func(w http.ResponseWriter, r *http.Request) {
response, err := api.GetLatestLedgerSequence()
response, err := api.GetLatestLedgerSequence(r.Context())
serializeResponse(api.log, w, r, response, err)
})

Expand All @@ -51,8 +53,24 @@ func Handler(api CaptiveCoreAPI) http.Handler {
return
}

response, err := api.GetLedger(req.Sequence)
serializeResponse(api.log, w, r, response, err)
// must be shorter than the RemoteCaptiveCore http client timeout.
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel() // release timer

var response ledgerbackend.LedgerResponse
var err error
done := make(chan struct{})
go func() {
response, err = api.GetLedger(ctx, req.Sequence)
close(done)
}()

select {
case <-ctx.Done():
w.WriteHeader(http.StatusRequestTimeout)
case <-done:
serializeResponse(api.log, w, r, response, err)
}
})

mux.Post("/prepare-range", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -63,7 +81,7 @@ func Handler(api CaptiveCoreAPI) http.Handler {
return
}

response, err := api.PrepareRange(ledgerRange)
response, err := api.PrepareRange(r.Context(), ledgerRange)
serializeResponse(api.log, w, r, response, err)
})

Expand Down
Loading

0 comments on commit 36f465c

Please sign in to comment.