Skip to content

Commit

Permalink
Handle context timeout on source.Read
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Feb 6, 2024
1 parent 63797be commit 91599f5
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,29 @@ func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) (err error) {
return nil
}

func (s *Source) Read(_ context.Context) (sdk.Record, error) {
func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
var rec sdk.Record

msg, ok := <-s.msgs
if !ok {
return rec, errors.New("source message channel closed")
}
select {
case <-ctx.Done():
return rec, ctx.Err()
case msg, ok := <-s.msgs:
if !ok {
return rec, errors.New("source message channel closed")
}

var (
pos = Position{msg.DeliveryTag, s.queue.Name}
sdkPos = pos.ToSdkPosition()
metadata = metadataFromMessage(msg)
key = sdk.RawData(msg.MessageId)
payload = sdk.RawData(msg.Body)
)
var (
pos = Position{msg.DeliveryTag, s.queue.Name}
sdkPos = pos.ToSdkPosition()
metadata = metadataFromMessage(msg)
key = sdk.RawData(msg.MessageId)
payload = sdk.RawData(msg.Body)
)

rec = sdk.Util.Source.NewRecordCreate(sdkPos, metadata, key, payload)
rec = sdk.Util.Source.NewRecordCreate(sdkPos, metadata, key, payload)

return rec, nil
return rec, nil
}
}

func (s *Source) Ack(_ context.Context, position sdk.Position) error {
Expand Down

0 comments on commit 91599f5

Please sign in to comment.