diff --git a/destination.go b/destination.go index 97e45c0..73a08fd 100644 --- a/destination.go +++ b/destination.go @@ -34,6 +34,7 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro if err != nil { return fmt.Errorf("invalid config: %w", err) } + return nil } @@ -42,11 +43,13 @@ func (d *Destination) Open(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("failed to dial: %w", err) } + sdk.Logger(ctx).Debug().Msg("connected to RabbitMQ") d.ch, err = d.conn.Channel() if err != nil { return fmt.Errorf("failed to open channel: %w", err) } + sdk.Logger(ctx).Debug().Msgf("opened channel") return nil } @@ -62,6 +65,10 @@ func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, err if err != nil { return 0, fmt.Errorf("failed to publish: %w", err) } + + sdk.Logger(ctx).Debug().Msgf( + "published message %s on %s", + string(record.Position), d.config.QueueName) } return len(records), nil diff --git a/destination_test.go b/destination_test.go index 6a7489d..e961810 100644 --- a/destination_test.go +++ b/destination_test.go @@ -2,14 +2,95 @@ package rabbitmq import ( "context" + "fmt" "testing" + "github.com/alarbada/conduit-connector-rabbitmq/test" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/google/uuid" "github.com/matryer/is" + "github.com/rabbitmq/amqp091-go" ) -func TestTeardown_NoOpen(t *testing.T) { +func TestTeardownDestination_NoOpen(t *testing.T) { is := is.New(t) con := NewDestination() err := con.Teardown(context.Background()) is.NoErr(err) } + +func newDestinationCfg(queueName string) map[string]string { + return map[string]string{ + "url": test.URL, + "queueName": queueName, + } +} + +func TestDestination_Integration(t *testing.T) { + // ctx := test.CtxWithLogger() + ctx := context.Background() + is := is.New(t) + + queueName := test.SetupQueueName(t, is) + + { + destination := NewDestination() + cfg := newDestinationCfg(queueName) + + err := destination.Configure(ctx, cfg) + is.NoErr(err) + + err = destination.Open(ctx) + is.NoErr(err) + + defer test.TeardownResource(is, ctx, destination) + + recsToWrite := generate3Records(queueName) + writtenTotal, err := destination.Write(ctx, recsToWrite) + is.Equal(writtenTotal, len(recsToWrite)) + is.NoErr(err) + } + + { + conn, err := amqp091.Dial(test.URL) + is.NoErr(err) + + defer test.CloseResource(is, conn) + + ch, err := conn.Channel() + is.NoErr(err) + + defer test.CloseResource(is, ch) + + recs, err := ch.Consume(queueName, "", true, false, false, false, nil) + is.NoErr(err) + + rec1 := <-recs + is.Equal(string(rec1.Body), "example message 0") + + rec2 := <-recs + is.Equal(string(rec2.Body), "example message 1") + + rec3 := <-recs + is.Equal(string(rec3.Body), "example message 2") + } +} + +func generate3Records(queueName string) []sdk.Record { + recs := []sdk.Record{} + + for i := 0; i < 3; i++ { + exampleMessage := fmt.Sprintf("example message %d", i) + + rec := sdk.Util.Source.NewRecordCreate( + []byte(uuid.NewString()), + sdk.Metadata{"rabbitmq.queue": queueName}, + sdk.RawData("test-key"), + sdk.RawData(exampleMessage), + ) + + recs = append(recs, rec) + } + + return recs +} diff --git a/go.mod b/go.mod index 11ec702..8370c31 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.21.1 require ( github.com/conduitio/conduit-connector-sdk v0.8.0 github.com/golangci/golangci-lint v1.55.2 + github.com/google/uuid v1.4.0 github.com/matryer/is v1.4.1 github.com/rabbitmq/amqp091-go v1.9.0 github.com/rs/zerolog v1.31.0 @@ -83,7 +84,6 @@ require ( github.com/golangci/revgrep v0.5.2 // indirect github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/uuid v1.4.0 // indirect github.com/gordonklaus/ineffassign v0.0.0-20230610083614-0e73809eb601 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect diff --git a/source.go b/source.go index 64aa0c2..f521a2e 100644 --- a/source.go +++ b/source.go @@ -37,11 +37,11 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { if err != nil { return fmt.Errorf("invalid config: %w", err) } + return nil } func (s *Source) Open(ctx context.Context, pos sdk.Position) (err error) { - s.conn, err = amqp091.Dial(s.config.URL) if err != nil { return fmt.Errorf("failed to dial: %w", err) diff --git a/test/utils.go b/test/utils.go index 0192990..eab2149 100644 --- a/test/utils.go +++ b/test/utils.go @@ -39,3 +39,21 @@ func DeleteQueue(is *is.I, queueName string) { _, err = ch.QueueDelete(queueName, false, false, false) is.NoErr(err) } + +type closable interface { + Close() error +} + +func CloseResource(is *is.I, c closable) { + err := c.Close() + is.NoErr(err) +} + +type teardownable interface { + Teardown(context.Context) error +} + +func TeardownResource(is *is.I, ctx context.Context, t teardownable) { + err := t.Teardown(ctx) + is.NoErr(err) +}