Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/ingest/pipeline: Fix pipeline data race during shutdown #2058

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 60 additions & 32 deletions exp/ingest/live_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var _ Session = &LiveSession{}

const defaultCoreCursorName = "EXPINGESTLIVESESSION"

// Run runs the session starting from the last checkpoint ledger.
// Returns nil when session has been shutdown.
func (s *LiveSession) Run() error {
s.standardSession.shutdown = make(chan bool)

Expand Down Expand Up @@ -96,6 +98,7 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error {
}

// Resume resumes the session from `ledgerSequence`.
// Returns nil when session has been shutdown.
//
// WARNING: it's likely that developers will use `GetLatestSuccessfullyProcessedLedger()`
// to get the latest successfuly processed ledger after `Resume` returns error.
Expand All @@ -107,6 +110,14 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error {
func (s *LiveSession) Resume(ledgerSequence uint32) error {
s.standardSession.shutdown = make(chan bool)

err := s.validate()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bartekn did we forget to add this before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are two entry points to LiveSession: Run or Resume.

if err != nil {
return errors.Wrap(err, "Validation error")
}

s.setRunningState(true)
defer s.setRunningState(false)

ledgerAdapter := &adapters.LedgerBackendAdapter{
Backend: s.LedgerBackend,
}
Expand Down Expand Up @@ -176,7 +187,6 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg

select {
case <-s.standardSession.shutdown:
s.LedgerPipeline.Shutdown()
return nil
case <-time.After(time.Second):
// TODO make the idle time smaller
Expand All @@ -193,27 +203,20 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg
ledgerReader = reporterLedgerReader{ledgerReader, s.LedgerReporter}
}

errChan := s.LedgerPipeline.Process(ledgerReader)
select {
case err2 := <-errChan:
if err2 != nil {
// Return with no errors if pipeline shutdown
if err2 == pipeline.ErrShutdown {
s.LedgerReporter.OnEndLedger(nil, true)
return nil
}

err = <-s.LedgerPipeline.Process(ledgerReader)
if err != nil {
// Return with no errors if pipeline shutdown
if err == pipeline.ErrShutdown {
if s.LedgerReporter != nil {
s.LedgerReporter.OnEndLedger(err2, false)
s.LedgerReporter.OnEndLedger(nil, true)
}
return errors.Wrap(err2, "Ledger pipeline errored")
return nil
}
case <-s.standardSession.shutdown:

if s.LedgerReporter != nil {
s.LedgerReporter.OnEndLedger(nil, true)
s.LedgerReporter.OnEndLedger(err, false)
}
s.LedgerPipeline.Shutdown()
return nil
return errors.Wrap(err, "Ledger pipeline errored")
}

if s.LedgerReporter != nil {
Expand All @@ -229,6 +232,14 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg
}

ledgerSequence++

// Exit early if Shutdown() was called.
select {
case <-s.standardSession.shutdown:
return nil
default:
// Continue
}
}

return nil
Expand Down Expand Up @@ -276,30 +287,47 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter,
stateReader = reporterStateReader{stateReader, s.StateReporter}
}

errChan := s.StatePipeline.Process(stateReader)
select {
case err := <-errChan:
if err != nil {
// Return with no errors if pipeline shutdown
if err == pipeline.ErrShutdown {
s.StateReporter.OnEndState(nil, true)
return nil
}

err = <-s.StatePipeline.Process(stateReader)
if err != nil {
// Return with no errors if pipeline shutdown
if err == pipeline.ErrShutdown {
if s.StateReporter != nil {
s.StateReporter.OnEndState(err, false)
s.StateReporter.OnEndState(nil, true)
}
return errors.Wrap(err, "State pipeline errored")
return nil
}
case <-s.standardSession.shutdown:

if s.StateReporter != nil {
s.StateReporter.OnEndState(nil, true)
s.StateReporter.OnEndState(err, false)
}
s.StatePipeline.Shutdown()
return errors.Wrap(err, "State pipeline errored")
}

if s.StateReporter != nil {
s.StateReporter.OnEndState(nil, false)
}
return nil
}

// Shutdown gracefully stops the pipelines and the session. This method blocks
// until pipelines are gracefully shutdown.
func (s *LiveSession) Shutdown() {
// Send shutdown signal
s.standardSession.Shutdown()

// Shutdown pipelines
s.StatePipeline.Shutdown()
s.LedgerPipeline.Shutdown()

// Shutdown signals sent, block/wait until pipelines are done
// shutting down.
for {
stateRunning := s.StatePipeline.IsRunning()
ledgerRunning := s.LedgerPipeline.IsRunning()
if stateRunning || ledgerRunning {
time.Sleep(time.Second)
continue
}
break
}
}
53 changes: 12 additions & 41 deletions exp/support/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/stellar/go/support/errors"
)
Expand Down Expand Up @@ -86,6 +85,14 @@ func (p *Pipeline) setRunning(setRunning bool) error {
return nil
}

// IsRunning returns true if pipeline is running
func (p *Pipeline) IsRunning() bool {
// Protects internal fields
p.mutex.Lock()
defer p.mutex.Unlock()
return p.running
}

// reset resets internal state of the pipeline and all the nodes and processors.
func (p *Pipeline) reset() {
p.cancelled = false
Expand Down Expand Up @@ -192,8 +199,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip
}
}()

finishUpdatingStats := p.updateStats(node, reader, writer)

for i, child := range node.Children {
wg.Add(1)
go func(i int, child *PipelineNode) {
Expand All @@ -209,8 +214,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip

go func() {
wg.Wait()
finishUpdatingStats <- true

if node == p.root {
// If pipeline processing is finished run post-hooks and send error
// if not already sent.
Expand Down Expand Up @@ -259,40 +262,8 @@ func (p *Pipeline) Shutdown() {
}
p.shutDown = true
p.cancelled = true
p.cancelFunc()
}

func (p *Pipeline) updateStats(node *PipelineNode, reader Reader, writer *multiWriter) chan<- bool {
// Update stats
interval := time.Second
done := make(chan bool)
ticker := time.NewTicker(interval)

go func() {
defer ticker.Stop()

for {
// This is not thread-safe: check if Mutex slows it down a lot...
readBuffer, readBufferIsBufferedReadWriter := reader.(*BufferedReadWriter)

node.writesPerSecond = (writer.wroteEntries - node.wroteEntries) * int(time.Second/interval)
node.wroteEntries = writer.wroteEntries

if readBufferIsBufferedReadWriter {
node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval)
node.readEntries = readBuffer.readEntries
node.queuedEntries = readBuffer.QueuedEntries()
}

select {
case <-ticker.C:
continue
case <-done:
// Pipeline done
return
}
}
}()

return done
// It's possible that Shutdown will be called before first run.
if p.cancelFunc != nil {
p.cancelFunc()
}
}
4 changes: 4 additions & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func (s *System) Run() {
"err": err,
"last_ingested_ledger": lastIngestedLedger,
}).Error("Error running session, resuming from the last ingested ledger")
} else {
// LiveSession.Run returns nil => shutdown
log.Info("Session shut down")
return nil
}
} else {
// The other node already ingested a state (just now or in the past)
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/expingest/run_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func (s *RunIngestionTestSuite) TestOutdatedIngestVersion() {
s.session.On("TruncateTables", history.ExperimentalIngestionTables).Return(nil).Once()
s.ingestSession.On("Run").Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
s.ingestSession.On("Resume", uint32(4)).Return(nil).Once()
s.system.retry = expectError(s.Assert(), "")
}

Expand Down