Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: disable pubsub tests #3698

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 3 additions & 128 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ package pubsub

import (
"fmt"
"path/filepath"
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/TBD54566975/ftl/backend/controller/async"
in "github.com/TBD54566975/ftl/internal/integration"
"github.com/TBD54566975/ftl/internal/schema"
)

func TestPubSub(t *testing.T) {
t.Skip("About to move away from legacy pubsub")
calls := 20
events := calls * 10
in.Run(t,
Expand Down Expand Up @@ -43,50 +41,8 @@ func TestPubSub(t *testing.T) {
)
}

func TestConsumptionDelay(t *testing.T) {
in.Run(t,
in.WithLanguages("go", "java"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish events with a small delay between each
// pubsub should trigger its poll a few times during this period
// each time it should continue processing each event until it reaches one that is too new to process
func(t testing.TB, ic in.TestContext) {
for i := 0; i < 120; i++ {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
time.Sleep(time.Millisecond * 25)
}
},

in.Sleep(time.Second*2),

// Get all event created ats, and all async call created ats
// Compare each, make sure none are less than 0.1s of each other
in.QueryRow("ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
) AS sub_event_times
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
) AS sub_async_calls
)
SELECT COUNT(*)
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.1;
`, 0),
)
}

func TestRetry(t *testing.T) {
t.Skip("About to move away from legacy pubsub")
retriesPerCall := 2
in.Run(t,
in.WithLanguages("java", "go"),
Expand Down Expand Up @@ -159,6 +115,7 @@ func TestRetry(t *testing.T) {
}

func TestExternalPublishRuntimeCheck(t *testing.T) {
t.Skip("About to move away from legacy pubsub")
// No java as there is no API for this
in.Run(t,
in.CopyModule("publisher"),
Expand All @@ -172,85 +129,3 @@ func TestExternalPublishRuntimeCheck(t *testing.T) {
),
)
}

func TestLeaseFailure(t *testing.T) {
t.Skip()
logFilePath := filepath.Join(t.TempDir(), "pubsub.log")
t.Setenv("TEST_LOG_FILE", logFilePath)

in.Run(t,
in.CopyModule("slow"),
in.Deploy("slow"),

// publish 2 events, with the first taking a long time to consume
in.Call("slow", "publish", in.Obj{
"durations": []int{20, 1},
}, func(t testing.TB, resp in.Obj) {}),

// while it is consuming the first event, force delete the lease in the db
in.QueryRow("ftl", `
WITH deleted_rows AS (
DELETE FROM leases WHERE id = (
SELECT lease_id FROM async_calls WHERE verb = 'slow.consume'
)
RETURNING *
)
SELECT COUNT(*) FROM deleted_rows;
`, 1),

in.Sleep(time.Second*7),

// confirm that the first event failed and the second event succeeded,
in.QueryRow("ftl", `SELECT state, error FROM async_calls WHERE verb = 'slow.consume' ORDER BY created_at`, "error", "async call lease expired"),
in.QueryRow("ftl", `SELECT state, error FROM async_calls WHERE verb = 'slow.consume' ORDER BY created_at OFFSET 1`, "success", nil),

// confirm that the first call did not keep executing for too long after the lease was expired
in.IfLanguage("go",
in.ExpectError(
in.FileContains(logFilePath, "slept for 5s"),
"Haystack does not contain needle",
),
),
)
}

// TestIdlePerformance tests that async calls are created quickly after an event is published
func TestIdlePerformance(t *testing.T) {
in.Run(t,
in.WithLanguages("go"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish a number of events with a delay between each
in.Repeat(5, func(t testing.TB, ic in.TestContext) {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
in.Sleep(time.Millisecond*1200)(t, ic)
}),

// compare publication date and consumption date of each event
in.ExpectError(func(t testing.TB, ic in.TestContext) {
badResult := in.GetRow(t, ic, "ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
) AS sub_event_times
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
) AS sub_async_calls
)
SELECT ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at)))
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) > 0.2
LIMIT 1;
`, 1)
assert.True(t, false, "async calls should be created quickly after an event is published, but it took %vs", badResult[0])
}, "sql: no rows in result set"), // no rows found means that all events were consumed quickly
)
}
Loading