diff --git a/source.go b/source.go index 1cdaa5a..12218ce 100644 --- a/source.go +++ b/source.go @@ -97,25 +97,29 @@ func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) (err error) { return nil } -func (s *Source) Read(_ context.Context) (sdk.Record, error) { +func (s *Source) Read(ctx context.Context) (sdk.Record, error) { var rec sdk.Record - msg, ok := <-s.msgs - if !ok { - return rec, errors.New("source message channel closed") - } + select { + case <-ctx.Done(): + return rec, ctx.Err() + case msg, ok := <-s.msgs: + if !ok { + return rec, errors.New("source message channel closed") + } - var ( - pos = Position{msg.DeliveryTag, s.queue.Name} - sdkPos = pos.ToSdkPosition() - metadata = metadataFromMessage(msg) - key = sdk.RawData(msg.MessageId) - payload = sdk.RawData(msg.Body) - ) + var ( + pos = Position{msg.DeliveryTag, s.queue.Name} + sdkPos = pos.ToSdkPosition() + metadata = metadataFromMessage(msg) + key = sdk.RawData(msg.MessageId) + payload = sdk.RawData(msg.Body) + ) - rec = sdk.Util.Source.NewRecordCreate(sdkPos, metadata, key, payload) + rec = sdk.Util.Source.NewRecordCreate(sdkPos, metadata, key, payload) - return rec, nil + return rec, nil + } } func (s *Source) Ack(_ context.Context, position sdk.Position) error {