From de3b969282ebeb0361e968750ea6a4fa437df993 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Tue, 12 Mar 2024 23:40:35 +0000 Subject: [PATCH 1/2] fix: logging deadlock Fix logging deadlock, causing lots of test timeouts. This refactors how logging shutdown is handled, eliminating unnecessary captures, use idiomatic wait group to signal processor completion and remove unnecessary nil initialisation. Fix race condition in log testing which was reading Msg while the processor was still running. Switch to checking GITHUB_RUN_ID environment variable to detect GitHub as XDG_RUNTIME_DIR can be present in other situations. --- docker.go | 112 ++++++++++++++++++++++---------------------- logconsumer_test.go | 87 ++++++++++++++++++++-------------- 2 files changed, 108 insertions(+), 91 deletions(-) diff --git a/docker.go b/docker.go index c946103556..a2ef2b69ed 100644 --- a/docker.go +++ b/docker.go @@ -63,16 +63,26 @@ type DockerContainer struct { isRunning bool imageWasBuilt bool // keepBuiltImage makes Terminate not remove the image if imageWasBuilt. - keepBuiltImage bool - provider *DockerProvider - sessionID string - terminationSignal chan bool - consumers []LogConsumer - raw *types.ContainerJSON - stopLogProductionCh chan bool - logProductionDone chan bool - logProductionError chan error - logProductionMutex sync.Mutex + keepBuiltImage bool + provider *DockerProvider + sessionID string + terminationSignal chan bool + consumers []LogConsumer + raw *types.ContainerJSON + logProductionError chan error + + // TODO: Remove locking and wait group once the deprecated StartLogProducer and + // StopLogProducer have been removed and hence logging can only be started and + // stopped once. + + // logProductionWaitGroup is used to signal when the log production has stopped. + // This allows stopLogProduction to safely set logProductionStop to nil. + logProductionWaitGroup sync.WaitGroup + + // logProductionMutex protects logProductionStop channel so it can be started again. + logProductionMutex sync.Mutex + logProductionStop chan struct{} + logProductionTimeout *time.Duration logger Logging lifecycleHooks []ContainerLifecycleHooks @@ -675,9 +685,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro c.logProductionMutex.Lock() defer c.logProductionMutex.Unlock() - if c.stopLogProductionCh != nil { + if c.logProductionStop != nil { return errors.New("log production already started") } + + c.logProductionStop = make(chan struct{}) + c.logProductionWaitGroup.Add(1) } for _, opt := range opts { @@ -699,21 +712,12 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro c.logProductionTimeout = &maxLogProductionTimeout } - c.stopLogProductionCh = make(chan bool) - c.logProductionDone = make(chan bool) c.logProductionError = make(chan error, 1) - go func(stop <-chan bool, done chan<- bool, errorCh chan error) { - // signal the log production is done once go routine exits, this prevents race conditions around start/stop - // set c.stopLogProductionCh to nil so that it can be started again + go func() { defer func() { - defer c.logProductionMutex.Unlock() - close(done) - close(errorCh) - { - c.logProductionMutex.Lock() - c.stopLogProductionCh = nil - } + close(c.logProductionError) + c.logProductionWaitGroup.Done() }() since := "" @@ -731,15 +735,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options) if err != nil { - errorCh <- err + c.logProductionError <- err return } defer c.provider.Close() for { select { - case <-stop: - errorCh <- r.Close() + case <-c.logProductionStop: + c.logProductionError <- r.Close() return default: h := make([]byte, 8) @@ -795,7 +799,7 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro } } } - }(c.stopLogProductionCh, c.logProductionDone, c.logProductionError) + }() return nil } @@ -805,17 +809,18 @@ func (c *DockerContainer) StopLogProducer() error { return c.stopLogProduction() } -// StopLogProducer will stop the concurrent process that is reading logs +// stopLogProduction will stop the concurrent process that is reading logs // and sending them to each added LogConsumer func (c *DockerContainer) stopLogProduction() error { + // TODO: Remove locking and wait group once StartLogProducer and StopLogProducer + // have been removed and hence logging can only be started / stopped once. c.logProductionMutex.Lock() defer c.logProductionMutex.Unlock() - if c.stopLogProductionCh != nil { - c.stopLogProductionCh <- true - // block until the log production is actually done in order to avoid strange races - <-c.logProductionDone - c.stopLogProductionCh = nil - c.logProductionDone = nil + if c.logProductionStop != nil { + close(c.logProductionStop) + c.logProductionWaitGroup.Wait() + // Set c.logProductionStop to nil so that it can be started again. + c.logProductionStop = nil return <-c.logProductionError } return nil @@ -1122,17 +1127,16 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque } c := &DockerContainer{ - ID: resp.ID, - WaitingFor: req.WaitingFor, - Image: imageName, - imageWasBuilt: req.ShouldBuildImage(), - keepBuiltImage: req.ShouldKeepBuiltImage(), - sessionID: core.SessionID(), - provider: p, - terminationSignal: termSignal, - stopLogProductionCh: nil, - logger: p.Logger, - lifecycleHooks: req.LifecycleHooks, + ID: resp.ID, + WaitingFor: req.WaitingFor, + Image: imageName, + imageWasBuilt: req.ShouldBuildImage(), + keepBuiltImage: req.ShouldKeepBuiltImage(), + sessionID: core.SessionID(), + provider: p, + terminationSignal: termSignal, + logger: p.Logger, + lifecycleHooks: req.LifecycleHooks, } err = c.createdHook(ctx) @@ -1225,15 +1229,14 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain } dc := &DockerContainer{ - ID: c.ID, - WaitingFor: req.WaitingFor, - Image: c.Image, - sessionID: sessionID, - provider: p, - terminationSignal: termSignal, - stopLogProductionCh: nil, - logger: p.Logger, - lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)}, + ID: c.ID, + WaitingFor: req.WaitingFor, + Image: c.Image, + sessionID: sessionID, + provider: p, + terminationSignal: termSignal, + logger: p.Logger, + lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)}, } err = dc.startedHook(ctx) @@ -1545,7 +1548,6 @@ func containerFromDockerResponse(ctx context.Context, response types.Container) container.sessionID = core.SessionID() container.consumers = []LogConsumer{} - container.stopLogProductionCh = nil container.isRunning = response.State == "running" // the termination signal should be obtained from the reaper diff --git a/logconsumer_test.go b/logconsumer_test.go index 9c9b25fa09..192a00f954 100644 --- a/logconsumer_test.go +++ b/logconsumer_test.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "strings" + "sync" "testing" "time" @@ -23,8 +24,9 @@ import ( const lastMessage = "DONE" type TestLogConsumer struct { - Msgs []string - Done chan bool + mtx sync.Mutex + msgs []string + Done chan struct{} // Accepted provides a blocking way of ensuring the logs messages have been consumed. // This allows for proper synchronization during Test_StartStop in particular. @@ -35,11 +37,21 @@ type TestLogConsumer struct { func (g *TestLogConsumer) Accept(l Log) { s := string(l.Content) if s == fmt.Sprintf("echo %s\n", lastMessage) { - g.Done <- true + close(g.Done) return } g.Accepted <- s - g.Msgs = append(g.Msgs, s) + + g.mtx.Lock() + defer g.mtx.Unlock() + g.msgs = append(g.msgs, s) +} + +func (g *TestLogConsumer) Msgs() []string { + g.mtx.Lock() + defer g.mtx.Unlock() + + return g.msgs } // devNullAcceptorChan returns string channel that essentially sends all strings to dev null @@ -57,8 +69,8 @@ func Test_LogConsumerGetsCalled(t *testing.T) { ctx := context.Background() g := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -100,7 +112,7 @@ func Test_LogConsumerGetsCalled(t *testing.T) { t.Fatal("never received final log message") } - assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs) + assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs()) terminateContainerOnEnd(t, ctx, c) } @@ -172,13 +184,13 @@ func Test_MultipleLogConsumers(t *testing.T) { ctx := context.Background() first := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } second := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -214,13 +226,13 @@ func Test_MultipleLogConsumers(t *testing.T) { <-first.Done <-second.Done - assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs) - assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs) + assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs()) + assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs()) require.NoError(t, c.Terminate(ctx)) } func TestContainerLogWithErrClosed(t *testing.T) { - if os.Getenv("XDG_RUNTIME_DIR") != "" { + if os.Getenv("GITHUB_RUN_ID") != "" { t.Skip("Skipping as flaky on GitHub Actions, Please see https://github.com/testcontainers/testcontainers-go/issues/1924") } @@ -290,8 +302,8 @@ func TestContainerLogWithErrClosed(t *testing.T) { } consumer := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -317,7 +329,7 @@ func TestContainerLogWithErrClosed(t *testing.T) { // Gather the initial container logs time.Sleep(time.Second * 1) - existingLogs := len(consumer.Msgs) + existingLogs := len(consumer.Msgs()) hitNginx := func() { i, _, err := dind.Exec(ctx, []string{"wget", "--spider", "localhost:" + port.Port()}) @@ -328,10 +340,11 @@ func TestContainerLogWithErrClosed(t *testing.T) { hitNginx() time.Sleep(time.Second * 1) - if len(consumer.Msgs)-existingLogs != 1 { - t.Fatalf("logConsumer should have 1 new log message, instead has: %v", consumer.Msgs[existingLogs:]) + msgs := consumer.Msgs() + if len(msgs)-existingLogs != 1 { + t.Fatalf("logConsumer should have 1 new log message, instead has: %v", msgs[existingLogs:]) } - existingLogs = len(consumer.Msgs) + existingLogs = len(consumer.Msgs()) iptableArgs := []string{ "INPUT", "-p", "tcp", "--dport", "2375", @@ -351,10 +364,11 @@ func TestContainerLogWithErrClosed(t *testing.T) { hitNginx() hitNginx() time.Sleep(time.Second * 1) - if len(consumer.Msgs)-existingLogs != 2 { + msgs = consumer.Msgs() + if len(msgs)-existingLogs != 2 { t.Fatalf( "LogConsumer should have 2 new log messages after detecting closed connection and"+ - " re-requesting logs. Instead has:\n%s", consumer.Msgs[existingLogs:], + " re-requesting logs. Instead has:\n%s", msgs[existingLogs:], ) } } @@ -389,8 +403,8 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) { func TestContainerLogsEnableAtStart(t *testing.T) { ctx := context.Background() g := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -434,7 +448,7 @@ func TestContainerLogsEnableAtStart(t *testing.T) { case <-time.After(10 * time.Second): t.Fatal("never received final log message") } - assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs) + assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs()) terminateContainerOnEnd(t, ctx, c) } @@ -443,8 +457,8 @@ func Test_StartLogProductionStillStartsWithTooLowTimeout(t *testing.T) { ctx := context.Background() g := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -475,8 +489,8 @@ func Test_StartLogProductionStillStartsWithTooHighTimeout(t *testing.T) { ctx := context.Background() g := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -518,10 +532,11 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) { // Context with cancellation functionality for simulating user interruption ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Ensure it gets called. first := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -555,8 +570,8 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) { require.NoError(t, err) second := TestLogConsumer{ - Msgs: []string{}, - Done: make(chan bool), + msgs: []string{}, + Done: make(chan struct{}), Accepted: devNullAcceptorChan(), } @@ -592,7 +607,7 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) { // Handling the termination of the containers defer func() { shutdownCtx, shutdownCancel := context.WithTimeout( - context.Background(), 60*time.Second, + context.Background(), 10*time.Second, ) defer shutdownCancel() _ = c.Terminate(shutdownCtx) @@ -604,8 +619,8 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) { // We check log size due to context cancellation causing // varying message counts, leading to test failure. - assert.GreaterOrEqual(t, len(first.Msgs), 2) - assert.GreaterOrEqual(t, len(second.Msgs), 2) + assert.GreaterOrEqual(t, len(first.Msgs()), 2) + assert.GreaterOrEqual(t, len(second.Msgs()), 2) // Restore stderr w.Close() From 35cb59d38cf6bc741bd2f6d80eea0f68206f09d9 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Wed, 3 Apr 2024 19:25:29 +0100 Subject: [PATCH 2/2] fix: Docker container Terminate returning early Fix Docker container Terminate returning early when context is cancelled, which was leaving orphaned running containers. This ensures that all life cycle hooks are run even if one errors returning a multi error if needed. --- docker.go | 29 ++++------- lifecycle.go | 137 +++++++++++++++++++++------------------------------ 2 files changed, 64 insertions(+), 102 deletions(-) diff --git a/docker.go b/docker.go index a2ef2b69ed..6d10cf92a0 100644 --- a/docker.go +++ b/docker.go @@ -274,22 +274,13 @@ func (c *DockerContainer) Terminate(ctx context.Context) error { defer c.provider.client.Close() - err := c.terminatingHook(ctx) - if err != nil { - return err - } - - err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{ - RemoveVolumes: true, - Force: true, - }) - if err != nil { - return err - } - - err = c.terminatedHook(ctx) - if err != nil { - return err + errs := []error{ + c.terminatingHook(ctx), + c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{ + RemoveVolumes: true, + Force: true, + }), + c.terminatedHook(ctx), } if c.imageWasBuilt && !c.keepBuiltImage { @@ -297,14 +288,12 @@ func (c *DockerContainer) Terminate(ctx context.Context) error { Force: true, PruneChildren: true, }) - if err != nil { - return err - } + errs = append(errs, err) } c.sessionID = "" c.isRunning = false - return nil + return errors.Join(errs...) } // update container raw info diff --git a/lifecycle.go b/lifecycle.go index 4a10a90842..1cd0a8beb7 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -2,6 +2,7 @@ package testcontainers import ( "context" + "errors" "fmt" "io" "strings" @@ -225,65 +226,40 @@ var defaultReadinessHook = func() ContainerLifecycleHooks { // creatingHook is a hook that will be called before a container is created. func (req ContainerRequest) creatingHook(ctx context.Context) error { - for _, lifecycleHooks := range req.LifecycleHooks { - err := lifecycleHooks.Creating(ctx)(req) - if err != nil { - return err - } + errs := make([]error, len(req.LifecycleHooks)) + for i, lifecycleHooks := range req.LifecycleHooks { + errs[i] = lifecycleHooks.Creating(ctx)(req) } - return nil + return errors.Join(errs...) } -// createdHook is a hook that will be called after a container is created +// createdHook is a hook that will be called after a container is created. func (c *DockerContainer) createdHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PostCreates)(c) - if err != nil { - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PostCreates + }) } -// startingHook is a hook that will be called before a container is started +// startingHook is a hook that will be called before a container is started. func (c *DockerContainer) startingHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PreStarts)(c) - if err != nil { - c.printLogs(ctx, err) - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, true, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PreStarts + }) } -// startedHook is a hook that will be called after a container is started +// startedHook is a hook that will be called after a container is started. func (c *DockerContainer) startedHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PostStarts)(c) - if err != nil { - c.printLogs(ctx, err) - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, true, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PostStarts + }) } -// readiedHook is a hook that will be called after a container is ready +// readiedHook is a hook that will be called after a container is ready. func (c *DockerContainer) readiedHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PostReadies)(c) - if err != nil { - c.printLogs(ctx, err) - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, true, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PostReadies + }) } // printLogs is a helper function that will print the logs of a Docker container @@ -304,49 +280,47 @@ func (c *DockerContainer) printLogs(ctx context.Context, cause error) { c.logger.Printf("container logs (%s):\n%s", cause, b) } -// stoppingHook is a hook that will be called before a container is stopped +// stoppingHook is a hook that will be called before a container is stopped. func (c *DockerContainer) stoppingHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PreStops)(c) - if err != nil { - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PreStops + }) } -// stoppedHook is a hook that will be called after a container is stopped +// stoppedHook is a hook that will be called after a container is stopped. func (c *DockerContainer) stoppedHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PostStops)(c) - if err != nil { - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PostStops + }) } -// terminatingHook is a hook that will be called before a container is terminated +// terminatingHook is a hook that will be called before a container is terminated. func (c *DockerContainer) terminatingHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PreTerminates)(c) - if err != nil { - return err - } - } - - return nil + return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PreTerminates + }) } -// terminatedHook is a hook that will be called after a container is terminated +// terminatedHook is a hook that will be called after a container is terminated. func (c *DockerContainer) terminatedHook(ctx context.Context) error { - for _, lifecycleHooks := range c.lifecycleHooks { - err := containerHookFn(ctx, lifecycleHooks.PostTerminates)(c) - if err != nil { - return err + return c.applyLifecycleHooks(ctx, false, func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook { + return lifecycleHooks.PostTerminates + }) +} + +// applyLifecycleHooks applies all lifecycle hooks reporting the container logs on error if logError is true. +func (c *DockerContainer) applyLifecycleHooks(ctx context.Context, logError bool, hooks func(lifecycleHooks ContainerLifecycleHooks) []ContainerHook) error { + errs := make([]error, len(c.lifecycleHooks)) + for i, lifecycleHooks := range c.lifecycleHooks { + errs[i] = containerHookFn(ctx, hooks(lifecycleHooks))(c) + } + + if err := errors.Join(errs...); err != nil { + if logError { + c.printLogs(ctx, err) } + + return err } return nil @@ -369,13 +343,12 @@ func (c ContainerLifecycleHooks) Creating(ctx context.Context) func(req Containe // container lifecycle hooks. The created function will iterate over all the hooks and call them one by one. func containerHookFn(ctx context.Context, containerHook []ContainerHook) func(container Container) error { return func(container Container) error { - for _, hook := range containerHook { - if err := hook(ctx, container); err != nil { - return err - } + errs := make([]error, len(containerHook)) + for i, hook := range containerHook { + errs[i] = hook(ctx, container) } - return nil + return errors.Join(errs...) } }