Skip to content

Commit

Permalink
exp/ingest/pipeline: Fix pipeline data race during shutdown (#2058)
Browse files Browse the repository at this point in the history
This commit fixes data race in `exp/ingest/pipeline` that can occur when
`LiveSession` (and Horizon) is shut down.

It also removes `updateStats` method that was known to have a data race
(see comment in that method). It is not actively used right now but was
being reported by race detector.

Previous code handling shutdown signal in `LiveSession` can be found below:
```
 errChan := s.LedgerPipeline.Process(ledgerReader) 
 select { 
 case err2 := <-errChan: 
 	if err2 != nil { 
 		// Return with no errors if pipeline shutdown 
 		if err2 == pipeline.ErrShutdown { 
 			s.LedgerReporter.OnEndLedger(nil, true) 
 			return nil 
 		} 
  
 		if s.LedgerReporter != nil { 
 			s.LedgerReporter.OnEndLedger(err2, false) 
 		} 
 		return errors.Wrap(err2, "Ledger pipeline errored") 
 	} 
 case <-s.standardSession.shutdown: 
 	if s.LedgerReporter != nil { 
 		s.LedgerReporter.OnEndLedger(nil, true) 
 	} 
 	s.LedgerPipeline.Shutdown() 
 	return nil 
 } 
```

The problem is when shutdown signal is received, `Resume` returns `nil`
so Horizon starts it's shutdown code which calls `Rollback()` (using
internal `tx` object) but at the same time pipeline is still running
until the code receiving from `ctx.Done` channel is executed. It means
that pipeline processors can execute transactions using `tx` transaction
object in DB session.

To fix this:
1. We don't `select` ingest session shutdown signal when waiting for
   pipeline to finish processing.
2. Instead we call `Shutdown` on pipelines inside `LiveSession.Shutdown`.
3. Then we wait/block until pipelines gracefully shutdown by calling
   `Pipeline.IsRunning` method.
4. Finally we `close(s.shutdown)` inside `expingest/System.Shutdown()`.

So the components now shut down exactly in the following order:
1. Pipelines.
2. Session.
3. Horizon Expingest System.

One comment on `-1` change in tests. When `ingestSession.Run()` returns
`nil` we shouldn't continue to `ingestSession.Resume()` because `nil`
value means that session ended. I updated the comment in `LiveSession`
and also fixed Horizon code.
  • Loading branch information
bartekn authored Dec 16, 2019
1 parent 5e4d247 commit 837b12c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 74 deletions.
92 changes: 60 additions & 32 deletions exp/ingest/live_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var _ Session = &LiveSession{}

const defaultCoreCursorName = "EXPINGESTLIVESESSION"

// Run runs the session starting from the last checkpoint ledger.
// Returns nil when session has been shutdown.
func (s *LiveSession) Run() error {
s.standardSession.shutdown = make(chan bool)

Expand Down Expand Up @@ -96,6 +98,7 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error {
}

// Resume resumes the session from `ledgerSequence`.
// Returns nil when session has been shutdown.
//
// WARNING: it's likely that developers will use `GetLatestSuccessfullyProcessedLedger()`
// to get the latest successfuly processed ledger after `Resume` returns error.
Expand All @@ -107,6 +110,14 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error {
func (s *LiveSession) Resume(ledgerSequence uint32) error {
s.standardSession.shutdown = make(chan bool)

err := s.validate()
if err != nil {
return errors.Wrap(err, "Validation error")
}

s.setRunningState(true)
defer s.setRunningState(false)

ledgerAdapter := &adapters.LedgerBackendAdapter{
Backend: s.LedgerBackend,
}
Expand Down Expand Up @@ -176,7 +187,6 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg

select {
case <-s.standardSession.shutdown:
s.LedgerPipeline.Shutdown()
return nil
case <-time.After(time.Second):
// TODO make the idle time smaller
Expand All @@ -193,27 +203,20 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg
ledgerReader = reporterLedgerReader{ledgerReader, s.LedgerReporter}
}

errChan := s.LedgerPipeline.Process(ledgerReader)
select {
case err2 := <-errChan:
if err2 != nil {
// Return with no errors if pipeline shutdown
if err2 == pipeline.ErrShutdown {
s.LedgerReporter.OnEndLedger(nil, true)
return nil
}

err = <-s.LedgerPipeline.Process(ledgerReader)
if err != nil {
// Return with no errors if pipeline shutdown
if err == pipeline.ErrShutdown {
if s.LedgerReporter != nil {
s.LedgerReporter.OnEndLedger(err2, false)
s.LedgerReporter.OnEndLedger(nil, true)
}
return errors.Wrap(err2, "Ledger pipeline errored")
return nil
}
case <-s.standardSession.shutdown:

if s.LedgerReporter != nil {
s.LedgerReporter.OnEndLedger(nil, true)
s.LedgerReporter.OnEndLedger(err, false)
}
s.LedgerPipeline.Shutdown()
return nil
return errors.Wrap(err, "Ledger pipeline errored")
}

if s.LedgerReporter != nil {
Expand All @@ -229,6 +232,14 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg
}

ledgerSequence++

// Exit early if Shutdown() was called.
select {
case <-s.standardSession.shutdown:
return nil
default:
// Continue
}
}

return nil
Expand Down Expand Up @@ -276,30 +287,47 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter,
stateReader = reporterStateReader{stateReader, s.StateReporter}
}

errChan := s.StatePipeline.Process(stateReader)
select {
case err := <-errChan:
if err != nil {
// Return with no errors if pipeline shutdown
if err == pipeline.ErrShutdown {
s.StateReporter.OnEndState(nil, true)
return nil
}

err = <-s.StatePipeline.Process(stateReader)
if err != nil {
// Return with no errors if pipeline shutdown
if err == pipeline.ErrShutdown {
if s.StateReporter != nil {
s.StateReporter.OnEndState(err, false)
s.StateReporter.OnEndState(nil, true)
}
return errors.Wrap(err, "State pipeline errored")
return nil
}
case <-s.standardSession.shutdown:

if s.StateReporter != nil {
s.StateReporter.OnEndState(nil, true)
s.StateReporter.OnEndState(err, false)
}
s.StatePipeline.Shutdown()
return errors.Wrap(err, "State pipeline errored")
}

if s.StateReporter != nil {
s.StateReporter.OnEndState(nil, false)
}
return nil
}

// Shutdown gracefully stops the pipelines and the session. This method blocks
// until pipelines are gracefully shutdown.
func (s *LiveSession) Shutdown() {
// Send shutdown signal
s.standardSession.Shutdown()

// Shutdown pipelines
s.StatePipeline.Shutdown()
s.LedgerPipeline.Shutdown()

// Shutdown signals sent, block/wait until pipelines are done
// shutting down.
for {
stateRunning := s.StatePipeline.IsRunning()
ledgerRunning := s.LedgerPipeline.IsRunning()
if stateRunning || ledgerRunning {
time.Sleep(time.Second)
continue
}
break
}
}
53 changes: 12 additions & 41 deletions exp/support/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/stellar/go/support/errors"
)
Expand Down Expand Up @@ -86,6 +85,14 @@ func (p *Pipeline) setRunning(setRunning bool) error {
return nil
}

// IsRunning returns true if pipeline is running
func (p *Pipeline) IsRunning() bool {
// Protects internal fields
p.mutex.Lock()
defer p.mutex.Unlock()
return p.running
}

// reset resets internal state of the pipeline and all the nodes and processors.
func (p *Pipeline) reset() {
p.cancelled = false
Expand Down Expand Up @@ -192,8 +199,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip
}
}()

finishUpdatingStats := p.updateStats(node, reader, writer)

for i, child := range node.Children {
wg.Add(1)
go func(i int, child *PipelineNode) {
Expand All @@ -209,8 +214,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip

go func() {
wg.Wait()
finishUpdatingStats <- true

if node == p.root {
// If pipeline processing is finished run post-hooks and send error
// if not already sent.
Expand Down Expand Up @@ -259,40 +262,8 @@ func (p *Pipeline) Shutdown() {
}
p.shutDown = true
p.cancelled = true
p.cancelFunc()
}

func (p *Pipeline) updateStats(node *PipelineNode, reader Reader, writer *multiWriter) chan<- bool {
// Update stats
interval := time.Second
done := make(chan bool)
ticker := time.NewTicker(interval)

go func() {
defer ticker.Stop()

for {
// This is not thread-safe: check if Mutex slows it down a lot...
readBuffer, readBufferIsBufferedReadWriter := reader.(*BufferedReadWriter)

node.writesPerSecond = (writer.wroteEntries - node.wroteEntries) * int(time.Second/interval)
node.wroteEntries = writer.wroteEntries

if readBufferIsBufferedReadWriter {
node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval)
node.readEntries = readBuffer.readEntries
node.queuedEntries = readBuffer.QueuedEntries()
}

select {
case <-ticker.C:
continue
case <-done:
// Pipeline done
return
}
}
}()

return done
// It's possible that Shutdown will be called before first run.
if p.cancelFunc != nil {
p.cancelFunc()
}
}
4 changes: 4 additions & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func (s *System) Run() {
"err": err,
"last_ingested_ledger": lastIngestedLedger,
}).Error("Error running session, resuming from the last ingested ledger")
} else {
// LiveSession.Run returns nil => shutdown
log.Info("Session shut down")
return nil
}
} else {
// The other node already ingested a state (just now or in the past)
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/expingest/run_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ func (s *RunIngestionTestSuite) TestOutdatedIngestVersion() {
s.session.On("TruncateTables", history.ExperimentalIngestionTables).Return(nil).Once()
s.ingestSession.On("Run").Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
s.ingestSession.On("Resume", uint32(4)).Return(nil).Once()
s.system.retry = expectError(s.Assert(), "")
}

Expand Down

0 comments on commit 837b12c

Please sign in to comment.