Skip to content

Commit

Permalink
Move test/utils.go to utils_test.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Feb 13, 2024
1 parent f191b29 commit 7e33223
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 153 deletions.
80 changes: 0 additions & 80 deletions destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,92 +15,12 @@
package rabbitmq

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"testing"

"github.com/alarbada/conduit-connector-rabbitmq/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
"github.com/matryer/is"
"github.com/rabbitmq/amqp091-go"
)

func TestTeardownDestination_NoOpen(t *testing.T) {
is := is.New(t)
con := NewDestination()
err := con.Teardown(context.Background())
is.NoErr(err)
}

func TestDestination_Integration(t *testing.T) {
ctx := context.Background()
is := is.New(t)

queueName := test.SetupQueueName(t, is)

{
destination := NewDestination()
cfg := cfgToMap(DestinationConfig{
Config: Config{
URL: test.URL,
QueueName: queueName,
},
})

err := destination.Configure(ctx, cfg)
is.NoErr(err)

err = destination.Open(ctx)
is.NoErr(err)

defer test.TeardownResource(ctx, is, destination)

recsToWrite := generate3Records(queueName)
writtenTotal, err := destination.Write(ctx, recsToWrite)
is.Equal(writtenTotal, len(recsToWrite))
is.NoErr(err)
}

{
conn, err := amqp091.Dial(test.URL)
is.NoErr(err)

defer test.CloseResource(is, conn)

ch, err := conn.Channel()
is.NoErr(err)

defer test.CloseResource(is, ch)

recs, err := ch.Consume(queueName, "", true, false, false, false, nil)
is.NoErr(err)

assertNextPayloadIs := func(expectedPayload string) {
delivery := <-recs

var rec struct {
Payload struct {
After string `json:"after"`
} `json:"payload"`
}
err = json.Unmarshal(delivery.Body, &rec)
is.NoErr(err)

body, err := base64.StdEncoding.DecodeString(rec.Payload.After)
is.NoErr(err)

is.Equal(string(body), expectedPayload)
}

assertNextPayloadIs("example message 0")
assertNextPayloadIs("example message 1")
assertNextPayloadIs("example message 2")
}
}

func generate3Records(queueName string) []sdk.Record {
recs := []sdk.Record{}

Expand Down
11 changes: 5 additions & 6 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/alarbada/conduit-connector-rabbitmq/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
"github.com/rabbitmq/amqp091-go"
Expand All @@ -39,9 +38,9 @@ func TestSource_Integration_RestartFull(t *testing.T) {
ctx := context.Background()
is := is.New(t)

queueName := test.SetupQueueName(t, is)
queueName := setupQueueName(t, is)
cfgMap := cfgToMap(SourceConfig{
Config: Config{URL: test.URL, QueueName: queueName},
Config: Config{URL: testURL, QueueName: queueName},
})

recs1 := generateRabbitmqMsgs(1, 3)
Expand All @@ -59,10 +58,10 @@ func TestSource_Integration_RestartPartial(t *testing.T) {

is := is.New(t)
ctx := context.Background()
queueName := test.SetupQueueName(t, is)
queueName := setupQueueName(t, is)

cfgMap := cfgToMap(SourceConfig{
Config: Config{URL: test.URL, QueueName: queueName},
Config: Config{URL: testURL, QueueName: queueName},
})

recs1 := generateRabbitmqMsgs(1, 3)
Expand Down Expand Up @@ -104,7 +103,7 @@ func generateRabbitmqMsgs(from, to int) []amqp091.Publishing {
}

func produceRabbitmqMsgs(ctx context.Context, is *is.I, queueName string, msgs []amqp091.Publishing) {
conn, err := amqp091.Dial(test.URL)
conn, err := amqp091.Dial(testURL)
is.NoErr(err)

defer conn.Close()
Expand Down
66 changes: 0 additions & 66 deletions test/utils.go

This file was deleted.

66 changes: 65 additions & 1 deletion utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rabbitmq

import "encoding/json"
import (
"context"
"encoding/json"
"testing"

"github.com/matryer/is"
"github.com/rabbitmq/amqp091-go"
)

// cfgToMap converts a config struct to a map. This is useful for more type
// safety on tests.
Expand All @@ -18,3 +39,46 @@ func cfgToMap(cfg any) map[string]string {

return m
}

const testURL = "amqp://guest:guest@localhost:5672"

// setupQueueName creates a new topic name for the test and deletes it if it
// exists, so that the test can start from a clean slate.
func setupQueueName(t *testing.T, is *is.I) string {
queueName := "rabbitmq.queue." + t.Name()
deleteQueue(is, queueName)

return queueName
}

func deleteQueue(is *is.I, queueName string) {
conn, err := amqp091.Dial(testURL)
is.NoErr(err)
defer closeResource(is, conn)

ch, err := conn.Channel()
is.NoErr(err)
defer closeResource(is, ch)

// force queue delete
_, err = ch.QueueDelete(queueName, false, false, false)
is.NoErr(err)
}

type closable interface {
Close() error
}

func closeResource(is *is.I, c closable) {
err := c.Close()
is.NoErr(err)
}

type teardownable interface {
Teardown(context.Context) error
}

func teardownResource(ctx context.Context, is *is.I, t teardownable) {
err := t.Teardown(ctx)
is.NoErr(err)
}

0 comments on commit 7e33223

Please sign in to comment.