diff --git a/internal/integration/actions.go b/internal/integration/actions.go index 3d674efb2..13e84e804 100644 --- a/internal/integration/actions.go +++ b/internal/integration/actions.go @@ -236,7 +236,7 @@ func Deploy(module string) Action { Exec("ftl", args...)(t, ic) }, - Wait(module), + WaitWithTimeout(module, time.Minute*2), ) } @@ -575,8 +575,17 @@ func DropDBAction(t testing.TB, dbName string) Action { func DropDB(t testing.TB, dbName string) { Infof("Dropping database %s", dbName) + db, err := sql.Open("pgx", dsn.PostgresDSN("postgres")) - assert.NoError(t, err, "failed to open database connection") + if err != nil { + // We just assume there is no DB running + return + } + _, err = db.Exec("SELECT 1") + if err != nil { + // We just assume there is no DB running + return + } terminateDanglingConnections(t, db, dbName) diff --git a/internal/integration/harness.go b/internal/integration/harness.go index d80fff23f..c123de52c 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -377,26 +377,16 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { } defer dumpKubePods(ctx, ic.kubeClient, ic.kubeNamespace) - if opts.startTimeline && !opts.kube { - ic.Timeline = rpc.Dial(timelinepbconnect.NewTimelineServiceClient, "http://localhost:8894", log.Debug) - - Infof("Waiting for timeline to be ready") - ic.AssertWithRetry(t, func(t testing.TB, ic TestContext) { - _, err := ic.Timeline.Ping(ic, connect.NewRequest(&ftlv1.PingRequest{})) - assert.NoError(t, err) - }) - } - if opts.startController || opts.kube { ic.Controller = controller ic.Schema = schema ic.Console = console Infof("Waiting for controller to be ready") - ic.AssertWithRetry(t, func(t testing.TB, ic TestContext) { + ic.AssertWithSpecificRetry(t, func(t testing.TB, ic TestContext) { _, err := ic.Controller.Status(ic, connect.NewRequest(&ftlv1.StatusRequest{})) assert.NoError(t, err) - }) + }, time.Minute*2) } if opts.startProvisioner { @@ -409,6 +399,16 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { }) } + if opts.startTimeline && !opts.kube { + ic.Timeline = rpc.Dial(timelinepbconnect.NewTimelineServiceClient, "http://localhost:8894", log.Debug) + + Infof("Waiting for timeline to be ready") + ic.AssertWithRetry(t, func(t testing.TB, ic TestContext) { + _, err := ic.Timeline.Ping(ic, connect.NewRequest(&ftlv1.PingRequest{})) + assert.NoError(t, err) + }) + } + if opts.resetPubSub { Infof("Resetting pubsub") envars := []string{"COMPOSE_IGNORE_ORPHANS=True"} @@ -543,7 +543,13 @@ func (i TestContext) WorkingDir() string { return i.workDir } // AssertWithRetry asserts that the given action passes within the timeout. func (i TestContext) AssertWithRetry(t testing.TB, assertion Action) { t.Helper() - waitCtx, done := context.WithTimeout(i, i.integrationTestTimeout()) + i.AssertWithSpecificRetry(t, assertion, i.integrationTestTimeout()) +} + +// AssertWithSpecificRetry asserts that the given action passes within the timeout. +func (i TestContext) AssertWithSpecificRetry(t testing.TB, assertion Action, timeout time.Duration) { + t.Helper() + waitCtx, done := context.WithTimeout(i, timeout) defer done() for { err := i.runAssertionOnce(t, assertion)