Skip to content

Commit

Permalink
Fill destination.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Jan 31, 2024
1 parent 67d0e1c commit c399e3e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 39 deletions.
70 changes: 37 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,31 @@ package rabbitmq

import (
"context"
"errors"
"fmt"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/rabbitmq/amqp091-go"
)

type Destination struct {
sdk.UnimplementedDestination

config DestinationConfig
}
conn *amqp091.Connection
ch *amqp091.Channel

type DestinationConfig struct {
// Config includes parameters that are the same in the source and destination.
Config
// DestinationConfigParam must be either yes or no (defaults to yes).
DestinationConfigParam string `validate:"inclusion=yes|no" default:"yes"`
config DestinationConfig
}

func NewDestination() sdk.Destination {
// Create Destination and wrap it in the default middleware.
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
// Parameters is a map of named Parameters that describe how to configure
// the Destination. Parameters can be generated from DestinationConfig with
// paramgen.
return d.config.Parameters()
}

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
// Configure is the first function to be called in a connector. It provides
// the connector with the configuration that can be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The SDK will validate the configuration and populate default values
// before calling Configure. If you need to do more complex validations you
// can do them manually here.

sdk.Logger(ctx).Info().Msg("Configuring Destination...")
err := sdk.Util.ParseConfig(cfg, &d.config)
if err != nil {
Expand All @@ -52,24 +37,43 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro
return nil
}

func (d *Destination) Open(ctx context.Context) error {
// Open is called after Configure to signal the plugin it can prepare to
// start writing records. If needed, the plugin should open connections in
// this function.
func (d *Destination) Open(ctx context.Context) (err error) {
d.conn, err = amqp091.Dial(d.config.URL)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}

d.ch, err = d.conn.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: %w", err)
}

return nil
}

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
// Write writes len(r) records from r to the destination right away without
// caching. It should return the number of records written from r
// (0 <= n <= len(r)) and any error encountered that caused the write to
// stop early. Write must return a non-nil error if it returns n < len(r).
return 0, nil
for _, record := range records {
pos, err := parseSdkPosition(record.Position)
if err != nil {
return 0, fmt.Errorf("failed to parse position: %w", err)
}

msg := amqp091.Publishing{
ContentType: "text/plain",
Body: record.Payload.After.Bytes(),
}

err = d.ch.PublishWithContext(ctx, "", pos.QueueName, false, false, msg)
if err != nil {
return 0, fmt.Errorf("failed to publish: %w", err)
}
}

return len(records), nil
}

func (d *Destination) Teardown(ctx context.Context) error {
// Teardown signals to the plugin that all records were written and there
// will be no more calls to any other function. After Teardown returns, the
// plugin should be ready for a graceful shutdown.
return nil
errch := d.ch.Close()
errconn := d.conn.Close()
return errors.Join(errch, errconn)
}
17 changes: 11 additions & 6 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
return rec, errors.New("source message channel closed")
}

pos := Position{DeliveryTag: msg.DeliveryTag}.ToSdkPosition()
pos := Position{
DeliveryTag: msg.DeliveryTag,
QueueName: s.queue.Name,
}.ToSdkPosition()
var metadata sdk.Metadata
var key sdk.Data

Expand All @@ -97,13 +100,15 @@ func (s *Source) Ack(ctx context.Context, position sdk.Position) error {
}

func (s *Source) Teardown(ctx context.Context) error {
chErr := s.ch.Close()
connErr := s.conn.Close()

return errors.Join(chErr, connErr)
errCh := s.ch.Close()
errConn := s.conn.Close()
return errors.Join(errCh, errConn)
}

type Position struct{ DeliveryTag uint64 }
type Position struct {
DeliveryTag uint64
QueueName string
}

func (p Position) ToSdkPosition() sdk.Position {
bs, err := json.Marshal(p)
Expand Down

0 comments on commit c399e3e

Please sign in to comment.