Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

acceptance: fix spurious docker test failure unexpected extra event &{0 die} (after []) #86862

Merged
merged 7 commits into from
Aug 25, 2022
2 changes: 1 addition & 1 deletion pkg/acceptance/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDockerC(t *testing.T) {
}

func TestDockerCSharp(t *testing.T) {
skip.WithIssue(t, 58218, "flaky test")
skip.WithIssue(t, 86852, "test not to up to date anymore")
s := log.Scope(t)
defer s.Close(t)

Expand Down
6 changes: 3 additions & 3 deletions pkg/acceptance/cluster/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ func (c *Container) Restart(ctx context.Context, timeout *time.Duration) error {
return nil
}

// Wait waits for a running container to exit.
func (c *Container) Wait(ctx context.Context, condition container.WaitCondition) error {
waitOKBodyCh, errCh := c.cluster.client.ContainerWait(ctx, c.id, condition)
// WaitUntilNotRunning waits for a running container to exit.
func (c *Container) WaitUntilNotRunning(ctx context.Context) error {
waitOKBodyCh, errCh := c.cluster.client.ContainerWait(ctx, c.id, container.WaitConditionNotRunning)
select {
case err := <-errCh:
return err
Expand Down
49 changes: 35 additions & 14 deletions pkg/acceptance/cluster/dockercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type DockerCluster struct {
stopper *stop.Stopper
monitorCtx context.Context
monitorCtxCancelFunc func()
monitorDone chan struct{}
clusterID string
networkID string
networkName string
Expand Down Expand Up @@ -191,7 +192,7 @@ func CreateDocker(

func (l *DockerCluster) expectEvent(c *Container, msgs ...string) {
for index, ctr := range l.Nodes {
if c.id != ctr.id {
if ctr.Container == nil || c.id != ctr.id {
continue
}
for _, status := range msgs {
Expand Down Expand Up @@ -237,7 +238,7 @@ func (l *DockerCluster) OneShot(
if err := l.oneshot.Start(ctx); err != nil {
return err
}
return l.oneshot.Wait(ctx, container.WaitConditionNotRunning)
return l.oneshot.WaitUntilNotRunning(ctx)
}

// stopOnPanic is invoked as a deferred function in Start in order to attempt
Expand Down Expand Up @@ -374,7 +375,7 @@ func (l *DockerCluster) initCluster(ctx context.Context) {
// and it'll get in the way of future runs.
l.vols = c
maybePanic(c.Start(ctx))
maybePanic(c.Wait(ctx, container.WaitConditionNotRunning))
maybePanic(c.WaitUntilNotRunning(ctx))
}

// cockroachEntryPoint returns the value to be used as
Expand Down Expand Up @@ -544,6 +545,7 @@ func (l *DockerCluster) processEvent(ctx context.Context, event events.Message)
// If there's currently a oneshot container, ignore any die messages from
// it because those are expected.
if l.oneshot != nil && event.ID == l.oneshot.id && event.Status == eventDie {
log.Infof(ctx, "Docker event was: the oneshot container terminated")
return true
}

Expand Down Expand Up @@ -585,7 +587,9 @@ func (l *DockerCluster) processEvent(ctx context.Context, event events.Message)
return false
}

func (l *DockerCluster) monitor(ctx context.Context) {
func (l *DockerCluster) monitor(ctx context.Context, monitorDone chan struct{}) {
defer close(monitorDone)

if log.V(1) {
log.Infof(ctx, "events monitor starts")
defer log.Infof(ctx, "events monitor exits")
Expand All @@ -603,6 +607,9 @@ func (l *DockerCluster) monitor(ctx context.Context) {
})
for {
select {
case <-l.monitorCtx.Done():
log.Infof(ctx, "monitor shutting down")
return false
case err := <-errq:
log.Infof(ctx, "event stream done, resetting...: %s", err)
// Sometimes we get a random string-wrapped EOF error back.
Expand Down Expand Up @@ -640,7 +647,8 @@ func (l *DockerCluster) Start(ctx context.Context) {

log.Infof(ctx, "starting %d nodes", len(l.Nodes))
l.monitorCtx, l.monitorCtxCancelFunc = context.WithCancel(context.Background())
go l.monitor(ctx)
l.monitorDone = make(chan struct{})
go l.monitor(ctx, l.monitorDone)
var wg sync.WaitGroup
wg.Add(len(l.Nodes))
for _, node := range l.Nodes {
Expand All @@ -661,7 +669,6 @@ func (l *DockerCluster) Start(ctx context.Context) {
// the cluster (restart, kill, ...). In the event of a mismatch, the passed
// Tester receives a fatal error.
func (l *DockerCluster) Assert(ctx context.Context, t testing.TB) {
const almostZero = 50 * time.Millisecond
filter := func(ch chan Event, wait time.Duration) *Event {
select {
case act := <-ch:
Expand All @@ -673,17 +680,28 @@ func (l *DockerCluster) Assert(ctx context.Context, t testing.TB) {

var events []Event
for {
// The expected event channel is buffered and should contain
// all expected events already.
const almostZero = 15 * time.Millisecond
exp := filter(l.expectedEvents, almostZero)
if exp == nil {
break
}
act := filter(l.events, 15*time.Second)
t.Logf("expecting event: %v", exp)
// l.events is connected to the docker controller and may
// receive events more slowly.
const waitForDockerEvent = 15 * time.Second
act := filter(l.events, waitForDockerEvent)
t.Logf("got event: %v", act)
if act == nil || *exp != *act {
t.Fatalf("expected event %v, got %v (after %v)", exp, act, events)
}
events = append(events, *exp)
}
if cur := filter(l.events, almostZero); cur != nil {
// At the end, we leave docker a bit more time to report a final event,
// if any.
const waitForLastDockerEvent = 1 * time.Second
if cur := filter(l.events, waitForLastDockerEvent); cur != nil {
t.Fatalf("unexpected extra event %v (after %v)", cur, events)
}
if log.V(2) {
Expand Down Expand Up @@ -713,13 +731,9 @@ func (l *DockerCluster) stop(ctx context.Context) {
if l.monitorCtxCancelFunc != nil {
l.monitorCtxCancelFunc()
l.monitorCtxCancelFunc = nil
<-l.monitorDone
}

if l.vols != nil {
maybePanic(l.vols.Kill(ctx))
maybePanic(l.vols.Remove(ctx))
l.vols = nil
}
for i, n := range l.Nodes {
if n.Container == nil {
continue
Expand All @@ -742,8 +756,14 @@ func (l *DockerCluster) stop(ctx context.Context) {
log.Infof(ctx, "~~~ node %d CRASHED ~~~~", i)
}
maybePanic(n.Remove(ctx))
n.Container = nil
}

if l.vols != nil {
maybePanic(l.vols.Kill(ctx))
maybePanic(l.vols.Remove(ctx))
l.vols = nil
}
l.Nodes = nil

if l.networkID != "" {
maybePanic(
Expand Down Expand Up @@ -878,6 +898,7 @@ func (l *DockerCluster) Cleanup(ctx context.Context, preserveLogs bool) {
}
for _, v := range volumes {
if preserveLogs && v.Name() == "logs" {
log.Infof(ctx, "preserving log directory: %s", l.volumesDir)
continue
}
if err := os.RemoveAll(filepath.Join(l.volumesDir, v.Name())); err != nil {
Expand Down
52 changes: 37 additions & 15 deletions pkg/acceptance/util_docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build/bazel"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/containerd/containerd/platforms"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
Expand Down Expand Up @@ -69,19 +70,6 @@ func testDocker(
) error {
var err error
RunDocker(t, func(t *testing.T) {
cfg := cluster.TestConfig{
Name: name,
Duration: *flagDuration,
}
for i := 0; i < num; i++ {
cfg.Nodes = append(cfg.Nodes, cluster.NodeConfig{Stores: []cluster.StoreConfig{{}}})
}
l := StartCluster(ctx, t, cfg).(*cluster.DockerCluster)
defer l.AssertAndStop(ctx, t)

if len(l.Nodes) > 0 {
containerConfig.Env = append(containerConfig.Env, "PGHOST="+l.Hostname(0))
}
var pwd string
pwd, err = os.Getwd()
if err != nil {
Expand Down Expand Up @@ -121,12 +109,46 @@ func testDocker(
}()
hostConfig.Binds = append(hostConfig.Binds, interactivetestsDir+":/mnt/interactive_tests")
}

// Prepare the docker cluster.
// We need to do this "under" the directory preparation above so as
// to prevent the test from crashing because the directory gets
// deleted before the container shutdown assertions get a chance to run.
cfg := cluster.TestConfig{
Name: name,
Duration: *flagDuration,
}
for i := 0; i < num; i++ {
cfg.Nodes = append(cfg.Nodes, cluster.NodeConfig{Stores: []cluster.StoreConfig{{}}})
}
l := StartCluster(ctx, t, cfg).(*cluster.DockerCluster)

var preserveLogs bool
defer func() {
// Check the final health of the cluster nodes and
// stop the cluster after that.
l.AssertAndStop(ctx, t)

// Note: we must be careful to clean up the volumes *after*
// the cluster has been shut down (in the `AssertAndStop` call).
// Otherwise, the directory removal will cause the cluster nodes
// to crash and report abnormal termination, even when the test
// succeeds otherwise.
log.Infof(ctx, "cleaning up docker volume")
l.Cleanup(ctx, preserveLogs)
}()

if len(l.Nodes) > 0 {
containerConfig.Env = append(containerConfig.Env, "PGHOST="+l.Hostname(0))
}

log.Infof(ctx, "starting one-shot container")
err = l.OneShot(
ctx, acceptanceImage, types.ImagePullOptions{}, containerConfig, hostConfig,
platforms.DefaultSpec(), "docker-"+name,
)
preserveLogs := err != nil
l.Cleanup(ctx, preserveLogs)
log.Infof(ctx, "one-shot container terminated: %v", err)
preserveLogs = err != nil
})
return err
}
Expand Down