Skip to content

Commit

Permalink
fix: logging deadlock
Browse files Browse the repository at this point in the history
Fix logging deadlock, causing lots of test timeouts.

This refactors how logging shutdown is handled, eliminating unnecessary
captures, using an idiomatic wait group to signal processor completion and
removing unnecessary nil initialisation.

Fix a race condition in the log tests, which were reading Msg while the
processor was still running.

Switch to checking the GITHUB_RUN_ID environment variable to detect GitHub
Actions, as XDG_RUNTIME_DIR can be present in other situations.
  • Loading branch information
stevenh committed Mar 12, 2024
1 parent fe0d3a8 commit b2cc7d7
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 91 deletions.
107 changes: 52 additions & 55 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,21 @@ type DockerContainer struct {
isRunning bool
imageWasBuilt bool
// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
raw *types.ContainerJSON
stopLogProductionCh chan bool
logProductionDone chan bool
logProductionError chan error
logProductionMutex sync.Mutex
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
raw *types.ContainerJSON
logProductionError chan error

// logProductionMutex protects logProductionStop channel so it can be started again.
// TODO: Remove locking once StartLogProducer has been removed and hence logging can
// only be started once.
logProductionMutex sync.Mutex
logProductionWaitGroup sync.WaitGroup
logProductionStop chan struct{}

logProductionTimeout *time.Duration
logger Logging
lifecycleHooks []ContainerLifecycleHooks
Expand Down Expand Up @@ -652,9 +657,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()

if c.stopLogProductionCh != nil {
if c.logProductionStop != nil {
return errors.New("log production already started")
}

c.logProductionStop = make(chan struct{})
c.logProductionWaitGroup.Add(1)
}

for _, opt := range opts {
Expand All @@ -676,21 +684,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
c.logProductionTimeout = &maxLogProductionTimeout
}

c.stopLogProductionCh = make(chan bool)
c.logProductionDone = make(chan bool)
c.logProductionError = make(chan error, 1)

go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
// signal the log production is done once go routine exits, this prevents race conditions around start/stop
// set c.stopLogProductionCh to nil so that it can be started again
go func() {
defer func() {
defer c.logProductionMutex.Unlock()
close(done)
close(errorCh)
{
c.logProductionMutex.Lock()
c.stopLogProductionCh = nil
}
close(c.logProductionError)
c.logProductionWaitGroup.Done()
}()

since := ""
Expand All @@ -708,15 +707,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
errorCh <- err
c.logProductionError <- err
return
}
defer c.provider.Close()

for {
select {
case <-stop:
errorCh <- r.Close()
case <-c.logProductionStop:
c.logProductionError <- r.Close()
return
default:
h := make([]byte, 8)
Expand Down Expand Up @@ -772,7 +771,7 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
}
}
}
}(c.stopLogProductionCh, c.logProductionDone, c.logProductionError)
}()

return nil
}
Expand All @@ -782,17 +781,18 @@ func (c *DockerContainer) StopLogProducer() error {
return c.stopLogProduction()
}

// StopLogProducer will stop the concurrent process that is reading logs
// stopLogProduction will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) stopLogProduction() error {
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()
if c.stopLogProductionCh != nil {
c.stopLogProductionCh <- true
// block until the log production is actually done in order to avoid strange races
<-c.logProductionDone
c.stopLogProductionCh = nil
c.logProductionDone = nil
if c.logProductionStop != nil {
close(c.logProductionStop)
c.logProductionWaitGroup.Wait()
// Set c.logProductionStop to nil so that it can be started again.
// TODO: Remove this once StartLogProducer has been removed and hence logging can
// only be started once.
c.logProductionStop = nil
return <-c.logProductionError
}
return nil
Expand Down Expand Up @@ -1113,17 +1113,16 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

c := &DockerContainer{
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
provider: p,
terminationSignal: termSignal,
stopLogProductionCh: nil,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
provider: p,
terminationSignal: termSignal,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
}

err = c.createdHook(ctx)
Expand Down Expand Up @@ -1216,15 +1215,14 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
}

dc := &DockerContainer{
ID: c.ID,
WaitingFor: req.WaitingFor,
Image: c.Image,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
stopLogProductionCh: nil,
logger: p.Logger,
lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
ID: c.ID,
WaitingFor: req.WaitingFor,
Image: c.Image,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
logger: p.Logger,
lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)},
}

err = dc.startedHook(ctx)
Expand Down Expand Up @@ -1526,7 +1524,6 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)

container.sessionID = core.SessionID()
container.consumers = []LogConsumer{}
container.stopLogProductionCh = nil
container.isRunning = response.State == "running"

// the termination signal should be obtained from the reaper
Expand Down
Loading

0 comments on commit b2cc7d7

Please sign in to comment.