Skip to content

Commit

Permalink
Add destination integration test (failing)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Feb 1, 2024
1 parent 5a0a8d8 commit 776b034
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 3 deletions.
7 changes: 7 additions & 0 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}

return nil
}

Expand All @@ -42,11 +43,13 @@ func (d *Destination) Open(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
sdk.Logger(ctx).Debug().Msg("connected to RabbitMQ")

d.ch, err = d.conn.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: %w", err)
}
sdk.Logger(ctx).Debug().Msgf("opened channel")

return nil
}
Expand All @@ -62,6 +65,10 @@ func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, err
if err != nil {
return 0, fmt.Errorf("failed to publish: %w", err)
}

sdk.Logger(ctx).Debug().Msgf(
"published message %s on %s",
string(record.Position), d.config.QueueName)
}

return len(records), nil
Expand Down
83 changes: 82 additions & 1 deletion destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,95 @@ package rabbitmq

import (
"context"
"fmt"
"testing"

"github.com/alarbada/conduit-connector-rabbitmq/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
"github.com/matryer/is"
"github.com/rabbitmq/amqp091-go"
)

func TestTeardown_NoOpen(t *testing.T) {
func TestTeardownDestination_NoOpen(t *testing.T) {
is := is.New(t)
con := NewDestination()
err := con.Teardown(context.Background())
is.NoErr(err)
}

func newDestinationCfg(queueName string) map[string]string {
return map[string]string{
"url": test.URL,
"queueName": queueName,
}
}

func TestDestination_Integration(t *testing.T) {
// ctx := test.CtxWithLogger()
ctx := context.Background()
is := is.New(t)

queueName := test.SetupQueueName(t, is)

{
destination := NewDestination()
cfg := newDestinationCfg(queueName)

err := destination.Configure(ctx, cfg)
is.NoErr(err)

err = destination.Open(ctx)
is.NoErr(err)

defer test.TeardownResource(is, ctx, destination)

recsToWrite := generate3Records(queueName)
writtenTotal, err := destination.Write(ctx, recsToWrite)
is.Equal(writtenTotal, len(recsToWrite))
is.NoErr(err)
}

{
conn, err := amqp091.Dial(test.URL)
is.NoErr(err)

defer test.CloseResource(is, conn)

ch, err := conn.Channel()
is.NoErr(err)

defer test.CloseResource(is, ch)

recs, err := ch.Consume(queueName, "", true, false, false, false, nil)
is.NoErr(err)

rec1 := <-recs
is.Equal(string(rec1.Body), "example message 0")

rec2 := <-recs
is.Equal(string(rec2.Body), "example message 1")

rec3 := <-recs
is.Equal(string(rec3.Body), "example message 2")
}
}

func generate3Records(queueName string) []sdk.Record {
recs := []sdk.Record{}

for i := 0; i < 3; i++ {
exampleMessage := fmt.Sprintf("example message %d", i)

rec := sdk.Util.Source.NewRecordCreate(
[]byte(uuid.NewString()),
sdk.Metadata{"rabbitmq.queue": queueName},
sdk.RawData("test-key"),
sdk.RawData(exampleMessage),
)

recs = append(recs, rec)
}

return recs
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ toolchain go1.21.1
require (
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/golangci/golangci-lint v1.55.2
github.com/google/uuid v1.4.0
github.com/matryer/is v1.4.1
github.com/rabbitmq/amqp091-go v1.9.0
github.com/rs/zerolog v1.31.0
Expand Down Expand Up @@ -83,7 +84,6 @@ require (
github.com/golangci/revgrep v0.5.2 // indirect
github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20230610083614-0e73809eb601 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion source.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}

return nil
}

func (s *Source) Open(ctx context.Context, pos sdk.Position) (err error) {

s.conn, err = amqp091.Dial(s.config.URL)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
Expand Down
18 changes: 18 additions & 0 deletions test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,21 @@ func DeleteQueue(is *is.I, queueName string) {
_, err = ch.QueueDelete(queueName, false, false, false)
is.NoErr(err)
}

type closable interface {
Close() error
}

func CloseResource(is *is.I, c closable) {
err := c.Close()
is.NoErr(err)
}

type teardownable interface {
Teardown(context.Context) error
}

func TeardownResource(is *is.I, ctx context.Context, t teardownable) {
err := t.Teardown(ctx)
is.NoErr(err)
}

0 comments on commit 776b034

Please sign in to comment.