From c399e3ed77d96004a4131c4d6d1afae9400a18b8 Mon Sep 17 00:00:00 2001 From: Guillem Date: Wed, 31 Jan 2024 12:26:51 -0300 Subject: [PATCH] Fill destination.go --- destination.go | 70 ++++++++++++++++++++++++++------------------------ source.go | 17 +++++++----- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/destination.go b/destination.go index 431085c..6a76bc4 100644 --- a/destination.go +++ b/destination.go @@ -4,46 +4,31 @@ package rabbitmq import ( "context" + "errors" "fmt" sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/rabbitmq/amqp091-go" ) type Destination struct { sdk.UnimplementedDestination - config DestinationConfig -} + conn *amqp091.Connection + ch *amqp091.Channel -type DestinationConfig struct { - // Config includes parameters that are the same in the source and destination. - Config - // DestinationConfigParam must be either yes or no (defaults to yes). - DestinationConfigParam string `validate:"inclusion=yes|no" default:"yes"` + config DestinationConfig } func NewDestination() sdk.Destination { - // Create Destination and wrap it in the default middleware. return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...) } func (d *Destination) Parameters() map[string]sdk.Parameter { - // Parameters is a map of named Parameters that describe how to configure - // the Destination. Parameters can be generated from DestinationConfig with - // paramgen. return d.config.Parameters() } func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { - // Configure is the first function to be called in a connector. It provides - // the connector with the configuration that can be validated and stored. - // In case the configuration is not valid it should return an error. - // Testing if your connector can reach the configured data source should be - // done in Open, not in Configure. - // The SDK will validate the configuration and populate default values - // before calling Configure. If you need to do more complex validations you - // can do them manually here. - sdk.Logger(ctx).Info().Msg("Configuring Destination...") err := sdk.Util.ParseConfig(cfg, &d.config) if err != nil { @@ -52,24 +37,43 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro return nil } -func (d *Destination) Open(ctx context.Context) error { - // Open is called after Configure to signal the plugin it can prepare to - // start writing records. If needed, the plugin should open connections in - // this function. +func (d *Destination) Open(ctx context.Context) (err error) { + d.conn, err = amqp091.Dial(d.config.URL) + if err != nil { + return fmt.Errorf("failed to dial: %w", err) + } + + d.ch, err = d.conn.Channel() + if err != nil { + return fmt.Errorf("failed to open channel: %w", err) + } + return nil } func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) { - // Write writes len(r) records from r to the destination right away without - // caching. It should return the number of records written from r - // (0 <= n <= len(r)) and any error encountered that caused the write to - // stop early. Write must return a non-nil error if it returns n < len(r). - return 0, nil + for _, record := range records { + pos, err := parseSdkPosition(record.Position) + if err != nil { + return 0, fmt.Errorf("failed to parse position: %w", err) + } + + msg := amqp091.Publishing{ + ContentType: "text/plain", + Body: record.Payload.After.Bytes(), + } + + err = d.ch.PublishWithContext(ctx, "", pos.QueueName, false, false, msg) + if err != nil { + return 0, fmt.Errorf("failed to publish: %w", err) + } + } + + return len(records), nil } func (d *Destination) Teardown(ctx context.Context) error { - // Teardown signals to the plugin that all records were written and there - // will be no more calls to any other function. After Teardown returns, the - // plugin should be ready for a graceful shutdown. - return nil + errch := d.ch.Close() + errconn := d.conn.Close() + return errors.Join(errch, errconn) } diff --git a/source.go b/source.go index 4b884cf..673deb4 100644 --- a/source.go +++ b/source.go @@ -72,7 +72,10 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) { return rec, errors.New("source message channel closed") } - pos := Position{DeliveryTag: msg.DeliveryTag}.ToSdkPosition() + pos := Position{ + DeliveryTag: msg.DeliveryTag, + QueueName: s.queue.Name, + }.ToSdkPosition() var metadata sdk.Metadata var key sdk.Data @@ -97,13 +100,15 @@ func (s *Source) Ack(ctx context.Context, position sdk.Position) error { } func (s *Source) Teardown(ctx context.Context) error { - chErr := s.ch.Close() - connErr := s.conn.Close() - - return errors.Join(chErr, connErr) + errCh := s.ch.Close() + errConn := s.conn.Close() + return errors.Join(errCh, errConn) } -type Position struct{ DeliveryTag uint64 } +type Position struct { + DeliveryTag uint64 + QueueName string +} func (p Position) ToSdkPosition() sdk.Position { bs, err := json.Marshal(p)