Skip to content

Commit

Permalink
Fill source.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Jan 31, 2024
1 parent 8277700 commit 67d0e1c
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 58 deletions.
17 changes: 11 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package rabbitmq

// Config contains shared config parameters, common to the source and
// destination. If you don't need shared parameters you can entirely remove this
// file.
type Config struct {
// GlobalConfigParam is named global_config_param_name and needs to be
// provided by the user.
GlobalConfigParam string `json:"global_config_param_name" validate:"required"`
URL string `json:"url" validate:"required"`
}

type SourceConfig struct {
Config

QueueName string `json:"queueName" validate:"required"`
}

type DestinationConfig struct {
Config
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/golangci/golangci-lint v1.55.2
github.com/matryer/is v1.4.1
github.com/rabbitmq/amqp091-go v1.9.0
)

require (
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 h1:TCg2WBOl
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727/go.mod h1:rlzQ04UMyJXu/aOvhd8qT+hvDrFpiwqp8MRXDY9szc0=
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 h1:M8mH9eK4OUR4lu7Gd+PU1fV2/qnDNfzT635KRSObncs=
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567/go.mod h1:DWNGW8A4Y+GyBgPuaQJuWiy0XYftx4Xm/y5Jqk9I6VQ=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down Expand Up @@ -643,6 +645,7 @@ go.tmz.dev/musttag v0.7.2 h1:1J6S9ipDbalBSODNT5jCep8dhZyMr4ttnjQagmGYR5s=
go.tmz.dev/musttag v0.7.2/go.mod h1:m6q5NiiSKMnQYokefa2xGoyoXnrswCbJ0AWYzf4Zs28=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down
132 changes: 80 additions & 52 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,34 @@ package rabbitmq

import (
"context"
"encoding/json"
"errors"
"fmt"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/rabbitmq/amqp091-go"
)

type Source struct {
sdk.UnimplementedSource

config SourceConfig
lastPositionRead sdk.Position //nolint:unused // this is just an example
}
conn *amqp091.Connection
ch *amqp091.Channel
queue amqp091.Queue
msgs <-chan amqp091.Delivery

type SourceConfig struct {
// Config includes parameters that are the same in the source and destination.
Config
// SourceConfigParam is named foo and must be provided by the user.
SourceConfigParam string `json:"foo" validate:"required"`
config SourceConfig
}

func NewSource() sdk.Source {
// Create Source and wrap it in the default middleware.
return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}

func (s *Source) Parameters() map[string]sdk.Parameter {
// Parameters is a map of named Parameters that describe how to configure
// the Source. Parameters can be generated from SourceConfig with paramgen.
return s.config.Parameters()
}

func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
// Configure is the first function to be called in a connector. It provides
// the connector with the configuration that can be validated and stored.
// In case the configuration is not valid it should return an error.
// Testing if your connector can reach the configured data source should be
// done in Open, not in Configure.
// The SDK will validate the configuration and populate default values
// before calling Configure. If you need to do more complex validations you
// can do them manually here.

sdk.Logger(ctx).Info().Msg("Configuring Source...")
err := sdk.Util.ParseConfig(cfg, &s.config)
if err != nil {
Expand All @@ -52,47 +40,87 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
return nil
}

func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
// Open is called after Configure to signal the plugin it can prepare to
// start producing records. If needed, the plugin should open connections in
// this function. The position parameter will contain the position of the
// last record that was successfully processed, Source should therefore
// start producing records after this position. The context passed to Open
// will be cancelled once the plugin receives a stop signal from Conduit.
func (s *Source) Open(ctx context.Context, pos sdk.Position) (err error) {
s.conn, err = amqp091.Dial(s.config.URL)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}

s.ch, err = s.conn.Channel()
if err != nil {
return fmt.Errorf("failed to open channel: %w", err)
}

s.queue, err = s.ch.QueueDeclare(s.config.QueueName, false, false, false, false, nil)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}

s.msgs, err = s.ch.Consume(s.queue.Name, "", false, false, false, false, nil)
if err != nil {
return fmt.Errorf("failed to consume: %w", err)
}

return nil
}

func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
// Read returns a new Record and is supposed to block until there is either
// a new record or the context gets cancelled. It can also return the error
// ErrBackoffRetry to signal to the SDK it should call Read again with a
// backoff retry.
// If Read receives a cancelled context or the context is cancelled while
// Read is running it must stop retrieving new records from the source
// system and start returning records that have already been buffered. If
// there are no buffered records left Read must return the context error to
// signal a graceful stop. If Read returns ErrBackoffRetry while the context
// is cancelled it will also signal that there are no records left and Read
// won't be called again.
// After Read returns an error the function won't be called again (except if
// the error is ErrBackoffRetry, as mentioned above).
// Read can be called concurrently with Ack.
return sdk.Record{}, nil
var rec sdk.Record

msg, ok := <-s.msgs
if !ok {
return rec, errors.New("source message channel closed")
}

pos := Position{DeliveryTag: msg.DeliveryTag}.ToSdkPosition()
var metadata sdk.Metadata
var key sdk.Data

var payload sdk.Data = sdk.RawData(msg.Body)

rec = sdk.Util.Source.NewRecordCreate(pos, metadata, key, payload)

return rec, nil
}

func (s *Source) Ack(ctx context.Context, position sdk.Position) error {
// Ack signals to the implementation that the record with the supplied
// position was successfully processed. This method might be called after
// the context of Read is already cancelled, since there might be
// outstanding acks that need to be delivered. When Teardown is called it is
// guaranteed there won't be any more calls to Ack.
// Ack can be called concurrently with Read.
pos, err := parseSdkPosition(position)
if err != nil {
return fmt.Errorf("failed to parse position: %w", err)
}

if err := s.ch.Ack(pos.DeliveryTag, false); err != nil {
return fmt.Errorf("failed to ack message: %w", err)
}

return nil
}

func (s *Source) Teardown(ctx context.Context) error {
// Teardown signals to the plugin that there will be no more calls to any
// other function. After Teardown returns, the plugin should be ready for a
// graceful shutdown.
return nil
chErr := s.ch.Close()
connErr := s.conn.Close()

return errors.Join(chErr, connErr)
}

type Position struct{ DeliveryTag uint64 }

func (p Position) ToSdkPosition() sdk.Position {
bs, err := json.Marshal(p)
if err != nil {
// this error should not be possible
panic(fmt.Errorf("error marshaling position to JSON: %w", err))
}

return sdk.Position(bs)
}

func parseSdkPosition(pos sdk.Position) (Position, error) {
var p Position
err := json.Unmarshal([]byte(pos), &p)
if err != nil {
return p, fmt.Errorf("error unmarshaling position from JSON: %w", err)
}

return p, nil
}

0 comments on commit 67d0e1c

Please sign in to comment.