diff --git a/exp/ingest/live_session.go b/exp/ingest/live_session.go index 7a68b3e40c..277d5c7148 100644 --- a/exp/ingest/live_session.go +++ b/exp/ingest/live_session.go @@ -17,6 +17,8 @@ var _ Session = &LiveSession{} const defaultCoreCursorName = "EXPINGESTLIVESESSION" +// Run runs the session starting from the last checkpoint ledger. +// Returns nil when session has been shutdown. func (s *LiveSession) Run() error { s.standardSession.shutdown = make(chan bool) @@ -96,6 +98,7 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error { } // Resume resumes the session from `ledgerSequence`. +// Returns nil when session has been shutdown. // // WARNING: it's likely that developers will use `GetLatestSuccessfullyProcessedLedger()` // to get the latest successfuly processed ledger after `Resume` returns error. @@ -107,6 +110,14 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error { func (s *LiveSession) Resume(ledgerSequence uint32) error { s.standardSession.shutdown = make(chan bool) + err := s.validate() + if err != nil { + return errors.Wrap(err, "Validation error") + } + + s.setRunningState(true) + defer s.setRunningState(false) + ledgerAdapter := &adapters.LedgerBackendAdapter{ Backend: s.LedgerBackend, } @@ -176,7 +187,6 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg select { case <-s.standardSession.shutdown: - s.LedgerPipeline.Shutdown() return nil case <-time.After(time.Second): // TODO make the idle time smaller @@ -193,27 +203,20 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg ledgerReader = reporterLedgerReader{ledgerReader, s.LedgerReporter} } - errChan := s.LedgerPipeline.Process(ledgerReader) - select { - case err2 := <-errChan: - if err2 != nil { - // Return with no errors if pipeline shutdown - if err2 == pipeline.ErrShutdown { - s.LedgerReporter.OnEndLedger(nil, true) - return nil - } - + err = <-s.LedgerPipeline.Process(ledgerReader) + if err != nil { + // Return with no errors if pipeline shutdown + if err == pipeline.ErrShutdown { if s.LedgerReporter != nil { - s.LedgerReporter.OnEndLedger(err2, false) + s.LedgerReporter.OnEndLedger(nil, true) } - return errors.Wrap(err2, "Ledger pipeline errored") + return nil } - case <-s.standardSession.shutdown: + if s.LedgerReporter != nil { - s.LedgerReporter.OnEndLedger(nil, true) + s.LedgerReporter.OnEndLedger(err, false) } - s.LedgerPipeline.Shutdown() - return nil + return errors.Wrap(err, "Ledger pipeline errored") } if s.LedgerReporter != nil { @@ -229,6 +232,14 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg } ledgerSequence++ + + // Exit early if Shutdown() was called. + select { + case <-s.standardSession.shutdown: + return nil + default: + // Continue + } } return nil @@ -276,26 +287,20 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter, stateReader = reporterStateReader{stateReader, s.StateReporter} } - errChan := s.StatePipeline.Process(stateReader) - select { - case err := <-errChan: - if err != nil { - // Return with no errors if pipeline shutdown - if err == pipeline.ErrShutdown { - s.StateReporter.OnEndState(nil, true) - return nil - } - + err = <-s.StatePipeline.Process(stateReader) + if err != nil { + // Return with no errors if pipeline shutdown + if err == pipeline.ErrShutdown { if s.StateReporter != nil { - s.StateReporter.OnEndState(err, false) + s.StateReporter.OnEndState(nil, true) } - return errors.Wrap(err, "State pipeline errored") + return nil } - case <-s.standardSession.shutdown: + if s.StateReporter != nil { - s.StateReporter.OnEndState(nil, true) + s.StateReporter.OnEndState(err, false) } - s.StatePipeline.Shutdown() + return errors.Wrap(err, "State pipeline errored") } if s.StateReporter != nil { @@ -303,3 +308,26 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter, } return nil } + +// Shutdown gracefully stops the pipelines and the session. This method blocks +// until pipelines are gracefully shutdown. +func (s *LiveSession) Shutdown() { + // Send shutdown signal + s.standardSession.Shutdown() + + // Shutdown pipelines + s.StatePipeline.Shutdown() + s.LedgerPipeline.Shutdown() + + // Shutdown signals sent, block/wait until pipelines are done + // shutting down. + for { + stateRunning := s.StatePipeline.IsRunning() + ledgerRunning := s.LedgerPipeline.IsRunning() + if stateRunning || ledgerRunning { + time.Sleep(time.Second) + continue + } + break + } +} diff --git a/exp/support/pipeline/pipeline.go b/exp/support/pipeline/pipeline.go index 24eed9c10f..edefb14dd2 100644 --- a/exp/support/pipeline/pipeline.go +++ b/exp/support/pipeline/pipeline.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "time" "github.com/stellar/go/support/errors" ) @@ -86,6 +85,14 @@ func (p *Pipeline) setRunning(setRunning bool) error { return nil } +// IsRunning returns true if pipeline is running +func (p *Pipeline) IsRunning() bool { + // Protects internal fields + p.mutex.Lock() + defer p.mutex.Unlock() + return p.running +} + // reset resets internal state of the pipeline and all the nodes and processors. func (p *Pipeline) reset() { p.cancelled = false @@ -192,8 +199,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip } }() - finishUpdatingStats := p.updateStats(node, reader, writer) - for i, child := range node.Children { wg.Add(1) go func(i int, child *PipelineNode) { @@ -209,8 +214,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip go func() { wg.Wait() - finishUpdatingStats <- true - if node == p.root { // If pipeline processing is finished run post-hooks and send error // if not already sent. @@ -259,40 +262,8 @@ func (p *Pipeline) Shutdown() { } p.shutDown = true p.cancelled = true - p.cancelFunc() -} - -func (p *Pipeline) updateStats(node *PipelineNode, reader Reader, writer *multiWriter) chan<- bool { - // Update stats - interval := time.Second - done := make(chan bool) - ticker := time.NewTicker(interval) - - go func() { - defer ticker.Stop() - - for { - // This is not thread-safe: check if Mutex slows it down a lot... - readBuffer, readBufferIsBufferedReadWriter := reader.(*BufferedReadWriter) - - node.writesPerSecond = (writer.wroteEntries - node.wroteEntries) * int(time.Second/interval) - node.wroteEntries = writer.wroteEntries - - if readBufferIsBufferedReadWriter { - node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval) - node.readEntries = readBuffer.readEntries - node.queuedEntries = readBuffer.QueuedEntries() - } - - select { - case <-ticker.C: - continue - case <-done: - // Pipeline done - return - } - } - }() - - return done + // It's possible that Shutdown will be called before first run. + if p.cancelFunc != nil { + p.cancelFunc() + } } diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 42bb0e1106..cec34705cc 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -297,6 +297,10 @@ func (s *System) Run() { "err": err, "last_ingested_ledger": lastIngestedLedger, }).Error("Error running session, resuming from the last ingested ledger") + } else { + // LiveSession.Run returns nil => shutdown + log.Info("Session shut down") + return nil } } else { // The other node already ingested a state (just now or in the past) diff --git a/services/horizon/internal/expingest/run_ingestion_test.go b/services/horizon/internal/expingest/run_ingestion_test.go index a62de34053..8aa15b1069 100644 --- a/services/horizon/internal/expingest/run_ingestion_test.go +++ b/services/horizon/internal/expingest/run_ingestion_test.go @@ -253,7 +253,6 @@ func (s *RunIngestionTestSuite) TestOutdatedIngestVersion() { s.session.On("TruncateTables", history.ExperimentalIngestionTables).Return(nil).Once() s.ingestSession.On("Run").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.ingestSession.On("Resume", uint32(4)).Return(nil).Once() s.system.retry = expectError(s.Assert(), "") }