From 36f465cfe0fc5ae87689723edc23b6a73191d165 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Tue, 4 May 2021 18:29:14 +0100 Subject: [PATCH] services/horizon: Change ingest/ledgerbackend/LedgerBackend.GetLedger to be always blocking (#3576) * Add ctx to TrustedLedgerHashStore * Add ctx to LedgerBackend and TrustedHashStore * Stop using old GetLedger * Rename GetLedgerBlocking -> GetLedger and remove old from the API * Clean up the GetLedger implementations * Set timeouts on captive core server so long-polling works * Re-use IsPrepared to simplify RemoteCaptiveCore.PrepareRange * Add explicit 408 timeout to captive core server to ensure long-polling works * Update changelog * remove dead code * startPrepareRange outlives the request context * Remove Present flag from exp/services/captivecore/internal/api --- exp/services/captivecore/internal/api.go | 29 +-- exp/services/captivecore/internal/api_test.go | 47 ++--- exp/services/captivecore/internal/server.go | 26 ++- .../captivecore/internal/server_test.go | 64 ++++-- exp/services/captivecore/main.go | 5 +- exp/tools/captive-core-start-tester/main.go | 11 +- ingest/doc_test.go | 5 +- ingest/ledger_change_reader.go | 5 +- ingest/ledger_change_reader_test.go | 57 +++--- ingest/ledger_transaction_reader.go | 9 +- ingest/ledgerbackend/captive_core_backend.go | 186 +++++++----------- .../captive_core_backend_test.go | 148 +++++++------- ingest/ledgerbackend/database_backend.go | 53 ++--- ingest/ledgerbackend/ledger_backend.go | 12 +- ingest/ledgerbackend/ledger_hash_store.go | 22 +-- ingest/ledgerbackend/mock_database_backend.go | 23 +-- ingest/ledgerbackend/remote_captive_core.go | 139 +++++-------- .../ledgerbackend/remote_captive_core_test.go | 74 +++++++ ingest/tutorial/example_hello.go | 10 +- ingest/tutorial/example_statistics.go | 7 +- ingest/tutorial/getting-started.md | 8 +- services/horizon/CHANGELOG.md | 4 + .../internal/db2/history/ledger_test.go | 6 +- .../internal/ingest/build_state_test.go | 16 +- .../internal/ingest/database_backend_test.go | 8 +- .../internal/ingest/db_integration_test.go | 4 +- .../internal/ingest/fake_ledger_backend.go | 14 +- services/horizon/internal/ingest/fsm.go | 39 ++-- .../ingest/ingest_history_range_state_test.go | 32 +-- services/horizon/internal/ingest/main.go | 6 +- services/horizon/internal/ingest/main_test.go | 21 +- .../internal/ingest/resume_state_test.go | 22 +-- .../ingest/verify_range_state_test.go | 16 +- 33 files changed, 558 insertions(+), 570 deletions(-) create mode 100644 ingest/ledgerbackend/remote_captive_core_test.go diff --git a/exp/services/captivecore/internal/api.go b/exp/services/captivecore/internal/api.go index 9e666e565c..b39f7c0854 100644 --- a/exp/services/captivecore/internal/api.go +++ b/exp/services/captivecore/internal/api.go @@ -63,14 +63,18 @@ func (c *CaptiveCoreAPI) Shutdown() { c.core.Close() } -func (c *CaptiveCoreAPI) startPrepareRange(ledgerRange ledgerbackend.Range) { +func (c *CaptiveCoreAPI) isShutdown() bool { + return c.ctx.Err() != nil +} + +func (c *CaptiveCoreAPI) startPrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) { defer c.wg.Done() - err := c.core.PrepareRange(ledgerRange) + err := c.core.PrepareRange(ctx, ledgerRange) c.activeRequest.Lock() defer c.activeRequest.Unlock() - if c.ctx.Err() != nil { + if c.isShutdown() { return } @@ -100,10 +104,10 @@ func (c *CaptiveCoreAPI) startPrepareRange(ledgerRange ledgerbackend.Range) { } // PrepareRange executes the PrepareRange operation on the captive core instance. -func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerbackend.PrepareRangeResponse, error) { +func (c *CaptiveCoreAPI) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) (ledgerbackend.PrepareRangeResponse, error) { c.activeRequest.Lock() defer c.activeRequest.Unlock() - if c.ctx.Err() != nil { + if c.isShutdown() { return ledgerbackend.PrepareRangeResponse{}, errors.New("Cannot prepare range when shut down") } @@ -121,7 +125,7 @@ func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerba c.activeRequest.valid = true c.wg.Add(1) - go c.startPrepareRange(ledgerRange) + go c.startPrepareRange(c.ctx, ledgerRange) return ledgerbackend.PrepareRangeResponse{ LedgerRange: ledgerRange, @@ -140,7 +144,7 @@ func (c *CaptiveCoreAPI) PrepareRange(ledgerRange ledgerbackend.Range) (ledgerba } // GetLatestLedgerSequence determines the latest ledger sequence available on the captive core instance. -func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSequenceResponse, error) { +func (c *CaptiveCoreAPI) GetLatestLedgerSequence(ctx context.Context) (ledgerbackend.LatestLedgerSequenceResponse, error) { c.activeRequest.Lock() defer c.activeRequest.Unlock() @@ -151,7 +155,7 @@ func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSe return ledgerbackend.LatestLedgerSequenceResponse{}, ErrPrepareRangeNotReady } - seq, err := c.core.GetLatestLedgerSequence() + seq, err := c.core.GetLatestLedgerSequence(ctx) if err != nil { c.activeRequest.valid = false } @@ -159,7 +163,7 @@ func (c *CaptiveCoreAPI) GetLatestLedgerSequence() (ledgerbackend.LatestLedgerSe } // GetLedger fetches the ledger with the given sequence number from the captive core instance. -func (c *CaptiveCoreAPI) GetLedger(sequence uint32) (ledgerbackend.LedgerResponse, error) { +func (c *CaptiveCoreAPI) GetLedger(ctx context.Context, sequence uint32) (ledgerbackend.LedgerResponse, error) { c.activeRequest.Lock() defer c.activeRequest.Unlock() @@ -170,12 +174,13 @@ func (c *CaptiveCoreAPI) GetLedger(sequence uint32) (ledgerbackend.LedgerRespons return ledgerbackend.LedgerResponse{}, ErrPrepareRangeNotReady } - present, ledger, err := c.core.GetLedger(sequence) + ledger, err := c.core.GetLedger(ctx, sequence) if err != nil { c.activeRequest.valid = false } + // TODO: We are always true here now, so this changes the semantics of this + // call a bit. We need to change the client to long-poll this endpoint. return ledgerbackend.LedgerResponse{ - Present: present, - Ledger: ledgerbackend.Base64Ledger(ledger), + Ledger: ledgerbackend.Base64Ledger(ledger), }, err } diff --git a/exp/services/captivecore/internal/api_test.go b/exp/services/captivecore/internal/api_test.go index 223f5e9bb4..b9c96bf8b4 100644 --- a/exp/services/captivecore/internal/api_test.go +++ b/exp/services/captivecore/internal/api_test.go @@ -1,10 +1,12 @@ package internal import ( + "context" "fmt" "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest/ledgerbackend" @@ -18,11 +20,13 @@ func TestAPITestSuite(t *testing.T) { type APITestSuite struct { suite.Suite + ctx context.Context ledgerBackend *ledgerbackend.MockDatabaseBackend api CaptiveCoreAPI } func (s *APITestSuite) SetupTest() { + s.ctx = context.Background() s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} s.api = NewCaptiveCoreAPI(s.ledgerBackend, log.New()) } @@ -32,23 +36,23 @@ func (s *APITestSuite) TearDownTest() { } func (s *APITestSuite) TestLatestSeqActiveRequestInvalid() { - _, err := s.api.GetLatestLedgerSequence() + _, err := s.api.GetLatestLedgerSequence(s.ctx) s.Assert().Equal(err, ErrMissingPrepareRange) } func (s *APITestSuite) TestGetLedgerActiveRequestInvalid() { - _, err := s.api.GetLedger(64) + _, err := s.api.GetLedger(s.ctx, 64) s.Assert().Equal(err, ErrMissingPrepareRange) } func (s *APITestSuite) runBeforeReady(prepareRangeErr error, f func()) { waitChan := make(chan time.Time) ledgerRange := ledgerbackend.UnboundedRange(63) - s.ledgerBackend.On("PrepareRange", ledgerRange). + s.ledgerBackend.On("PrepareRange", mock.Anything, ledgerRange). WaitUntil(waitChan). Return(prepareRangeErr).Once() - response, err := s.api.PrepareRange(ledgerRange) + response, err := s.api.PrepareRange(s.ctx, ledgerRange) s.Assert().NoError(err) s.Assert().False(response.Ready) s.Assert().Equal(response.LedgerRange, ledgerRange) @@ -61,23 +65,23 @@ func (s *APITestSuite) runBeforeReady(prepareRangeErr error, f func()) { func (s *APITestSuite) TestLatestSeqActiveRequestNotReady() { s.runBeforeReady(nil, func() { - _, err := s.api.GetLatestLedgerSequence() + _, err := s.api.GetLatestLedgerSequence(s.ctx) s.Assert().Equal(err, ErrPrepareRangeNotReady) }) } func (s *APITestSuite) TestGetLedgerNotReady() { s.runBeforeReady(nil, func() { - _, err := s.api.GetLedger(64) + _, err := s.api.GetLedger(s.ctx, 64) s.Assert().Equal(err, ErrPrepareRangeNotReady) }) } func (s *APITestSuite) waitUntilReady(ledgerRange ledgerbackend.Range) { - s.ledgerBackend.On("PrepareRange", ledgerRange). + s.ledgerBackend.On("PrepareRange", mock.Anything, ledgerRange). Return(nil).Once() - response, err := s.api.PrepareRange(ledgerRange) + response, err := s.api.PrepareRange(s.ctx, ledgerRange) s.Assert().NoError(err) s.Assert().False(response.Ready) s.Assert().Equal(response.LedgerRange, ledgerRange) @@ -89,9 +93,9 @@ func (s *APITestSuite) TestLatestSeqError() { s.waitUntilReady(ledgerbackend.UnboundedRange(63)) expectedErr := fmt.Errorf("test error") - s.ledgerBackend.On("GetLatestLedgerSequence").Return(uint32(0), expectedErr).Once() + s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(0), expectedErr).Once() - _, err := s.api.GetLatestLedgerSequence() + _, err := s.api.GetLatestLedgerSequence(s.ctx) s.Assert().Equal(err, expectedErr) } @@ -99,10 +103,10 @@ func (s *APITestSuite) TestGetLedgerError() { s.waitUntilReady(ledgerbackend.UnboundedRange(63)) expectedErr := fmt.Errorf("test error") - s.ledgerBackend.On("GetLedger", uint32(64)). - Return(false, xdr.LedgerCloseMeta{}, expectedErr).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(64)). + Return(xdr.LedgerCloseMeta{}, expectedErr).Once() - _, err := s.api.GetLedger(64) + _, err := s.api.GetLedger(s.ctx, 64) s.Assert().Equal(err, expectedErr) } @@ -110,8 +114,8 @@ func (s *APITestSuite) TestLatestSeqSucceeds() { s.waitUntilReady(ledgerbackend.UnboundedRange(63)) expectedSeq := uint32(100) - s.ledgerBackend.On("GetLatestLedgerSequence").Return(expectedSeq, nil).Once() - seq, err := s.api.GetLatestLedgerSequence() + s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(expectedSeq, nil).Once() + seq, err := s.api.GetLatestLedgerSequence(s.ctx) s.Assert().NoError(err) s.Assert().Equal(seq, ledgerbackend.LatestLedgerSequenceResponse{Sequence: expectedSeq}) } @@ -128,21 +132,20 @@ func (s *APITestSuite) TestGetLedgerSucceeds() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(64)). - Return(true, expectedLedger, nil).Once() - seq, err := s.api.GetLedger(64) + s.ledgerBackend.On("GetLedger", s.ctx, uint32(64)). + Return(expectedLedger, nil).Once() + seq, err := s.api.GetLedger(s.ctx, 64) s.Assert().NoError(err) s.Assert().Equal(seq, ledgerbackend.LedgerResponse{ - Present: true, - Ledger: ledgerbackend.Base64Ledger(expectedLedger), + Ledger: ledgerbackend.Base64Ledger(expectedLedger), }) } func (s *APITestSuite) TestShutDownBeforePrepareRange() { s.ledgerBackend.On("Close").Return(nil).Once() s.api.Shutdown() - _, err := s.api.PrepareRange(ledgerbackend.UnboundedRange(63)) + _, err := s.api.PrepareRange(s.ctx, ledgerbackend.UnboundedRange(63)) s.Assert().EqualError(err, "Cannot prepare range when shut down") } @@ -226,7 +229,7 @@ func (s *APITestSuite) TestRangeAlreadyPrepared() { ledgerbackend.UnboundedRange(100), ledgerbackend.BoundedRange(63, 70), } { - response, err := s.api.PrepareRange(ledgerRange) + response, err := s.api.PrepareRange(s.ctx, ledgerRange) s.Assert().NoError(err) s.Assert().True(response.Ready) s.Assert().Equal(superSetRange, response.LedgerRange) diff --git a/exp/services/captivecore/internal/server.go b/exp/services/captivecore/internal/server.go index b451a35c93..127444611f 100644 --- a/exp/services/captivecore/internal/server.go +++ b/exp/services/captivecore/internal/server.go @@ -1,8 +1,10 @@ package internal import ( + "context" "encoding/json" "net/http" + "time" "github.com/stellar/go/ingest/ledgerbackend" supporthttp "github.com/stellar/go/support/http" @@ -39,7 +41,7 @@ func Handler(api CaptiveCoreAPI) http.Handler { mux := supporthttp.NewMux(api.log) mux.Get("/latest-sequence", func(w http.ResponseWriter, r *http.Request) { - response, err := api.GetLatestLedgerSequence() + response, err := api.GetLatestLedgerSequence(r.Context()) serializeResponse(api.log, w, r, response, err) }) @@ -51,8 +53,24 @@ func Handler(api CaptiveCoreAPI) http.Handler { return } - response, err := api.GetLedger(req.Sequence) - serializeResponse(api.log, w, r, response, err) + // must be shorter than the RemoteCaptiveCore http client timeout. + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() // release timer + + var response ledgerbackend.LedgerResponse + var err error + done := make(chan struct{}) + go func() { + response, err = api.GetLedger(ctx, req.Sequence) + close(done) + }() + + select { + case <-ctx.Done(): + w.WriteHeader(http.StatusRequestTimeout) + case <-done: + serializeResponse(api.log, w, r, response, err) + } }) mux.Post("/prepare-range", func(w http.ResponseWriter, r *http.Request) { @@ -63,7 +81,7 @@ func Handler(api CaptiveCoreAPI) http.Handler { return } - response, err := api.PrepareRange(ledgerRange) + response, err := api.PrepareRange(r.Context(), ledgerRange) serializeResponse(api.log, w, r, response, err) }) diff --git a/exp/services/captivecore/internal/server_test.go b/exp/services/captivecore/internal/server_test.go index 8339641e2b..86ad989191 100644 --- a/exp/services/captivecore/internal/server_test.go +++ b/exp/services/captivecore/internal/server_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest/ledgerbackend" @@ -22,6 +23,7 @@ func TestServerTestSuite(t *testing.T) { type ServerTestSuite struct { suite.Suite + ctx context.Context ledgerBackend *ledgerbackend.MockDatabaseBackend api CaptiveCoreAPI handler http.Handler @@ -30,13 +32,13 @@ type ServerTestSuite struct { } func (s *ServerTestSuite) SetupTest() { + s.ctx = context.Background() s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} s.api = NewCaptiveCoreAPI(s.ledgerBackend, log.New()) s.handler = Handler(s.api) s.server = httptest.NewServer(s.handler) var err error s.client, err = ledgerbackend.NewRemoteCaptive( - context.Background(), s.server.URL, ledgerbackend.PrepareRangePollInterval(time.Millisecond), ) @@ -54,9 +56,9 @@ func (s *ServerTestSuite) TestLatestSequence() { s.api.activeRequest.ready = true expectedSeq := uint32(100) - s.ledgerBackend.On("GetLatestLedgerSequence").Return(expectedSeq, nil).Once() + s.ledgerBackend.On("GetLatestLedgerSequence", mock.Anything).Return(expectedSeq, nil).Once() - seq, err := s.client.GetLatestLedgerSequence() + seq, err := s.client.GetLatestLedgerSequence(s.ctx) s.Assert().NoError(err) s.Assert().Equal(expectedSeq, seq) } @@ -65,34 +67,34 @@ func (s *ServerTestSuite) TestLatestSequenceError() { s.api.activeRequest.valid = true s.api.activeRequest.ready = true - s.ledgerBackend.On("GetLatestLedgerSequence").Return(uint32(100), fmt.Errorf("test error")).Once() + s.ledgerBackend.On("GetLatestLedgerSequence", mock.Anything).Return(uint32(100), fmt.Errorf("test error")).Once() - _, err := s.client.GetLatestLedgerSequence() + _, err := s.client.GetLatestLedgerSequence(s.ctx) s.Assert().EqualError(err, "test error") } func (s *ServerTestSuite) TestPrepareBoundedRange() { ledgerRange := ledgerbackend.BoundedRange(10, 30) - s.ledgerBackend.On("PrepareRange", ledgerRange). + s.ledgerBackend.On("PrepareRange", mock.Anything, ledgerRange). Return(nil).Once() - s.Assert().NoError(s.client.PrepareRange(ledgerRange)) + s.Assert().NoError(s.client.PrepareRange(s.ctx, ledgerRange)) s.Assert().True(s.api.activeRequest.ready) - prepared, err := s.client.IsPrepared(ledgerRange) + prepared, err := s.client.IsPrepared(s.ctx, ledgerRange) s.Assert().NoError(err) s.Assert().True(prepared) } func (s *ServerTestSuite) TestPrepareUnboundedRange() { ledgerRange := ledgerbackend.UnboundedRange(100) - s.ledgerBackend.On("PrepareRange", ledgerRange). + s.ledgerBackend.On("PrepareRange", mock.Anything, ledgerRange). Return(nil).Once() - s.Assert().NoError(s.client.PrepareRange(ledgerRange)) + s.Assert().NoError(s.client.PrepareRange(s.ctx, ledgerRange)) s.Assert().True(s.api.activeRequest.ready) - prepared, err := s.client.IsPrepared(ledgerRange) + prepared, err := s.client.IsPrepared(s.ctx, ledgerRange) s.Assert().NoError(err) s.Assert().True(prepared) } @@ -102,13 +104,14 @@ func (s *ServerTestSuite) TestPrepareError() { s.api.Shutdown() s.Assert().EqualError( - s.client.PrepareRange(ledgerbackend.UnboundedRange(100)), + s.client.PrepareRange(s.ctx, ledgerbackend.UnboundedRange(100)), "Cannot prepare range when shut down", ) } func (s *ServerTestSuite) TestGetLedgerInvalidSequence() { req := httptest.NewRequest("GET", "/ledger/abcdef", nil) + req = req.WithContext(s.ctx) w := httptest.NewRecorder() s.handler.ServeHTTP(w, req) @@ -126,10 +129,10 @@ func (s *ServerTestSuite) TestGetLedgerError() { s.api.activeRequest.ready = true expectedErr := fmt.Errorf("test error") - s.ledgerBackend.On("GetLedger", uint32(64)). - Return(false, xdr.LedgerCloseMeta{}, expectedErr).Once() + s.ledgerBackend.On("GetLedger", mock.Anything, uint32(64)). + Return(xdr.LedgerCloseMeta{}, expectedErr).Once() - _, _, err := s.client.GetLedger(64) + _, err := s.client.GetLedger(s.ctx, 64) s.Assert().EqualError(err, "test error") } @@ -146,11 +149,34 @@ func (s *ServerTestSuite) TestGetLedgerSucceeds() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(64)). - Return(true, expectedLedger, nil).Once() + s.ledgerBackend.On("GetLedger", mock.Anything, uint32(64)). + Return(expectedLedger, nil).Once() - present, ledger, err := s.client.GetLedger(64) + ledger, err := s.client.GetLedger(s.ctx, 64) + s.Assert().NoError(err) + s.Assert().Equal(expectedLedger, ledger) +} + +func (s *ServerTestSuite) TestGetLedgerTakesAWhile() { + s.api.activeRequest.valid = true + s.api.activeRequest.ready = true + + expectedLedger := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: 64, + }, + }, + }, + } + s.ledgerBackend.On("GetLedger", mock.Anything, uint32(64)). + Run(func(mock.Arguments) { time.Sleep(6 * time.Second) }). + Return(xdr.LedgerCloseMeta{}, nil).Once() + s.ledgerBackend.On("GetLedger", mock.Anything, uint32(64)). + Return(expectedLedger, nil).Once() + + ledger, err := s.client.GetLedger(s.ctx, 64) s.Assert().NoError(err) - s.Assert().True(present) s.Assert().Equal(expectedLedger, ledger) } diff --git a/exp/services/captivecore/main.go b/exp/services/captivecore/main.go index d0f2fdd425..c8c2d425ab 100644 --- a/exp/services/captivecore/main.go +++ b/exp/services/captivecore/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "fmt" "go/types" "strings" @@ -139,7 +138,7 @@ func main() { if err != nil { logger.WithError(err).Fatal("Could not create db connection instance") } - captiveConfig.LedgerHashStore = ledgerbackend.NewHorizonDBLedgerHashStore(context.Background(), dbConn) + captiveConfig.LedgerHashStore = ledgerbackend.NewHorizonDBLedgerHashStore(dbConn) } core, err := ledgerbackend.NewCaptive(captiveConfig) @@ -155,6 +154,8 @@ func main() { logger.Infof("Starting Captive Core server on %v", port) }, OnStopping: func() { + // TODO: Check this aborts in-progress requests instead of letting + // them finish, to preserve existing behaviour. api.Shutdown() if dbConn != nil { dbConn.Close() diff --git a/exp/tools/captive-core-start-tester/main.go b/exp/tools/captive-core-start-tester/main.go index 18984818ed..9e1183bd5c 100644 --- a/exp/tools/captive-core-start-tester/main.go +++ b/exp/tools/captive-core-start-tester/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "github.com/stellar/go/ingest/ledgerbackend" @@ -36,23 +37,19 @@ func check(ledger uint32) bool { } defer c.Close() - err = c.PrepareRange(ledgerbackend.UnboundedRange(ledger)) + ctx := context.Background() + err = c.PrepareRange(ctx, ledgerbackend.UnboundedRange(ledger)) if err != nil { fmt.Println(err) return false } - ok, meta, err := c.GetLedger(ledger) + meta, err := c.GetLedger(ctx, ledger) if err != nil { fmt.Println(err) return false } - if !ok { - fmt.Println("no ledger") - return false - } - if meta.LedgerSequence() != ledger { fmt.Println("wrong ledger", meta.LedgerSequence()) return false diff --git a/ingest/doc_test.go b/ingest/doc_test.go index efad710140..5277add909 100644 --- a/ingest/doc_test.go +++ b/ingest/doc_test.go @@ -64,6 +64,7 @@ func Example_ledgerentrieshistoryarchive() { // for a specific ledger using captive stellar-core. Please note that transaction // meta IS available when using this backend. func Example_changes() { + ctx := context.Background() archiveURL := "http://history.stellar.org/prd/core-live/core_live_001" networkPassphrase := network.PublicNetworkPassphrase @@ -82,12 +83,12 @@ func Example_changes() { sequence := uint32(3) - err = backend.PrepareRange(ledgerbackend.SingleLedgerRange(sequence)) + err = backend.PrepareRange(ctx, ledgerbackend.SingleLedgerRange(sequence)) if err != nil { panic(err) } - changeReader, err := NewLedgerChangeReader(backend, networkPassphrase, sequence) + changeReader, err := NewLedgerChangeReader(ctx, backend, networkPassphrase, sequence) if err != nil { panic(err) } diff --git a/ingest/ledger_change_reader.go b/ingest/ledger_change_reader.go index f693963070..e1aa3fb2ed 100644 --- a/ingest/ledger_change_reader.go +++ b/ingest/ledger_change_reader.go @@ -1,6 +1,7 @@ package ingest import ( + "context" "io" "github.com/stellar/go/ingest/ledgerbackend" @@ -46,8 +47,8 @@ var _ ChangeReader = (*LedgerChangeReader)(nil) // NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. // Note that the returned LedgerChangeReader is not thread safe and should not be shared // by multiple goroutines. -func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerChangeReader, error) { - transactionReader, err := NewLedgerTransactionReader(backend, networkPassphrase, sequence) +func NewLedgerChangeReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerChangeReader, error) { + transactionReader, err := NewLedgerTransactionReader(ctx, backend, networkPassphrase, sequence) if err != nil { return nil, err } diff --git a/ingest/ledger_change_reader_test.go b/ingest/ledger_change_reader_test.go index b338ba935a..c461ef28c3 100644 --- a/ingest/ledger_change_reader_test.go +++ b/ingest/ledger_change_reader_test.go @@ -1,6 +1,7 @@ package ingest import ( + "context" "fmt" "io" "testing" @@ -19,14 +20,14 @@ const ( ) func TestNewLedgerChangeReaderFails(t *testing.T) { + ctx := context.Background() mock := &ledgerbackend.MockDatabaseBackend{} seq := uint32(123) - mock.On("GetLedger", seq).Return( - true, + mock.On("GetLedger", ctx, seq).Return( xdr.LedgerCloseMeta{}, fmt.Errorf("ledger error"), ).Once() - _, err := NewLedgerChangeReader(mock, network.TestNetworkPassphrase, seq) + _, err := NewLedgerChangeReader(ctx, mock, network.TestNetworkPassphrase, seq) assert.EqualError( t, err, @@ -34,23 +35,8 @@ func TestNewLedgerChangeReaderFails(t *testing.T) { ) } -func TestNewLedgerChangeReaderLedgerDoesNotExist(t *testing.T) { - mock := &ledgerbackend.MockDatabaseBackend{} - seq := uint32(123) - mock.On("GetLedger", seq).Return( - false, - xdr.LedgerCloseMeta{}, - nil, - ).Once() - _, err := NewLedgerChangeReader(mock, network.TestNetworkPassphrase, seq) - assert.Equal( - t, - err, - ErrNotFound, - ) -} - func TestNewLedgerChangeReaderSucceeds(t *testing.T) { + ctx := context.Background() mock := &ledgerbackend.MockDatabaseBackend{} seq := uint32(123) @@ -61,8 +47,7 @@ func TestNewLedgerChangeReaderSucceeds(t *testing.T) { }, } - mock.On("GetLedger", seq).Return( - true, + mock.On("GetLedger", ctx, seq).Return( xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: header, @@ -71,7 +56,7 @@ func TestNewLedgerChangeReaderSucceeds(t *testing.T) { nil, ).Once() - reader, err := NewLedgerChangeReader(mock, network.TestNetworkPassphrase, seq) + reader, err := NewLedgerChangeReader(ctx, mock, network.TestNetworkPassphrase, seq) assert.NoError(t, err) assert.Equal(t, reader.GetHeader(), header) @@ -106,11 +91,12 @@ func parseChange(change Change) balanceEntry { func assertChangesEqual( t *testing.T, + ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, expected []balanceEntry, ) { - reader, err := NewLedgerChangeReader(backend, network.TestNetworkPassphrase, sequence) + reader, err := NewLedgerChangeReader(ctx, backend, network.TestNetworkPassphrase, sequence) assert.NoError(t, err) changes := []balanceEntry{} @@ -130,6 +116,7 @@ func assertChangesEqual( } func TestLedgerChangeReaderOrder(t *testing.T) { + ctx := context.Background() mock := &ledgerbackend.MockDatabaseBackend{} seq := uint32(123) @@ -235,9 +222,9 @@ func TestLedgerChangeReaderOrder(t *testing.T) { }, }, } - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() - assertChangesEqual(t, seq, mock, []balanceEntry{ + assertChangesEqual(t, ctx, seq, mock, []balanceEntry{ {feeAddress, 100}, {feeAddress, 200}, {feeAddress, 300}, @@ -253,8 +240,8 @@ func TestLedgerChangeReaderOrder(t *testing.T) { mock.AssertExpectations(t) ledger.V0.LedgerHeader.Header.LedgerVersion = 8 - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - _, err = NewLedgerChangeReader(mock, network.TestNetworkPassphrase, seq) + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() + _, err = NewLedgerChangeReader(ctx, mock, network.TestNetworkPassphrase, seq) assert.EqualError( t, err, @@ -266,9 +253,9 @@ func TestLedgerChangeReaderOrder(t *testing.T) { ledger.V0.LedgerHeader.Header.LedgerVersion = 9 ledger.V0.TxProcessing[0].FeeProcessing = xdr.LedgerEntryChanges{} ledger.V0.TxProcessing[1].FeeProcessing = xdr.LedgerEntryChanges{} - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() - assertChangesEqual(t, seq, mock, []balanceEntry{ + assertChangesEqual(t, ctx, seq, mock, []balanceEntry{ {metaAddress, 300}, {metaAddress, 400}, {metaAddress, 600}, @@ -283,9 +270,9 @@ func TestLedgerChangeReaderOrder(t *testing.T) { ledger.V0.LedgerHeader.Header.LedgerVersion = 10 ledger.V0.TxProcessing[0].FeeProcessing = xdr.LedgerEntryChanges{} ledger.V0.TxProcessing[1].FeeProcessing = xdr.LedgerEntryChanges{} - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() - assertChangesEqual(t, seq, mock, []balanceEntry{ + assertChangesEqual(t, ctx, seq, mock, []balanceEntry{ {metaAddress, 300}, {metaAddress, 400}, {metaAddress, 600}, @@ -305,9 +292,9 @@ func TestLedgerChangeReaderOrder(t *testing.T) { Changes: xdr.LedgerEntryChanges{}, }, } - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() - assertChangesEqual(t, seq, mock, []balanceEntry{ + assertChangesEqual(t, ctx, seq, mock, []balanceEntry{ {metaAddress, 300}, {metaAddress, 400}, {metaAddress, 600}, @@ -329,8 +316,8 @@ func TestLedgerChangeReaderOrder(t *testing.T) { Operations: []xdr.OperationMeta{}, }, } - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() - assertChangesEqual(t, seq, mock, []balanceEntry{}) + assertChangesEqual(t, ctx, seq, mock, []balanceEntry{}) mock.AssertExpectations(t) } diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index a33355261c..ef68e67aa0 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -1,6 +1,7 @@ package ingest import ( + "context" "encoding/hex" "io" @@ -20,16 +21,12 @@ type LedgerTransactionReader struct { // NewLedgerTransactionReader creates a new TransactionReader instance. // Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) { - exists, ledgerCloseMeta, err := backend.GetLedger(sequence) +func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) { + ledgerCloseMeta, err := backend.GetLedger(ctx, sequence) if err != nil { return nil, errors.Wrap(err, "error getting ledger from the backend") } - if !exists { - return nil, ErrNotFound - } - return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) } diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index ea09b9aa3e..85b4158022 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -85,15 +85,6 @@ type CaptiveStellarCore struct { // For testing stellarCoreRunnerFactory func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) - // Defines if the blocking mode (off by default) is on or off. In blocking mode, - // calling GetLedger blocks until the requested ledger is available. This is useful - // for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them - // and using `time.Sleep` when ledger is not available can actually slow entire - // ingestion process. - // blockingLock locks access to blocking. - blockingLock sync.Mutex - blocking bool - // cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger(). cachedMeta *xdr.LedgerCloseMeta @@ -235,13 +226,12 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error // the requested ledger c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from) c.lastLedger = &to - c.setBlocking(true) c.previousLedgerHash = nil return nil } -func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { +func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, from uint32) error { latestCheckpointSequence, err := c.getLatestCheckpointSequence() if err != nil { return errors.Wrap(err, "error getting latest checkpoint sequence") @@ -269,7 +259,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { c.stellarCoreRunner = runner } - runFrom, ledgerHash, nextLedger, err := c.runFromParams(from) + runFrom, ledgerHash, nextLedger, err := c.runFromParams(ctx, from) if err != nil { return errors.Wrap(err, "error calculating ledger and hash for stelar-core run") } @@ -285,7 +275,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { if c.ledgerHashStore != nil { var exists bool - ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(nextLedger - 1) + ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, nextLedger-1) if err != nil { return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1) } @@ -294,13 +284,11 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { } } - c.setBlocking(false) - return nil } // runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash -func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) { +func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) { if from == 1 { // Trying to start-from 1 results in an error from Stellar-Core: // Target ledger 1 is not newer than last closed ledger 1 - nothing to do @@ -338,7 +326,7 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH runFrom = from - 1 if c.ledgerHashStore != nil { var exists bool - ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(runFrom) + ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, runFrom) if err != nil { err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom) return @@ -357,7 +345,7 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH return } -func (c *CaptiveStellarCore) startPreparingRange(ledgerRange Range) (bool, error) { +func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRange Range) (bool, error) { c.stellarCoreLock.Lock() defer c.stellarCoreLock.Unlock() @@ -375,7 +363,7 @@ func (c *CaptiveStellarCore) startPreparingRange(ledgerRange Range) (bool, error if ledgerRange.bounded { err = c.openOfflineReplaySubprocess(ledgerRange.from, ledgerRange.to) } else { - err = c.openOnlineReplaySubprocess(ledgerRange.from) + err = c.openOnlineReplaySubprocess(ctx, ledgerRange.from) } if err != nil { return false, errors.Wrap(err, "opening subprocess") @@ -393,18 +381,14 @@ func (c *CaptiveStellarCore) startPreparingRange(ledgerRange Range) (bool, error // it normally (including connecting to the Stellar network). // Please note that using a BoundedRange, currently, requires a full-trust on // history archive. This issue is being fixed in Stellar-Core. -func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { - if alreadyPrepared, err := c.startPreparingRange(ledgerRange); err != nil { +func (c *CaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error { + if alreadyPrepared, err := c.startPreparingRange(ctx, ledgerRange); err != nil { return errors.Wrap(err, "error starting prepare range") } else if alreadyPrepared { return nil } - old := c.isBlocking() - c.setBlocking(true) - _, _, err := c.GetLedger(ledgerRange.from) - c.setBlocking(old) - + _, err := c.GetLedger(ctx, ledgerRange.from) if err != nil { return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from) } @@ -413,7 +397,7 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { } // IsPrepared returns true if a given ledgerRange is prepared. -func (c *CaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) { +func (c *CaptiveStellarCore) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { c.stellarCoreLock.RLock() defer c.stellarCoreLock.RUnlock() @@ -454,51 +438,39 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool { return false } -// GetLedgerBlocking works as GetLedger but will block until the ledger is -// available in the backend (even for UnboundedRange). -// Please note that requesting a ledger sequence far after current ledger will -// block the execution for a long time. -func (c *CaptiveStellarCore) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) { - old := c.isBlocking() - c.setBlocking(true) - _, meta, err := c.GetLedger(sequence) - c.setBlocking(old) - return meta, err -} - -// GetLedger returns true when ledger is found and it's LedgerCloseMeta. -// Call PrepareRange first to instruct the backend which ledgers to fetch. +// GetLedger will block until the ledger is available in the backend +// (even for UnboundedRange), then return it's LedgerCloseMeta. // +// Call PrepareRange first to instruct the backend which ledgers to fetch. // CaptiveStellarCore requires PrepareRange call first to initialize Stellar-Core. // Requesting a ledger on non-prepared backend will return an error. // -// Because data is streamed from Stellar-Core ledger after ledger user should +// Please note that requesting a ledger sequence far after current +// ledger will block the execution for a long time. +// +// Because ledger data is streamed from Stellar-Core sequentially, users should // request sequences in a non-decreasing order. If the requested sequence number // is less than the last requested sequence number, an error will be returned. // // This function behaves differently for bounded and unbounded ranges: -// * BoundedRange makes GetLedger blocking if the requested ledger is not yet -// available in the ledger. After getting the last ledger in a range this -// method will also Close() the backend. -// * UnboundedRange makes GetLedger non-blocking. The method will return with -// the first argument equal false. -// This is done to provide maximum performance when streaming old ledgers. -func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { +// * BoundedRange: After getting the last ledger in a range this method will +// also Close() the backend. +func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { c.stellarCoreLock.RLock() defer c.stellarCoreLock.RUnlock() if c.cachedMeta != nil && sequence == c.cachedMeta.LedgerSequence() { // GetLedger can be called multiple times using the same sequence, ex. to create // change and transaction readers. If we have this ledger buffered, let's return it. - return true, *c.cachedMeta, nil + return *c.cachedMeta, nil } if c.isClosed() { - return false, xdr.LedgerCloseMeta{}, errors.New("session is closed, call PrepareRange first") + return xdr.LedgerCloseMeta{}, errors.New("session is closed, call PrepareRange first") } if sequence < c.nextLedger { - return false, xdr.LedgerCloseMeta{}, errors.Errorf( + return xdr.LedgerCloseMeta{}, errors.Errorf( "requested ledger %d is behind the captive core stream (expected=%d)", sequence, c.nextLedger, @@ -506,7 +478,7 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe } if c.lastLedger != nil && sequence > *c.lastLedger { - return false, xdr.LedgerCloseMeta{}, errors.Errorf( + return xdr.LedgerCloseMeta{}, errors.Errorf( "reading past bounded range (requested sequence=%d, last ledger in range=%d)", sequence, *c.lastLedger, @@ -514,62 +486,66 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe } // Now loop along the range until we find the ledger we want. - var errOut error for { - if !c.isBlocking() && len(c.stellarCoreRunner.getMetaPipe()) == 0 { - return false, xdr.LedgerCloseMeta{}, nil + select { + case <-ctx.Done(): + return xdr.LedgerCloseMeta{}, ctx.Err() + case result, ok := <-c.stellarCoreRunner.getMetaPipe(): + found, ledger, err := c.handleMetaPipeResult(sequence, result, ok) + if found || err != nil { + return ledger, err + } } + } +} - result, ok := <-c.stellarCoreRunner.getMetaPipe() - if errOut = c.checkMetaPipeResult(result, ok); errOut != nil { - break - } +func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaResult, ok bool) (bool, xdr.LedgerCloseMeta, error) { + if err := c.checkMetaPipeResult(result, ok); err != nil { + c.stellarCoreRunner.close() + return false, xdr.LedgerCloseMeta{}, err + } - seq := result.LedgerCloseMeta.LedgerSequence() - if seq != c.nextLedger { - // We got something unexpected; close and reset - errOut = errors.Errorf( - "unexpected ledger sequence (expected=%d actual=%d)", - c.nextLedger, - seq, - ) - break - } + seq := result.LedgerCloseMeta.LedgerSequence() + if seq != c.nextLedger { + // We got something unexpected; close and reset + c.stellarCoreRunner.close() + return false, xdr.LedgerCloseMeta{}, errors.Errorf( + "unexpected ledger sequence (expected=%d actual=%d)", + c.nextLedger, + seq, + ) + } - newPreviousLedgerHash := result.LedgerCloseMeta.PreviousLedgerHash().HexString() - if c.previousLedgerHash != nil && *c.previousLedgerHash != newPreviousLedgerHash { - // We got something unexpected; close and reset - errOut = errors.Errorf( - "unexpected previous ledger hash for ledger %d (expected=%s actual=%s)", - seq, - *c.previousLedgerHash, - newPreviousLedgerHash, - ) - break - } + newPreviousLedgerHash := result.LedgerCloseMeta.PreviousLedgerHash().HexString() + if c.previousLedgerHash != nil && *c.previousLedgerHash != newPreviousLedgerHash { + // We got something unexpected; close and reset + c.stellarCoreRunner.close() + return false, xdr.LedgerCloseMeta{}, errors.Errorf( + "unexpected previous ledger hash for ledger %d (expected=%s actual=%s)", + seq, + *c.previousLedgerHash, + newPreviousLedgerHash, + ) + } - c.nextLedger++ - currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString() - c.previousLedgerHash = ¤tLedgerHash + c.nextLedger++ + currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString() + c.previousLedgerHash = ¤tLedgerHash - // Update cache with the latest value because we incremented nextLedger. - c.cachedMeta = result.LedgerCloseMeta + // Update cache with the latest value because we incremented nextLedger. + c.cachedMeta = result.LedgerCloseMeta - if seq == sequence { - // If we got the _last_ ledger in a segment, close before returning. - if c.lastLedger != nil && *c.lastLedger == seq { - if err := c.stellarCoreRunner.close(); err != nil { - return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "error closing session") - } + if seq == sequence { + // If we got the _last_ ledger in a segment, close before returning. + if c.lastLedger != nil && *c.lastLedger == seq { + if err := c.stellarCoreRunner.close(); err != nil { + return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "error closing session") } - return true, *c.cachedMeta, nil } + return true, *c.cachedMeta, nil } - // All paths above that break out of the loop (instead of return) - // set errOut to non-nil: there was an error and we should close and - // reset state before retuning an error to our caller. - c.stellarCoreRunner.close() - return false, xdr.LedgerCloseMeta{}, errOut + + return false, xdr.LedgerCloseMeta{}, nil } func (c *CaptiveStellarCore) checkMetaPipeResult(result metaResult, ok bool) error { @@ -609,7 +585,7 @@ func (c *CaptiveStellarCore) checkMetaPipeResult(result metaResult, ok bool) err // Note that for UnboundedRange the returned sequence number is not necessarily // the latest sequence closed by the network. It's always the last value available // in the backend. -func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) { +func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { c.stellarCoreLock.RLock() defer c.stellarCoreLock.RUnlock() @@ -627,18 +603,6 @@ func (c *CaptiveStellarCore) isClosed() bool { return c.nextLedger == 0 || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil } -func (c *CaptiveStellarCore) isBlocking() bool { - c.blockingLock.Lock() - defer c.blockingLock.Unlock() - return c.blocking -} - -func (c *CaptiveStellarCore) setBlocking(val bool) { - c.blockingLock.Lock() - c.blocking = val - c.blockingLock.Unlock() -} - // Close closes existing Stellar-Core process, streaming sessions and removes all // temporary files. Note, once a CaptiveStellarCore instance is closed it can can no longer be used and // all subsequent calls to PrepareRange(), GetLedger(), etc will fail. diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 29aeb9aa22..4c510ce2c4 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -187,7 +187,7 @@ func TestCaptivePrepareRange(t *testing.T) { }), } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.NoError(t, err) mockRunner.On("close").Return(nil).Once() err = captiveBackend.Close() @@ -223,7 +223,7 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.EqualError(t, err, "Error fast-forwarding to 100: stellar core exited unexpectedly: exit code -1") mockRunner.AssertExpectations(t) mockArchive.AssertExpectations(t) @@ -262,7 +262,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.NoError(t, err) mockRunner.AssertExpectations(t) mockArchive.AssertExpectations(t) @@ -279,16 +279,17 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { stellarCoreRunner: mockRunner, } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.EqualError(t, err, "error starting prepare range: error closing existing session: transient error") - err = captiveBackend.PrepareRange(UnboundedRange(64)) + err = captiveBackend.PrepareRange(ctx, UnboundedRange(64)) assert.EqualError(t, err, "error starting prepare range: error closing existing session: transient error") mockRunner.AssertExpectations(t) } func TestCaptivePrepareRange_ErrGettingRootHAS(t *testing.T) { + ctx := context.Background() mockArchive := &historyarchive.MockArchive{} mockArchive. On("GetRootHAS"). @@ -298,16 +299,17 @@ func TestCaptivePrepareRange_ErrGettingRootHAS(t *testing.T) { archive: mockArchive, } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error getting latest checkpoint sequence: error getting root HAS: transient error") - err = captiveBackend.PrepareRange(UnboundedRange(100)) + err = captiveBackend.PrepareRange(ctx, UnboundedRange(100)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error getting latest checkpoint sequence: error getting root HAS: transient error") mockArchive.AssertExpectations(t) } func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) { + ctx := context.Background() mockArchive := &historyarchive.MockArchive{} mockArchive. On("GetRootHAS"). @@ -319,10 +321,10 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) { archive: mockArchive, } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: sequence: 100 is greater than max available in history archives: 64") - err = captiveBackend.PrepareRange(UnboundedRange(100)) + err = captiveBackend.PrepareRange(ctx, UnboundedRange(100)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: trying to start online mode too far (latest checkpoint=64), only two checkpoints in the future allowed") mockArchive.AssertExpectations(t) @@ -361,7 +363,7 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.NoError(t, err) mockArchive.AssertExpectations(t) @@ -380,6 +382,7 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { CurrentLedger: uint32(192), }, nil) + ctx := context.Background() cancelCalled := false captiveBackend := CaptiveStellarCore{ archive: mockArchive, @@ -391,7 +394,7 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { }), } - err := captiveBackend.PrepareRange(BoundedRange(100, 200)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(100, 200)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error running stellar-core: transient error") // make sure we can Close without errors @@ -418,6 +421,7 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { On("GetLedgerHeader", uint32(127)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) + ctx := context.Background() cancelCalled := false captiveBackend := CaptiveStellarCore{ archive: mockArchive, @@ -430,7 +434,7 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { }), } - err := captiveBackend.PrepareRange(UnboundedRange(128)) + err := captiveBackend.PrepareRange(ctx, UnboundedRange(128)) assert.EqualError(t, err, "error starting prepare range: opening subprocess: error running stellar-core: transient error") // make sure we can Close without errors @@ -477,11 +481,11 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(UnboundedRange(65)) + err := captiveBackend.PrepareRange(ctx, UnboundedRange(65)) assert.NoError(t, err) captiveBackend.nextLedger = 64 - err = captiveBackend.PrepareRange(UnboundedRange(65)) + err = captiveBackend.PrepareRange(ctx, UnboundedRange(65)) assert.NoError(t, err) mockArchive.AssertExpectations(t) @@ -525,10 +529,10 @@ func TestGetLatestLedgerSequence(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(UnboundedRange(64)) + err := captiveBackend.PrepareRange(ctx, UnboundedRange(64)) assert.NoError(t, err) - latest, err := captiveBackend.GetLatestLedgerSequence() + latest, err := captiveBackend.GetLatestLedgerSequence(ctx) assert.NoError(t, err) assert.Equal(t, uint32(200), latest) @@ -570,23 +574,22 @@ func TestCaptiveGetLedger(t *testing.T) { } // requires PrepareRange - _, _, err := captiveBackend.GetLedger(64) + _, err := captiveBackend.GetLedger(ctx, 64) tt.EqualError(err, "session is closed, call PrepareRange first") - err = captiveBackend.PrepareRange(BoundedRange(65, 66)) + err = captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) assert.NoError(t, err) - _, _, err = captiveBackend.GetLedger(64) + _, err = captiveBackend.GetLedger(ctx, 64) tt.Error(err, "requested ledger 64 is behind the captive core stream (expected=66)") // reads value from buffer - found, meta, err := captiveBackend.GetLedger(65) + meta, err := captiveBackend.GetLedger(ctx, 65) tt.NoError(err) - tt.True(found) tt.Equal(xdr.Uint32(65), meta.V0.LedgerHeader.Header.LedgerSeq) // reads value from cachedMeta - _, cachedMeta, err := captiveBackend.GetLedger(65) + cachedMeta, err := captiveBackend.GetLedger(ctx, 65) tt.NoError(err) tt.Equal(meta, cachedMeta) @@ -597,14 +600,14 @@ func TestCaptiveGetLedger(t *testing.T) { cancel() }).Once() - _, _, err = captiveBackend.GetLedger(66) + _, err = captiveBackend.GetLedger(ctx, 66) tt.NoError(err) // closes after last ledger is consumed tt.True(captiveBackend.isClosed()) // we should be able to call last ledger even after get ledger is closed - _, _, err = captiveBackend.GetLedger(66) + _, err = captiveBackend.GetLedger(ctx, 66) tt.NoError(err) mockArchive.AssertExpectations(t) @@ -618,6 +621,8 @@ func TestCaptiveGetLedger(t *testing.T) { // // Before 3d97762 this test failed because cachedMeta was only updated when // the ledger with a requested sequence was reached while streaming meta. +// +// TODO: Not sure this test is really valid or worth it anymore, now that GetLedger is always blocking. func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { tt := assert.New(t) metaChan := make(chan metaResult, 300) @@ -659,18 +664,17 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(UnboundedRange(66)) + err := captiveBackend.PrepareRange(ctx, UnboundedRange(66)) assert.NoError(t, err) - found, _, err := captiveBackend.GetLedger(68) - tt.NoError(err) - tt.False(found) - tt.Equal(uint32(67), captiveBackend.cachedMeta.LedgerSequence()) - tt.Equal(uint32(68), captiveBackend.nextLedger) + // found, _, err := captiveBackend.GetLedger(ctx, 68) + // tt.NoError(err) + // tt.False(found) + // tt.Equal(uint32(67), captiveBackend.cachedMeta.LedgerSequence()) + // tt.Equal(uint32(68), captiveBackend.nextLedger) - found, meta, err := captiveBackend.GetLedger(67) + meta, err := captiveBackend.GetLedger(ctx, 67) tt.NoError(err) - tt.True(found) tt.Equal(uint32(67), meta.LedgerSequence()) mockArchive.AssertExpectations(t) @@ -715,10 +719,10 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(65, 66)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) assert.NoError(t, err) - _, _, err = captiveBackend.GetLedger(66) + _, err = captiveBackend.GetLedger(ctx, 66) assert.EqualError(t, err, "unexpected ledger sequence (expected=66 actual=68)") mockArchive.AssertExpectations(t) @@ -726,6 +730,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) } func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) { + ctx := context.Background() executablePath := "/etc/stellar-core" networkPassphrase := network.PublicNetworkPassphrase historyURLs := []string{"http://localhost"} @@ -743,7 +748,7 @@ func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) { assert.EqualError( t, - captiveStellarCore.PrepareRange(BoundedRange(65, 66)), + captiveStellarCore.PrepareRange(ctx, BoundedRange(65, 66)), "error starting prepare range: opening subprocess: error getting latest checkpoint sequence: "+ "error getting root HAS: Get \"http://localhost/.well-known/stellar-history.json\": context canceled", ) @@ -758,7 +763,7 @@ func TestCaptiveStellarCore_PrepareRangeAfterClose(t *testing.T) { captiveStellarCore.archive = mockArchive assert.EqualError( t, - captiveStellarCore.PrepareRange(BoundedRange(65, 66)), + captiveStellarCore.PrepareRange(ctx, BoundedRange(65, 66)), "error starting prepare range: opening subprocess: error running stellar-core: context canceled", ) mockArchive.AssertExpectations(t) @@ -805,16 +810,15 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(65, 66)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) assert.NoError(t, err) - found, meta, err := captiveBackend.GetLedger(65) + meta, err := captiveBackend.GetLedger(ctx, 65) tt.NoError(err) - tt.True(found) tt.Equal(xdr.Uint32(65), meta.V0.LedgerHeader.Header.LedgerSeq) // try reading from an empty buffer - _, _, err = captiveBackend.GetLedger(66) + _, err = captiveBackend.GetLedger(ctx, 66) tt.EqualError(err, "unmarshalling error") // closes if there is an error getting ledger @@ -857,10 +861,10 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(65, 66)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(65, 66)) assert.NoError(t, err) - _, _, err = captiveBackend.GetLedger(66) + _, err = captiveBackend.GetLedger(ctx, 66) tt.EqualError(err, "error closing session: transient error") mockArchive.AssertExpectations(t) @@ -901,20 +905,20 @@ func TestCaptiveAfterClose(t *testing.T) { } boundedRange := BoundedRange(65, 66) - err := captiveBackend.PrepareRange(boundedRange) + err := captiveBackend.PrepareRange(ctx, boundedRange) assert.NoError(t, err) assert.NoError(t, captiveBackend.Close()) - _, _, err = captiveBackend.GetLedger(boundedRange.to) + _, err = captiveBackend.GetLedger(ctx, boundedRange.to) assert.EqualError(t, err, "session is closed, call PrepareRange first") var prepared bool - prepared, err = captiveBackend.IsPrepared(boundedRange) + prepared, err = captiveBackend.IsPrepared(ctx, boundedRange) assert.False(t, prepared) assert.NoError(t, err) - _, err = captiveBackend.GetLatestLedgerSequence() + _, err = captiveBackend.GetLatestLedgerSequence(ctx) assert.EqualError(t, err, "stellar-core must be opened to return latest available sequence") mockArchive.AssertExpectations(t) @@ -952,21 +956,19 @@ func TestGetLedgerBoundsCheck(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(128, 130)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(128, 130)) assert.NoError(t, err) - exists, meta, err := captiveBackend.GetLedger(128) + meta, err := captiveBackend.GetLedger(ctx, 128) assert.NoError(t, err) - assert.True(t, exists) assert.Equal(t, uint32(128), meta.LedgerSequence()) prev := meta - exists, meta, err = captiveBackend.GetLedger(128) + meta, err = captiveBackend.GetLedger(ctx, 128) assert.NoError(t, err) - assert.True(t, exists) assert.Equal(t, prev, meta) - _, _, err = captiveBackend.GetLedger(64) + _, err = captiveBackend.GetLedger(ctx, 64) assert.EqualError(t, err, "requested ledger 64 is behind the captive core stream (expected=129)") mockArchive.AssertExpectations(t) @@ -1056,15 +1058,14 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(BoundedRange(64, 100)) + err := captiveBackend.PrepareRange(ctx, BoundedRange(64, 100)) assert.NoError(t, err) - exists, meta, err := captiveBackend.GetLedger(64) + meta, err := captiveBackend.GetLedger(ctx, 64) assert.NoError(t, err) - assert.True(t, exists) assert.Equal(t, uint32(64), meta.LedgerSequence()) - _, _, err = captiveBackend.GetLedger(65) + _, err = captiveBackend.GetLedger(ctx, 65) assert.EqualError(t, err, testCase.expectedError) mockArchive.AssertExpectations(t) @@ -1074,6 +1075,7 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { } func TestCaptiveUseOfLedgerHashStore(t *testing.T) { + ctx := context.Background() mockArchive := &historyarchive.MockArchive{} mockArchive. On("GetLedgerHeader", uint32(255)). @@ -1084,15 +1086,15 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { }, nil) mockLedgerHashStore := &MockLedgerHashStore{} - mockLedgerHashStore.On("GetLedgerHash", uint32(1022)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(1022)). Return("", false, fmt.Errorf("transient error")).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(254)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(254)). Return("", false, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(62)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(62)). Return("cde", true, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(126)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(126)). Return("ghi", true, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(2)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(2)). Return("mnb", true, nil).Once() cancelCalled := false @@ -1105,28 +1107,28 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { }), } - runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24) + runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(ctx, 24) assert.NoError(t, err) assert.Equal(t, uint32(2), runFrom) assert.Equal(t, "mnb", ledgerHash) assert.Equal(t, uint32(2), nextLedger) - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86) + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 86) assert.NoError(t, err) assert.Equal(t, uint32(62), runFrom) assert.Equal(t, "cde", ledgerHash) assert.Equal(t, uint32(2), nextLedger) - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(128) + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 128) assert.NoError(t, err) assert.Equal(t, uint32(126), runFrom) assert.Equal(t, "ghi", ledgerHash) assert.Equal(t, uint32(64), nextLedger) - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(1050) + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 1050) assert.EqualError(t, err, "error trying to read ledger hash 1022: transient error") - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(300) + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 300) assert.NoError(t, err) assert.Equal(t, uint32(254), runFrom, "runFrom") assert.Equal(t, "0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) @@ -1184,7 +1186,8 @@ func TestCaptiveRunFromParams(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(tc.from) + ctx := context.Background() + runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(ctx, tc.from) tt.NoError(err) tt.Equal(tc.runFrom, runFrom, "runFrom") tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) @@ -1292,9 +1295,9 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { }, nil).Once() mockLedgerHashStore := &MockLedgerHashStore{} - mockLedgerHashStore.On("GetLedgerHash", uint32(254)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(254)). Return("", false, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(191)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(191)). Return("0200000000000000000000000000000000000000000000000000000000000000", true, nil).Once() captiveBackend := CaptiveStellarCore{ @@ -1306,17 +1309,16 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { checkpointManager: historyarchive.NewCheckpointManager(64), } - err := captiveBackend.PrepareRange(UnboundedRange(300)) + err := captiveBackend.PrepareRange(ctx, UnboundedRange(300)) assert.NoError(t, err) - exists, meta, err := captiveBackend.GetLedger(300) + meta, err := captiveBackend.GetLedger(ctx, 300) assert.NoError(t, err) - assert.True(t, exists) assert.NotNil(t, captiveBackend.previousLedgerHash) assert.Equal(t, uint32(301), captiveBackend.nextLedger) assert.Equal(t, meta.LedgerHash().HexString(), *captiveBackend.previousLedgerHash) - _, _, err = captiveBackend.GetLedger(301) + _, err = captiveBackend.GetLedger(ctx, 301) assert.EqualError(t, err, "unexpected previous ledger hash for ledger 301 (expected=6f00000000000000000000000000000000000000000000000000000000000000 actual=0000000000000000000000000000000000000000000000000000000000000000)") mockRunner.AssertExpectations(t) diff --git a/ingest/ledgerbackend/database_backend.go b/ingest/ledgerbackend/database_backend.go index 0cd19a79b0..8586a072c2 100644 --- a/ingest/ledgerbackend/database_backend.go +++ b/ingest/ledgerbackend/database_backend.go @@ -27,67 +27,51 @@ var _ LedgerBackend = (*DatabaseBackend)(nil) // DatabaseBackend implements a database data store. type DatabaseBackend struct { - cancel context.CancelFunc - ctx context.Context networkPassphrase string session session } -func NewDatabaseBackend(ctx context.Context, dataSourceName, networkPassphrase string) (*DatabaseBackend, error) { +func NewDatabaseBackend(dataSourceName, networkPassphrase string) (*DatabaseBackend, error) { session, err := createSession(dataSourceName) if err != nil { return nil, err } - return NewDatabaseBackendFromSession(ctx, session, networkPassphrase) + return NewDatabaseBackendFromSession(session, networkPassphrase) } -func NewDatabaseBackendFromSession(ctx context.Context, session *db.Session, networkPassphrase string) (*DatabaseBackend, error) { - // TODO: To avoid changing the LedgerBackend interface in this call we create - // a context once for this, so that Close() can cancel any in-progress method-calls. - ctx, cancel := context.WithCancel(ctx) - +func NewDatabaseBackendFromSession(session *db.Session, networkPassphrase string) (*DatabaseBackend, error) { return &DatabaseBackend{ - cancel: cancel, - ctx: ctx, session: session, networkPassphrase: networkPassphrase, }, nil } -func (dbb *DatabaseBackend) PrepareRange(ledgerRange Range) error { - fromExists, _, err := dbb.GetLedger(ledgerRange.from) +func (dbb *DatabaseBackend) PrepareRange(ctx context.Context, ledgerRange Range) error { + _, err := dbb.GetLedger(ctx, ledgerRange.from) if err != nil { return errors.Wrap(err, "error getting ledger") } - if !fromExists { - return errors.New("`from` ledger does not exist") - } - if ledgerRange.bounded { - toExists, _, err := dbb.GetLedger(ledgerRange.to) + _, err := dbb.GetLedger(ctx, ledgerRange.to) if err != nil { return errors.Wrap(err, "error getting ledger") } - - if !toExists { - return errors.New("`to` ledger does not exist") - } } return nil } // IsPrepared returns true if a given ledgerRange is prepared. -func (*DatabaseBackend) IsPrepared(ledgerRange Range) (bool, error) { +func (*DatabaseBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { return true, nil } // GetLatestLedgerSequence returns the most recent ledger sequence number present in the database. -func (dbb *DatabaseBackend) GetLatestLedgerSequence() (uint32, error) { +func (dbb *DatabaseBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { var ledger []ledgerHeader - err := dbb.session.SelectRaw(dbb.ctx, &ledger, latestLedgerSeqQuery) + err := dbb.session.SelectRaw(ctx, &ledger, latestLedgerSeqQuery) if err != nil { return 0, errors.Wrap(err, "couldn't select ledger sequence") } @@ -130,13 +114,13 @@ func sortByHash(transactions []xdr.TransactionEnvelope, passphrase string) error return nil } -// GetLedgerBlocking works as GetLedger but will block until the ledger is +// GetLedger will block until the ledger is // available in the backend (even for UnaboundedRange). // Please note that requesting a ledger sequence far after current ledger will // block the execution for a long time. -func (dbb *DatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) { +func (dbb *DatabaseBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { for { - exists, meta, err := dbb.GetLedger(sequence) + exists, meta, err := dbb.getLedgerQuery(ctx, sequence) if err != nil { return xdr.LedgerCloseMeta{}, err } @@ -149,9 +133,9 @@ func (dbb *DatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseM } } -// GetLedger returns the LedgerCloseMeta for the given ledger sequence number. +// getLedgerQuery returns the LedgerCloseMeta for the given ledger sequence number. // The first returned value is false when the ledger does not exist in the database. -func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { +func (dbb *DatabaseBackend) getLedgerQuery(ctx context.Context, sequence uint32) (bool, xdr.LedgerCloseMeta, error) { lcm := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{}, } @@ -159,7 +143,7 @@ func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMet // Query - ledgerheader var lRow ledgerHeaderHistory - err := dbb.session.GetRaw(dbb.ctx, &lRow, ledgerHeaderQuery, sequence) + err := dbb.session.GetRaw(ctx, &lRow, ledgerHeaderQuery, sequence) // Return errors... if err != nil { switch err { @@ -180,7 +164,7 @@ func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMet // Query - txhistory var txhRows []txHistory - err = dbb.session.SelectRaw(dbb.ctx, &txhRows, txHistoryQuery+orderBy, sequence) + err = dbb.session.SelectRaw(ctx, &txhRows, txHistoryQuery+orderBy, sequence) // Return errors... if err != nil { return false, lcm, errors.Wrap(err, "Error getting txHistory") @@ -206,7 +190,7 @@ func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMet // Query - txfeehistory var txfhRows []txFeeHistory - err = dbb.session.SelectRaw(dbb.ctx, &txfhRows, txFeeHistoryQuery+orderBy, sequence) + err = dbb.session.SelectRaw(ctx, &txfhRows, txFeeHistoryQuery+orderBy, sequence) // Return errors... if err != nil { return false, lcm, errors.Wrap(err, "Error getting txFeeHistory") @@ -223,7 +207,7 @@ func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMet // Query - upgradehistory var upgradeHistoryRows []upgradeHistory - err = dbb.session.SelectRaw(dbb.ctx, &upgradeHistoryRows, upgradeHistoryQuery, sequence) + err = dbb.session.SelectRaw(ctx, &upgradeHistoryRows, upgradeHistoryQuery, sequence) // Return errors... if err != nil { return false, lcm, errors.Wrap(err, "Error getting upgradeHistoryRows") @@ -252,6 +236,5 @@ func createSession(dataSourceName string) (*db.Session, error) { // Close disconnects an active database session. func (dbb *DatabaseBackend) Close() error { - dbb.cancel() return dbb.session.Close() } diff --git a/ingest/ledgerbackend/ledger_backend.go b/ingest/ledgerbackend/ledger_backend.go index 2e2a7db1de..572de2e183 100644 --- a/ingest/ledgerbackend/ledger_backend.go +++ b/ingest/ledgerbackend/ledger_backend.go @@ -10,17 +10,15 @@ import ( type LedgerBackend interface { // GetLatestLedgerSequence returns the sequence of the latest ledger available // in the backend. - GetLatestLedgerSequence() (sequence uint32, err error) - // The first returned value is false when the ledger does not exist in a backend. - GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) - // Works like GetLedger but will block until the ledger is available. - GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) + GetLatestLedgerSequence(ctx context.Context) (sequence uint32, err error) + // GetLedger will block until the ledger is available. + GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) // PrepareRange prepares the given range (including from and to) to be loaded. // Some backends (like captive stellar-core) need to initalize data to be // able to stream ledgers. Blocks until the first ledger is available. - PrepareRange(ledgerRange Range) error + PrepareRange(ctx context.Context, ledgerRange Range) error // IsPrepared returns true if a given ledgerRange is prepared. - IsPrepared(ledgerRange Range) (bool, error) + IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) Close() error } diff --git a/ingest/ledgerbackend/ledger_hash_store.go b/ingest/ledgerbackend/ledger_hash_store.go index 13cd3e4d6d..d0eeefa5a9 100644 --- a/ingest/ledgerbackend/ledger_hash_store.go +++ b/ingest/ledgerbackend/ledger_hash_store.go @@ -14,34 +14,27 @@ import ( // source like history archives. type TrustedLedgerHashStore interface { // GetLedgerHash returns the ledger hash for the given sequence number - GetLedgerHash(seq uint32) (string, bool, error) + GetLedgerHash(ctx context.Context, seq uint32) (string, bool, error) Close() error } // HorizonDBLedgerHashStore is a TrustedLedgerHashStore which uses horizon's db to look up ledger hashes type HorizonDBLedgerHashStore struct { - cancel context.CancelFunc - ctx context.Context session *db.Session } // NewHorizonDBLedgerHashStore constructs a new TrustedLedgerHashStore backed by the horizon db -func NewHorizonDBLedgerHashStore(ctx context.Context, session *db.Session) TrustedLedgerHashStore { - ctx, cancel := context.WithCancel(ctx) - return HorizonDBLedgerHashStore{ - cancel: cancel, - ctx: ctx, - session: session, - } +func NewHorizonDBLedgerHashStore(session *db.Session) TrustedLedgerHashStore { + return HorizonDBLedgerHashStore{session: session} } // GetLedgerHash returns the ledger hash for the given sequence number -func (h HorizonDBLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error) { +func (h HorizonDBLedgerHashStore) GetLedgerHash(ctx context.Context, seq uint32) (string, bool, error) { sql := sq.Select("hl.ledger_hash").From("history_ledgers hl"). Limit(1).Where("sequence = ?", seq) var hash string - err := h.session.Get(h.ctx, &hash, sql) + err := h.session.Get(ctx, &hash, sql) if h.session.NoRows(err) { return hash, false, nil } @@ -49,7 +42,6 @@ func (h HorizonDBLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error } func (h HorizonDBLedgerHashStore) Close() error { - h.cancel() return h.session.Close() } @@ -59,8 +51,8 @@ type MockLedgerHashStore struct { } // GetLedgerHash returns the ledger hash for the given sequence number -func (m *MockLedgerHashStore) GetLedgerHash(seq uint32) (string, bool, error) { - args := m.Called(seq) +func (m *MockLedgerHashStore) GetLedgerHash(ctx context.Context, seq uint32) (string, bool, error) { + args := m.Called(ctx, seq) return args.Get(0).(string), args.Get(1).(bool), args.Error(2) } diff --git a/ingest/ledgerbackend/mock_database_backend.go b/ingest/ledgerbackend/mock_database_backend.go index 67d9a367d7..c5f85ecef7 100644 --- a/ingest/ledgerbackend/mock_database_backend.go +++ b/ingest/ledgerbackend/mock_database_backend.go @@ -1,6 +1,8 @@ package ledgerbackend import ( + "context" + "github.com/stretchr/testify/mock" "github.com/stellar/go/xdr" @@ -12,28 +14,23 @@ type MockDatabaseBackend struct { mock.Mock } -func (m *MockDatabaseBackend) GetLatestLedgerSequence() (uint32, error) { - args := m.Called() +func (m *MockDatabaseBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { + args := m.Called(ctx) return args.Get(0).(uint32), args.Error(1) } -func (m *MockDatabaseBackend) PrepareRange(ledgerRange Range) error { - args := m.Called(ledgerRange) +func (m *MockDatabaseBackend) PrepareRange(ctx context.Context, ledgerRange Range) error { + args := m.Called(ctx, ledgerRange) return args.Error(0) } -func (m *MockDatabaseBackend) IsPrepared(ledgerRange Range) (bool, error) { - args := m.Called(ledgerRange) +func (m *MockDatabaseBackend) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { + args := m.Called(ctx, ledgerRange) return args.Bool(0), args.Error(1) } -func (m *MockDatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { - args := m.Called(sequence) - return args.Bool(0), args.Get(1).(xdr.LedgerCloseMeta), args.Error(2) -} - -func (m *MockDatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) { - args := m.Called(sequence) +func (m *MockDatabaseBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + args := m.Called(ctx, sequence) return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1) } diff --git a/ingest/ledgerbackend/remote_captive_core.go b/ingest/ledgerbackend/remote_captive_core.go index 52332ac76e..18946a8d3c 100644 --- a/ingest/ledgerbackend/remote_captive_core.go +++ b/ingest/ledgerbackend/remote_captive_core.go @@ -31,8 +31,7 @@ type LatestLedgerSequenceResponse struct { // LedgerResponse is the response for the GetLedger command. type LedgerResponse struct { - Present bool `json:"present"` - Ledger Base64Ledger `json:"ledger"` + Ledger Base64Ledger `json:"ledger"` } // Base64Ledger extends xdr.LedgerCloseMeta with JSON encoding and decoding @@ -66,8 +65,6 @@ type RemoteCaptiveStellarCore struct { url *url.URL client *http.Client lock *sync.Mutex - cancel context.CancelFunc - parentCtx context.Context prepareRangePollInterval time.Duration } @@ -85,7 +82,7 @@ func PrepareRangePollInterval(d time.Duration) RemoteCaptiveOption { // NewRemoteCaptive returns a new RemoteCaptiveStellarCore instance. // // Only the captiveCoreURL parameter is required. -func NewRemoteCaptive(ctx context.Context, captiveCoreURL string, options ...RemoteCaptiveOption) (RemoteCaptiveStellarCore, error) { +func NewRemoteCaptive(captiveCoreURL string, options ...RemoteCaptiveOption) (RemoteCaptiveStellarCore, error) { u, err := url.Parse(captiveCoreURL) if err != nil { return RemoteCaptiveStellarCore{}, errors.Wrap(err, "unparseable url") @@ -94,9 +91,8 @@ func NewRemoteCaptive(ctx context.Context, captiveCoreURL string, options ...Rem client := RemoteCaptiveStellarCore{ prepareRangePollInterval: time.Second, url: u, - client: &http.Client{Timeout: 5 * time.Second}, + client: &http.Client{Timeout: 10 * time.Second}, lock: &sync.Mutex{}, - parentCtx: ctx, } for _, option := range options { option(&client) @@ -129,13 +125,17 @@ func decodeResponse(response *http.Response, payload interface{}) error { // Note that for UnboundedRange the returned sequence number is not necessarily // the latest sequence closed by the network. It's always the last value available // in the backend. -func (c RemoteCaptiveStellarCore) GetLatestLedgerSequence() (sequence uint32, err error) { +func (c RemoteCaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (sequence uint32, err error) { // TODO: Have a context on this request so we can cancel all outstanding // requests, not just PrepareRange. u := *c.url u.Path = path.Join(u.Path, "latest-sequence") + request, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return 0, errors.Wrap(err, "cannot construct http request") + } - response, err := c.client.Get(u.String()) + response, err := c.client.Do(request) if err != nil { return 0, errors.Wrap(err, "failed to execute request") } @@ -150,29 +150,9 @@ func (c RemoteCaptiveStellarCore) GetLatestLedgerSequence() (sequence uint32, er // Close cancels any pending PrepareRange requests. func (c RemoteCaptiveStellarCore) Close() error { - c.lock.Lock() - defer c.lock.Unlock() - if c.cancel != nil { - c.cancel() - } return nil } -func (c RemoteCaptiveStellarCore) createContext() context.Context { - c.lock.Lock() - defer c.lock.Unlock() - - // Cancel any outstanding PrepareRange request - if c.cancel != nil { - c.cancel() - } - - // Make a new context for this new request. - ctx, cancel := context.WithCancel(c.parentCtx) - c.cancel = cancel - return ctx -} - // PrepareRange prepares the given range (including from and to) to be loaded. // Captive stellar-core backend needs to initalize Stellar-Core state to be // able to stream ledgers. @@ -182,36 +162,20 @@ func (c RemoteCaptiveStellarCore) createContext() context.Context { // it normally (including connecting to the Stellar network). // Please note that using a BoundedRange, currently, requires a full-trust on // history archive. This issue is being fixed in Stellar-Core. -func (c RemoteCaptiveStellarCore) PrepareRange(ledgerRange Range) error { - ctx := c.createContext() - u := *c.url - u.Path = path.Join(u.Path, "prepare-range") - rangeBytes, err := json.Marshal(ledgerRange) - if err != nil { - return errors.Wrap(err, "cannot serialize range") - } +func (c RemoteCaptiveStellarCore) PrepareRange(ctx context.Context, ledgerRange Range) error { + // TODO: removing createContext call here means we could technically have + // multiple prepareRange requests happening at the same time. Do we still + // need to enforce that? timer := time.NewTimer(c.prepareRangePollInterval) defer timer.Stop() for { - req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), bytes.NewReader(rangeBytes)) - if err != nil { - return errors.Wrap(err, "cannot construct http request") - } - - var response *http.Response - response, err = c.client.Do(req) + ready, err := c.IsPrepared(ctx, ledgerRange) if err != nil { - return errors.Wrap(err, "failed to execute request") - } - - var parsed PrepareRangeResponse - if err = decodeResponse(response, &parsed); err != nil { return err } - - if parsed.Ready { + if ready { return nil } @@ -225,9 +189,9 @@ func (c RemoteCaptiveStellarCore) PrepareRange(ledgerRange Range) error { } // IsPrepared returns true if a given ledgerRange is prepared. -func (c RemoteCaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) { - // TODO: Have a context on this request so we can cancel all outstanding - // requests, not just PrepareRange. +func (c RemoteCaptiveStellarCore) IsPrepared(ctx context.Context, ledgerRange Range) (bool, error) { + // TODO: Have some way to cancel all outstanding requests, not just + // PrepareRange. u := *c.url u.Path = path.Join(u.Path, "prepare-range") rangeBytes, err := json.Marshal(ledgerRange) @@ -235,9 +199,14 @@ func (c RemoteCaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) { return false, errors.Wrap(err, "cannot serialize range") } body := bytes.NewReader(rangeBytes) + request, err := http.NewRequestWithContext(ctx, "POST", u.String(), body) + if err != nil { + return false, errors.Wrap(err, "cannot construct http request") + } + request.Header.Add("Content-Type", "application/json; charset=utf-8") var response *http.Response - response, err = c.client.Post(u.String(), "application/json; charset=utf-8", body) + response, err = c.client.Do(request) if err != nil { return false, errors.Wrap(err, "failed to execute request") } @@ -250,53 +219,43 @@ func (c RemoteCaptiveStellarCore) IsPrepared(ledgerRange Range) (bool, error) { return parsed.Ready, nil } -// GetLedger returns true when ledger is found and it's LedgerCloseMeta. +// GetLedger long-polls a remote stellar core backend, until the requested +// ledger is ready. + // Call PrepareRange first to instruct the backend which ledgers to fetch. // -// CaptiveStellarCore requires PrepareRange call first to initialize Stellar-Core. // Requesting a ledger on non-prepared backend will return an error. // // Because data is streamed from Stellar-Core ledger after ledger user should // request sequences in a non-decreasing order. If the requested sequence number // is less than the last requested sequence number, an error will be returned. -// -// This function behaves differently for bounded and unbounded ranges: -// * BoundedRange makes GetLedger blocking if the requested ledger is not yet -// available in the ledger. After getting the last ledger in a range this -// method will also Close() the backend. -// * UnboundedRange makes GetLedger non-blocking. The method will return with -// the first argument equal false. -// This is done to provide maximum performance when streaming old ledgers. -func (c RemoteCaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { - // TODO: Have a context on this request so we can cancel all outstanding - // requests, not just PrepareRange. - u := *c.url - u.Path = path.Join(u.Path, "ledger", strconv.FormatUint(uint64(sequence), 10)) - - response, err := c.client.Get(u.String()) - if err != nil { - return false, xdr.LedgerCloseMeta{}, errors.Wrap(err, "failed to execute request") - } +func (c RemoteCaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + for { + // TODO: Have some way to cancel all outstanding requests, not just + // PrepareRange. + u := *c.url + u.Path = path.Join(u.Path, "ledger", strconv.FormatUint(uint64(sequence), 10)) + request, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return xdr.LedgerCloseMeta{}, errors.Wrap(err, "cannot construct http request") + } - var parsed LedgerResponse - if err = decodeResponse(response, &parsed); err != nil { - return false, xdr.LedgerCloseMeta{}, err - } + response, err := c.client.Do(request) + if err != nil { + return xdr.LedgerCloseMeta{}, errors.Wrap(err, "failed to execute request") + } - return parsed.Present, xdr.LedgerCloseMeta(parsed.Ledger), nil -} + if response.StatusCode == http.StatusRequestTimeout { + response.Body.Close() + // This request timed out. Retry. + continue + } -func (c RemoteCaptiveStellarCore) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) { - for { - exists, meta, err := c.GetLedger(sequence) - if err != nil { + var parsed LedgerResponse + if err = decodeResponse(response, &parsed); err != nil { return xdr.LedgerCloseMeta{}, err } - if exists { - return meta, nil - } else { - time.Sleep(time.Second) - } + return xdr.LedgerCloseMeta(parsed.Ledger), nil } } diff --git a/ingest/ledgerbackend/remote_captive_core_test.go b/ingest/ledgerbackend/remote_captive_core_test.go new file mode 100644 index 0000000000..3a4b4d28a7 --- /dev/null +++ b/ingest/ledgerbackend/remote_captive_core_test.go @@ -0,0 +1,74 @@ +package ledgerbackend + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/stellar/go/xdr" +) + +func TestGetLedgerSucceeds(t *testing.T) { + expectedLedger := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: 64, + }, + }, + }, + } + called := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called++ + json.NewEncoder(w).Encode(LedgerResponse{ + Ledger: Base64Ledger(expectedLedger), + }) + })) + defer server.Close() + + client, err := NewRemoteCaptive(server.URL) + require.NoError(t, err) + + ledger, err := client.GetLedger(context.Background(), 64) + require.NoError(t, err) + require.Equal(t, 1, called) + require.Equal(t, expectedLedger, ledger) +} + +func TestGetLedgerTakesAWhile(t *testing.T) { + expectedLedger := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: 64, + }, + }, + }, + } + called := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called++ + if called == 1 { + // TODO: Check this is what the server really does. + w.WriteHeader(http.StatusRequestTimeout) + return + } + json.NewEncoder(w).Encode(LedgerResponse{ + Ledger: Base64Ledger(expectedLedger), + }) + })) + defer server.Close() + + client, err := NewRemoteCaptive(server.URL) + require.NoError(t, err) + + ledger, err := client.GetLedger(context.Background(), 64) + require.NoError(t, err) + require.Equal(t, 2, called) + require.Equal(t, expectedLedger, ledger) +} diff --git a/ingest/tutorial/example_hello.go b/ingest/tutorial/example_hello.go index 229ffeacad..44c9f409b2 100644 --- a/ingest/tutorial/example_hello.go +++ b/ingest/tutorial/example_hello.go @@ -1,26 +1,24 @@ package main import ( + "context" "fmt" backends "github.com/stellar/go/ingest/ledgerbackend" ) func helloworld() { + ctx := context.Background() backend, err := backends.NewCaptive(config) panicIf(err) defer backend.Close() // Prepare a single ledger to be ingested, - err = backend.PrepareRange(backends.BoundedRange(123456, 123456)) + err = backend.PrepareRange(ctx, backends.BoundedRange(123456, 123456)) panicIf(err) // then retrieve it: - ok, ledger, err := backend.GetLedger(123456) - if !ok { - err = fmt.Errorf("The ledger doesn't exist on the backend.") - } - + ledger, err := backend.GetLedger(ctx, 123456) panicIf(err) // Now `ledger` is a raw `xdr.LedgerCloseMeta` object containing the diff --git a/ingest/tutorial/example_statistics.go b/ingest/tutorial/example_statistics.go index 4a5fe18d6d..f6174fb1b0 100644 --- a/ingest/tutorial/example_statistics.go +++ b/ingest/tutorial/example_statistics.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "io" @@ -11,6 +12,7 @@ import ( ) func statistics() { + ctx := context.Background() // Only log errors from the backend to keep output cleaner. lg := log.New() lg.SetLevel(logrus.ErrorLevel) @@ -26,7 +28,7 @@ func statistics() { fmt.Printf("Preparing range (%d ledgers)...\n", ledgersToRead) ledgerRange := backends.BoundedRange(startingSeq, startingSeq+ledgersToRead) - err = backend.PrepareRange(ledgerRange) + err = backend.PrepareRange(ctx, ledgerRange) panicIf(err) // These are the statistics that we're tracking. @@ -37,7 +39,8 @@ func statistics() { fmt.Printf("Processed ledger %d...\r", seq) txReader, err := ingest.NewLedgerTransactionReader( - backend, config.NetworkPassphrase, seq) + ctx, backend, config.NetworkPassphrase, seq, + ) panicIf(err) defer txReader.Close() diff --git a/ingest/tutorial/getting-started.md b/ingest/tutorial/getting-started.md index 6bec707d03..0091880c1d 100644 --- a/ingest/tutorial/getting-started.md +++ b/ingest/tutorial/getting-started.md @@ -49,6 +49,7 @@ With that in mind, here's a minimalist example of the ingestion library: package main import ( + "context" "fmt" "github.com/stellar/go/ingest/ledgerbackend" @@ -64,11 +65,8 @@ func main() { panicIf(err) // then retrieve it: - ok, ledger, err := ledgerbackend.GetLedger(123456) - if !ok { - err = fmt.Errorf("The ledger doesn't exist on the backend.") - } - + ctx := context.Background() + ledger, err := ledgerbackend.GetLedger(ctx, 123456) panicIf(err) // Now `ledger` is a raw `xdr.LedgerCloseMeta` object containing the diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 735075576d..25326cdd5e 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Unreleased + +* Refactor `ingest/ledgerbackend/LedgerBackend.GetLedger` method to always block, removing `ingest/ledgerbackend/LedgerBackend.GetLedgerBlocking`. Adds a first `context.Context` param to most `LedgerBackend` methods. + ## v2.2.0 **Upgrading to this version will trigger state rebuild. During this process (which can take up to 20 minutes) it will not ingest new ledgers.** diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index c148b6ed8e..6101b99bd5 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -58,8 +58,8 @@ func TestInsertLedger(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - ledgerHashStore := ledgerbackend.NewHorizonDBLedgerHashStore(tt.Ctx, tt.HorizonSession()) - _, exists, err := ledgerHashStore.GetLedgerHash(100) + ledgerHashStore := ledgerbackend.NewHorizonDBLedgerHashStore(tt.HorizonSession()) + _, exists, err := ledgerHashStore.GetLedgerHash(tt.Ctx, 100) tt.Assert.NoError(err) tt.Assert.False(exists) @@ -143,7 +143,7 @@ func TestInsertLedger(t *testing.T) { tt.Assert.Equal(expectedLedger, ledgerFromDB) - hash, exists, err := ledgerHashStore.GetLedgerHash(uint32(expectedLedger.Sequence)) + hash, exists, err := ledgerHashStore.GetLedgerHash(tt.Ctx, uint32(expectedLedger.Sequence)) tt.Assert.NoError(err) tt.Assert.True(exists) tt.Assert.Equal(expectedLedger.LedgerHash, hash) diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index 6b13539043..75e5c83c13 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -53,9 +53,9 @@ func (s *BuildStateTestSuite) SetupTest() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback", s.ctx).Return(nil).Once() - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(63)).Return(nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", uint32(63)).Return(xdr.LedgerCloseMeta{ + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(63)).Return(xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ @@ -107,8 +107,8 @@ func (s *BuildStateTestSuite) TestRangeNotPreparedFailPrepare() { *s.historyQ = mockDBQ{} *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(63)).Return(errors.New("my error")).Once() + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(errors.New("my error")).Once() next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) @@ -122,9 +122,9 @@ func (s *BuildStateTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() *s.historyQ = mockDBQ{} *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(63)).Return(nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", uint32(63)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(63)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system) diff --git a/services/horizon/internal/ingest/database_backend_test.go b/services/horizon/internal/ingest/database_backend_test.go index 64769b602a..69de728116 100644 --- a/services/horizon/internal/ingest/database_backend_test.go +++ b/services/horizon/internal/ingest/database_backend_test.go @@ -13,9 +13,9 @@ func TestGetLatestLedger(t *testing.T) { tt.ScenarioWithoutHorizon("base") defer tt.Finish() - backend, err := ledgerbackend.NewDatabaseBackendFromSession(tt.Ctx, tt.CoreSession(), network.TestNetworkPassphrase) + backend, err := ledgerbackend.NewDatabaseBackendFromSession(tt.CoreSession(), network.TestNetworkPassphrase) tt.Assert.NoError(err) - seq, err := backend.GetLatestLedgerSequence() + seq, err := backend.GetLatestLedgerSequence(tt.Ctx) tt.Assert.NoError(err) tt.Assert.Equal(uint32(3), seq) } @@ -28,8 +28,8 @@ func TestGetLatestLedgerNotFound(t *testing.T) { _, err := tt.CoreDB.Exec(`DELETE FROM ledgerheaders`) tt.Assert.NoError(err, "failed to remove ledgerheaders") - backend, err := ledgerbackend.NewDatabaseBackendFromSession(tt.Ctx, tt.CoreSession(), network.TestNetworkPassphrase) + backend, err := ledgerbackend.NewDatabaseBackendFromSession(tt.CoreSession(), network.TestNetworkPassphrase) tt.Assert.NoError(err) - _, err = backend.GetLatestLedgerSequence() + _, err = backend.GetLatestLedgerSequence(tt.Ctx) tt.Assert.EqualError(err, "no ledgers exist in ledgerheaders table") } diff --git a/services/horizon/internal/ingest/db_integration_test.go b/services/horizon/internal/ingest/db_integration_test.go index 6f20de97bf..2857a277ef 100644 --- a/services/horizon/internal/ingest/db_integration_test.go +++ b/services/horizon/internal/ingest/db_integration_test.go @@ -113,8 +113,8 @@ func (s *DBTestSuite) setupMocksForBuildState() { s.historyAdapter.On("BucketListHash", s.sequence). Return(checkpointHash, nil).Once() - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(s.sequence)).Return(true, nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", s.sequence). + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(s.sequence)).Return(true, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, s.sequence). Return( xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ diff --git a/services/horizon/internal/ingest/fake_ledger_backend.go b/services/horizon/internal/ingest/fake_ledger_backend.go index b985e43569..74e82d8708 100644 --- a/services/horizon/internal/ingest/fake_ledger_backend.go +++ b/services/horizon/internal/ingest/fake_ledger_backend.go @@ -1,6 +1,8 @@ package ingest import ( + "context" + "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/keypair" logpkg "github.com/stellar/go/support/log" @@ -12,15 +14,15 @@ type fakeLedgerBackend struct { changesPerTransaction int } -func (fakeLedgerBackend) GetLatestLedgerSequence() (uint32, error) { +func (fakeLedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { return 1, nil } -func (fakeLedgerBackend) PrepareRange(r ledgerbackend.Range) error { +func (fakeLedgerBackend) PrepareRange(ctx context.Context, r ledgerbackend.Range) error { return nil } -func (fakeLedgerBackend) IsPrepared(r ledgerbackend.Range) (bool, error) { +func (fakeLedgerBackend) IsPrepared(ctx context.Context, r ledgerbackend.Range) (bool, error) { return true, nil } @@ -98,7 +100,7 @@ func fakeOffer(offerID int64) xdr.LedgerEntryChange { } } -func (f fakeLedgerBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { +func (f fakeLedgerBackend) getLedgerAsync(ctx context.Context, sequence uint32) (bool, xdr.LedgerCloseMeta, error) { ledgerCloseMeta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ @@ -199,8 +201,8 @@ func (f fakeLedgerBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta return true, ledgerCloseMeta, nil } -func (f *fakeLedgerBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) { - _, meta, err := f.GetLedger(sequence) +func (f *fakeLedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + _, meta, err := f.getLedgerAsync(ctx, sequence) return meta, err } diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 88eeafa8b7..bfd6df15db 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -251,7 +251,7 @@ func (b buildState) run(s *system) (transition, error) { log.WithField("ledger", b.checkpointLedger).Info("Waiting for ledger to be available in the backend...") startTime := time.Now() - ledgerCloseMeta, err = s.ledgerBackend.GetLedgerBlocking(b.checkpointLedger) + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, b.checkpointLedger) if err != nil { return nextFailState, errors.Wrap(err, "error getting ledger blocking") } @@ -379,7 +379,7 @@ func (r resumeState) run(s *system) (transition, error) { log.WithField("ledger", ingestLedger).Info("Waiting for ledger to be available in the backend...") startTime := time.Now() - ledgerCloseMeta, err := s.ledgerBackend.GetLedgerBlocking(ingestLedger) + ledgerCloseMeta, err := s.ledgerBackend.GetLedger(s.ctx, ingestLedger) if err != nil { return start(), errors.Wrap(err, "error getting ledger blocking") } @@ -565,7 +565,7 @@ func (h historyRangeState) run(s *system) (transition, error) { log.WithField("ledger", cur).Info("Waiting for ledger to be available in the backend...") startTime := time.Now() - ledgerCloseMeta, err = s.ledgerBackend.GetLedgerBlocking(cur) + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) if err != nil { // Commit finished work in case of ledger backend error. commitErr := s.historyQ.Commit(s.ctx) @@ -656,15 +656,11 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u } for cur := fromLedger; cur <= toLedger; cur++ { - exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(cur) + ledgerCloseMeta, err := s.ledgerBackend.GetLedger(s.ctx, cur) if err != nil { return errors.Wrap(err, "error getting ledger") } - if !exists { - return errors.New("error getting ledger: ledger does not exist") - } - if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil { return err } @@ -691,7 +687,7 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) { }).Info("Preparing ledger backend to retrieve range") startTime := time.Now() - err := s.ledgerBackend.PrepareRange(ledgerbackend.BoundedRange(h.fromLedger, h.toLedger)) + err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger)) if err != nil { return stop(), errors.Wrap(err, "error preparing range") } @@ -820,7 +816,7 @@ func (v verifyRangeState) run(s *system) (transition, error) { log.WithField("ledger", v.fromLedger).Info("Preparing range") startTime := time.Now() - err = s.ledgerBackend.PrepareRange(ledgerbackend.BoundedRange(v.fromLedger, v.toLedger)) + err = s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(v.fromLedger, v.toLedger)) if err != nil { return stop(), errors.Wrap(err, "Error preparing range") } @@ -833,15 +829,11 @@ func (v verifyRangeState) run(s *system) (transition, error) { log.WithField("ledger", v.fromLedger).Info("Processing state") startTime = time.Now() - exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(v.fromLedger) + ledgerCloseMeta, err := s.ledgerBackend.GetLedger(s.ctx, v.fromLedger) if err != nil { return stop(), errors.Wrap(err, "error getting ledger") } - if !exists { - return stop(), errors.New("error getting ledger: ledger does not exist") - } - stats, err := s.runner.RunHistoryArchiveIngestion( ledgerCloseMeta.LedgerSequence(), ledgerCloseMeta.ProtocolVersion(), @@ -878,17 +870,12 @@ func (v verifyRangeState) run(s *system) (transition, error) { return stop(), err } - var exists bool var ledgerCloseMeta xdr.LedgerCloseMeta - exists, ledgerCloseMeta, err = s.ledgerBackend.GetLedger(sequence) + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, sequence) if err != nil { return stop(), errors.Wrap(err, "error getting ledger") } - if !exists { - return stop(), errors.New("error getting ledger: ledger does not exist") - } - var changeStats ingest.StatsChangeProcessorResults var ledgerTransactionStats processors.StatsLedgerTransactionProcessorResults changeStats, _, ledgerTransactionStats, _, err = @@ -959,15 +946,11 @@ func (stressTestState) run(s *system) (transition, error) { }).Info("Processing ledger") startTime := time.Now() - exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(sequence) + ledgerCloseMeta, err := s.ledgerBackend.GetLedger(s.ctx, sequence) if err != nil { return stop(), errors.Wrap(err, "error getting ledger") } - if !exists { - return stop(), errors.New("error getting ledger: ledger does not exist") - } - changeStats, _, ledgerTransactionStats, _, err := s.runner.RunAllProcessorsOnLedger(ledgerCloseMeta) if err != nil { err = errors.Wrap(err, "Error running processors on ledger") @@ -1017,7 +1000,7 @@ func (s *system) completeIngestion(ctx context.Context, ledger uint32) error { func (s *system) maybePrepareRange(ctx context.Context, from uint32) error { ledgerRange := ledgerbackend.UnboundedRange(from) - prepared, err := s.ledgerBackend.IsPrepared(ledgerRange) + prepared, err := s.ledgerBackend.IsPrepared(ctx, ledgerRange) if err != nil { return errors.Wrap(err, "error checking prepared range") } @@ -1026,7 +1009,7 @@ func (s *system) maybePrepareRange(ctx context.Context, from uint32) error { log.WithFields(logpkg.F{"from": from}).Info("Preparing range") startTime := time.Now() - err = s.ledgerBackend.PrepareRange(ledgerRange) + err = s.ledgerBackend.PrepareRange(ctx, ledgerRange) if err != nil { return errors.Wrap(err, "error preparing range") } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index 751e5bbfa3..9e29d7d635 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -47,8 +47,8 @@ func (s *IngestHistoryRangeStateTestSuite) SetupTest() { s.historyQ.On("Rollback", s.ctx).Return(nil).Once() - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(100)).Return(nil).Once() + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(nil).Once() } func (s *IngestHistoryRangeStateTestSuite) TearDownTest() { @@ -90,8 +90,8 @@ func (s *IngestHistoryRangeStateTestSuite) TestRangeNotPreparedFailPrepare() { *s.historyQ = mockDBQ{} *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(100)).Return(errors.New("my error")).Once() + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) @@ -158,7 +158,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerR }, }, } - s.ledgerBackend.On("GetLedgerBlocking", uint32(100)).Return(meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( processors.StatsLedgerTransactionProcessorResults{}, @@ -187,7 +187,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccess() { }, }, } - s.ledgerBackend.On("GetLedgerBlocking", uint32(i)).Return(meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( processors.StatsLedgerTransactionProcessorResults{}, @@ -217,7 +217,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccessOneLedger() { }, }, } - s.ledgerBackend.On("GetLedgerBlocking", uint32(100)).Return(meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( processors.StatsLedgerTransactionProcessorResults{}, @@ -246,8 +246,8 @@ func (s *IngestHistoryRangeStateTestSuite) TestCommitsWorkOnLedgerBackendFailure }, }, } - s.ledgerBackend.On("GetLedgerBlocking", uint32(100)).Return(meta, nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", uint32(101)). + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)). Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( @@ -296,7 +296,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { s.historyQ.On("Rollback", s.ctx).Return(nil).Once() s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() } func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() { @@ -407,7 +407,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedge }, }, } - s.ledgerBackend.On("GetLedger", uint32(100)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta). Return( @@ -443,7 +443,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestCommitFails() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(100)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( processors.StatsLedgerTransactionProcessorResults{}, @@ -482,7 +482,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(i)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( processors.StatsLedgerTransactionProcessorResults{}, @@ -527,8 +527,8 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { // Recreate mock in this single test to remove previous assertion. *s.ledgerBackend = mockLedgerBackend{} - s.ledgerBackend.On("PrepareRange", ledgerbackend.BoundedRange(100, 100)).Return(nil).Once() - s.ledgerBackend.On("GetLedger", uint32(100)).Return(true, meta, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 100)).Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() err := s.system.ReingestRange(100, 100, false) s.Assert().NoError(err) @@ -562,7 +562,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(i)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( processors.StatsLedgerTransactionProcessorResults{}, diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index ac4669b412..a19890b703 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -188,7 +188,7 @@ func NewSystem(config Config) (System, error) { var ledgerBackend ledgerbackend.LedgerBackend if config.EnableCaptiveCore { if len(config.RemoteCaptiveCoreURL) > 0 { - ledgerBackend, err = ledgerbackend.NewRemoteCaptive(ctx, config.RemoteCaptiveCoreURL) + ledgerBackend, err = ledgerbackend.NewRemoteCaptive(config.RemoteCaptiveCoreURL) if err != nil { cancel() return nil, errors.Wrap(err, "error creating captive core backend") @@ -206,7 +206,7 @@ func NewSystem(config Config) (System, error) { NetworkPassphrase: config.NetworkPassphrase, HistoryArchiveURLs: []string{config.HistoryArchiveURL}, CheckpointFrequency: config.CheckpointFrequency, - LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(ctx, config.HistorySession), + LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), Log: logger, Context: ctx, }, @@ -218,7 +218,7 @@ func NewSystem(config Config) (System, error) { } } else { coreSession := config.CoreSession.Clone() - ledgerBackend, err = ledgerbackend.NewDatabaseBackendFromSession(ctx, coreSession, config.NetworkPassphrase) + ledgerBackend, err = ledgerbackend.NewDatabaseBackendFromSession(coreSession, config.NetworkPassphrase) if err != nil { cancel() return nil, errors.Wrap(err, "error creating ledger backend") diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index bddafb3239..764c4765a0 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -360,28 +360,23 @@ type mockLedgerBackend struct { mock.Mock } -func (m *mockLedgerBackend) GetLatestLedgerSequence() (sequence uint32, err error) { - args := m.Called() +func (m *mockLedgerBackend) GetLatestLedgerSequence(ctx context.Context) (sequence uint32, err error) { + args := m.Called(ctx) return args.Get(0).(uint32), args.Error(1) } -func (m *mockLedgerBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) { - args := m.Called(sequence) - return args.Get(0).(bool), args.Get(1).(xdr.LedgerCloseMeta), args.Error(2) -} - -func (m *mockLedgerBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) { - args := m.Called(sequence) +func (m *mockLedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + args := m.Called(ctx, sequence) return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1) } -func (m *mockLedgerBackend) PrepareRange(ledgerRange ledgerbackend.Range) error { - args := m.Called(ledgerRange) +func (m *mockLedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error { + args := m.Called(ctx, ledgerRange) return args.Error(0) } -func (m *mockLedgerBackend) IsPrepared(ledgerRange ledgerbackend.Range) (bool, error) { - args := m.Called(ledgerRange) +func (m *mockLedgerBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) { + args := m.Called(ctx, ledgerRange) return args.Get(0).(bool), args.Error(1) } diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 92c95a675d..b6e2b4c2a8 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -52,9 +52,9 @@ func (s *ResumeTestTestSuite) SetupTest() { s.historyQ.On("Rollback", s.ctx).Return(nil).Once() - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(101)).Return(nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", uint32(101)).Return(xdr.LedgerCloseMeta{ + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)).Return(xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ @@ -95,8 +95,8 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { *s.historyQ = mockDBQ{} *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(101)).Return(errors.New("my error")).Once() + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(errors.New("my error")).Once() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) s.Assert().Error(err) @@ -112,9 +112,9 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() *s.historyQ = mockDBQ{} *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(101)).Return(nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", uint32(101)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) s.Assert().Error(err) @@ -291,9 +291,9 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { func (s *ResumeTestTestSuite) TestBumpIngestLedger() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.UnboundedRange(100)).Return(nil).Once() - s.ledgerBackend.On("GetLedgerBlocking", uint32(100)).Return(xdr.LedgerCloseMeta{ + s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index 5eb763e992..503becf979 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -144,7 +144,7 @@ func (s *VerifyRangeStateTestSuite) TestGetLastLedgerIngestNonEmpty() { func (s *VerifyRangeStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -157,7 +157,7 @@ func (s *VerifyRangeStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() }, }, } - s.ledgerBackend.On("GetLedger", uint32(100)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunHistoryArchiveIngestion", uint32(100), MaxSupportedProtocolVersion, xdr.Hash{1, 2, 3}).Return(ingest.StatsChangeProcessorResults{}, errors.New("my error")).Once() next, err := verifyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -172,7 +172,7 @@ func (s *VerifyRangeStateTestSuite) TestRunHistoryArchiveIngestionReturnsError() func (s *VerifyRangeStateTestSuite) TestSuccess() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -185,7 +185,7 @@ func (s *VerifyRangeStateTestSuite) TestSuccess() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(100)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunHistoryArchiveIngestion", uint32(100), MaxSupportedProtocolVersion, xdr.Hash{1, 2, 3}).Return(ingest.StatsChangeProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(100)).Return(nil).Once() @@ -203,7 +203,7 @@ func (s *VerifyRangeStateTestSuite) TestSuccess() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(i)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() s.runner.On("RunAllProcessorsOnLedger", meta).Return( ingest.StatsChangeProcessorResults{}, @@ -227,7 +227,7 @@ func (s *VerifyRangeStateTestSuite) TestSuccess() { func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() - s.ledgerBackend.On("PrepareRange", ledgerbackend.BoundedRange(100, 110)).Return(nil).Once() + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 110)).Return(nil).Once() meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -240,7 +240,7 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(100)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.runner.On("RunHistoryArchiveIngestion", uint32(100), MaxSupportedProtocolVersion, xdr.Hash{1, 2, 3}).Return(ingest.StatsChangeProcessorResults{}, nil).Once() s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(100)).Return(nil).Once() @@ -258,7 +258,7 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { }, }, } - s.ledgerBackend.On("GetLedger", uint32(i)).Return(true, meta, nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() s.runner.On("RunAllProcessorsOnLedger", meta).Return( ingest.StatsChangeProcessorResults{},