Skip to content

Commit

Permalink
chore: rsources flaky test - start services before all
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 7, 2022
1 parent 8ac05c2 commit c55edc2
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 37 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ test: install-tools test-run test-teardown
test-run: ## Run all unit tests
ifeq ($(filter 1,$(debug) $(RUNNER_DEBUG)),)
$(eval TEST_CMD = SLOW=0 gotestsum --format pkgname-and-test-fails --)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -vet=all --timeout=15m)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=count -coverpkg=./... -vet=all --timeout=15m)
else
$(eval TEST_CMD = SLOW=0 go test)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -vet=all --timeout=15m)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=count -coverpkg=./... -vet=all --timeout=15m)
endif
ifdef package
$(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true
Expand Down
4 changes: 4 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ coverage:
patch:
default:
informational: true
ignore:
- "mocks"
- "proto"
- "cmd/devtool"
75 changes: 40 additions & 35 deletions services/rsources/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ var _ = Describe("Using sources handler", Ordered, func() {
})

AfterAll(func() {
if resource.resource != nil {
purgeResource(pool, resource.resource)
}
purgeResources(pool, resource.resource)
})

It("should be able to add and get failed records", func() {
Expand Down Expand Up @@ -503,7 +501,7 @@ var _ = Describe("Using sources handler", Ordered, func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
const networkId = "TestMultitenantSourcesHandler"
networkId := randomString()
network, _ = pool.Client.NetworkInfo(networkId)
if network == nil {
network, err = pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: networkId})
Expand Down Expand Up @@ -532,40 +530,44 @@ var _ = Describe("Using sources handler", Ordered, func() {
SharedConn: pgC.externalDSN,
SubscriptionTargetConn: pgB.internalDSN,
}
})

AfterAll(func() {
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
}
if pgA.resource != nil {
purgeResource(pool, pgA.resource, pgB.resource, pgC.resource)
}
})
It("should be able to create two services", func() {
// Start 2 JobServices
// 1. js1 with local=pgA, remote=pgC
// 2. js2 with local=pgB, remote=pgC
// Increment the same jobRunId from both services
serviceA = createService(configA)
serviceB = createService(configB)
})

AfterAll(func() {
purgeResources(pool, pgA.resource, pgB.resource, pgC.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
}
})

It("should be able to query both services for the same jobRunId and receive same stats", func() {
GinkgoT().Logf("configA: %+v", configA)
GinkgoT().Logf("serviceA: %+v", serviceA)

GinkgoT().Logf("configB: %+v", configB)
GinkgoT().Logf("serviceB: %+v", serviceB)

// Status from both services should be same
jobRunId := newJobRunId()
statsA := Stats{
In: 5,
Out: 4,
Failed: 0,
}
GinkgoT().Logf("incrementing serviceA")
increment(pgA.db, jobRunId, defaultJobTargetKey, statsA, serviceA, nil)

statsB := Stats{
In: 3,
Out: 2,
Failed: 1,
}
GinkgoT().Logf("incrementing serviceB")
increment(pgB.db, jobRunId, defaultJobTargetKey, statsB, serviceB, nil)
Eventually(func() bool {
totalStatsA, err := serviceA.GetStatus(context.Background(), jobRunId, JobFilter{
Expand Down Expand Up @@ -672,7 +674,7 @@ var _ = Describe("Using sources handler", Ordered, func() {

It("shouldn't be able to create a service when wal_level=logical is not set on the local db", func() {
pgD := newDBResource(pool, network.ID, "postgres-4")
defer purgeResource(pool, pgD.resource)
defer purgeResources(pool, pgD.resource)
badConfig := JobServiceConfig{
LocalHostname: "postgres-4",
MaxPoolSize: 1,
Expand All @@ -686,7 +688,7 @@ var _ = Describe("Using sources handler", Ordered, func() {

It("shouldn't be able to create a service when an invalid SubscriptionTargetConn is provided", func() {
pgD := newDBResource(pool, network.ID, "postgres-4")
defer purgeResource(pool, pgD.resource)
defer purgeResources(pool, pgD.resource)
badConfig := JobServiceConfig{
LocalHostname: "postgres-4",
MaxPoolSize: 1,
Expand All @@ -699,7 +701,7 @@ var _ = Describe("Using sources handler", Ordered, func() {
})
})

Context("monitoring lag when shared db is configured", func() {
Context("monitoring lag when shared db is configured", Ordered, func() {
var (
pool *dockertest.Pool
network *docker.Network
Expand All @@ -712,7 +714,7 @@ var _ = Describe("Using sources handler", Ordered, func() {
var err error
pool, err = dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
const networkId = "TestMultitenantSourcesHandler"
networkId := randomString()
network, _ = pool.Client.NetworkInfo(networkId)
if network == nil {
network, err = pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: networkId})
Expand All @@ -736,12 +738,10 @@ var _ = Describe("Using sources handler", Ordered, func() {
})

AfterAll(func() {
purgeResources(pool, pgA.resource, pgB.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
}
if pgA.resource != nil {
purgeResource(pool, pgA.resource, pgB.resource)
}
})

It("should be able to monitor lag when shared db is configured", func() {
Expand Down Expand Up @@ -779,7 +779,7 @@ var _ = Describe("Using sources handler", Ordered, func() {
It("should be able to add rsources_failed_keys table to the publication and subscription seamlessly", func() {
pool, err := dockertest.NewPool("")
Expect(err).NotTo(HaveOccurred())
const networkId = "TestMultitenantSourcesHandlerMigration"
networkId := randomString()
network, _ := pool.Client.NetworkInfo(networkId)
if network == nil {
network, err = pool.Client.CreateNetwork(docker.CreateNetworkOptions{Name: networkId})
Expand All @@ -794,12 +794,10 @@ var _ = Describe("Using sources handler", Ordered, func() {
pgC := newDBResource(pool, network.ID, "postgres-mig-3")

defer func() {
purgeResources(pool, pgA.resource, pgB.resource, pgC.resource)
if network != nil {
_ = pool.Client.RemoveNetwork(network.ID)
}
if pgA.resource != nil {
purgeResource(pool, pgA.resource, pgB.resource, pgC.resource)
}
}()

configA := JobServiceConfig{
Expand Down Expand Up @@ -929,26 +927,27 @@ func (g *mockGauge) wasGauged() bool {

func createService(config JobServiceConfig) JobService {
service, err := NewJobService(config)
Expect(err).NotTo(HaveOccurred(), "it should be able to create the service")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to create the service")
return service
}

func addFailedRecords(db *sql.DB, jobRunId string, jobTargetKey JobTargetKey, sh JobService, records []json.RawMessage) {
tx, err := db.Begin()
Expect(err).NotTo(HaveOccurred(), "it should be able to begin the transaction")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to begin the transaction")
err = sh.AddFailedRecords(context.Background(), tx, jobRunId, jobTargetKey, records)
Expect(err).NotTo(HaveOccurred(), "it should be able to add failed records")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to add failed records")
err = tx.Commit()
Expect(err).NotTo(HaveOccurred(), "it should be able to commit the transaction")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to commit the transaction")
}

func increment(db *sql.DB, jobRunId string, key JobTargetKey, stat Stats, sh JobService, wg *sync.WaitGroup) {
GinkgoT().Logf("Incrementing using service %+v, db: %+v", sh, db)
tx, err := db.Begin()
Expect(err).NotTo(HaveOccurred(), "it should be able to begin the transaction")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to begin the transaction")
err = sh.IncrementStats(context.Background(), tx, jobRunId, key, stat)
Expect(err).NotTo(HaveOccurred(), "it should be able to increment stats")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to increment stats")
err = tx.Commit()
Expect(err).NotTo(HaveOccurred(), "it should be able to commit the transaction")
Expect(err).ShouldNot(HaveOccurred(), "it should be able to commit the transaction")
if wg != nil {
wg.Done()
}
Expand Down Expand Up @@ -1007,9 +1006,11 @@ func newDBResource(pool *dockertest.Pool, networkId, hostname string, params ...
}
}

func purgeResource(pool *dockertest.Pool, resources ...*dockertest.Resource) {
func purgeResources(pool *dockertest.Pool, resources ...*dockertest.Resource) {
for _, resource := range resources {
_ = pool.Purge(resource)
if resource != nil {
_ = pool.Purge(resource)
}
}
}

Expand All @@ -1019,3 +1020,7 @@ func getDB(conn string, maxOpenConns int) *sql.DB {
Expect(err).NotTo(HaveOccurred())
return db
}

func randomString() string {
return strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
}

0 comments on commit c55edc2

Please sign in to comment.