Add sqs queue extractor #318
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files

@@            Coverage Diff             @@
##             main     #318      +/-   ##
==========================================
+ Coverage   97.70%   97.74%   +0.04%
==========================================
  Files         139      142       +3
  Lines        4662     4754      +92
==========================================
+ Hits         4555     4647      +92
  Misses        107      107
@grantleehoffman Should this have an update in the documentation? Maybe worth adding it to the docs here:
    """

    @classmethod
    def from_file_data(cls, connector: str, record_format: str, **connector_args):
Is it possible to add a test case for this? It looks like it may be tricky, and that may be why you omitted it, in which case I am okay with leaving it out.
Added
@@ -0,0 +1,88 @@
import json
Maybe overkill, but could be beneficial to add some design diagrams to the developer reference: https://nodestream-proj.github.io/docs/docs/category/developer-reference/
@@ -0,0 +1,122 @@
# Customizing the Queue Extractor

The `QueueExtractor` is responsible for extracting data from a queue source and converting it into a stream of records. The `QueueExtractor` is a subclass of `nodestream.pipeline.extractors:Extractor` and is responsible for the following:
I think we can keep this here for now, but we'll need to move these docs somewhere like here so they are part of the public-facing docs: https://github.com/nodestream-proj/docs/blob/main/docs/tutorials-advanced/extending-the-dsl.mdx
@@ -55,6 +55,54 @@ set the `record_format` to be `json` in the `StreamExtractor` configuration. For
record_format: json
```

## `QueueExtractor`
I'll create a PR for the public docs after we finalize this PR.
I will add an update to the public docs once these changes have gone through review and are finalized.
Adds a Queue extractor and an SQS connector for processing queue messages from an SQS queue.
Has a property `delete_after_read`, which defaults to `True`. This deletes queue message batches after the extractor step has successfully read them into the pipeline.
Future Feature: Add tracking of message processing through the pipeline, and delete each message once all of its writes have finished.
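To make the `delete_after_read` behaviour described above concrete, here is a minimal sketch rather than the code from this PR. `InMemoryQueue`, `SketchQueueExtractor`, `run_sketch`, and the `poll`/`delete` methods are hypothetical names invented for illustration, and the in-memory queue stands in for SQS so the example runs without AWS access. The `extract_records` method name and its async-generator shape are also assumptions about how the extractor is driven.

```python
import asyncio
import json
from typing import AsyncIterator, Dict, List, Tuple

Message = Tuple[str, str]  # (receipt handle, JSON body); a hypothetical shape


class InMemoryQueue:
    """Hypothetical stand-in for an SQS connector, not nodestream's actual API."""

    def __init__(self, bodies: List[str], batch_size: int = 2) -> None:
        self.pending: List[Message] = [
            (f"handle-{i}", body) for i, body in enumerate(bodies)
        ]
        self.in_flight: Dict[str, str] = {}
        self.batch_size = batch_size

    def poll(self) -> List[Message]:
        # Hand out the next batch and remember it as received but not yet deleted.
        batch = self.pending[: self.batch_size]
        self.pending = self.pending[self.batch_size :]
        self.in_flight.update(batch)
        return batch

    def delete(self, handles: List[str]) -> None:
        # Acknowledge messages so they are never delivered again.
        for handle in handles:
            self.in_flight.pop(handle, None)


class SketchQueueExtractor:
    """Hypothetical extractor that yields one record per queue message."""

    def __init__(self, queue: InMemoryQueue, delete_after_read: bool = True) -> None:
        self.queue = queue
        self.delete_after_read = delete_after_read

    async def extract_records(self) -> AsyncIterator[dict]:
        while True:
            batch = self.queue.poll()
            if not batch:
                break  # the sketch stops on an empty queue so that it terminates
            for _, body in batch:
                yield json.loads(body)
            # Reached only after every record in the batch has been handed on.
            if self.delete_after_read:
                self.queue.delete([handle for handle, _ in batch])


async def _drain(extractor: SketchQueueExtractor) -> List[dict]:
    return [record async for record in extractor.extract_records()]


def run_sketch(delete_after_read: bool) -> Tuple[List[dict], List[str]]:
    queue = InMemoryQueue(['{"id": 1}', '{"id": 2}', '{"id": 3}'])
    extractor = SketchQueueExtractor(queue, delete_after_read)
    records = asyncio.run(_drain(extractor))
    # Return the records read and the handles that were never deleted.
    return records, sorted(queue.in_flight)
```

Calling `run_sketch(True)` leaves no handles in flight, while `run_sketch(False)` leaves all three undeleted. In real SQS, undeleted messages become visible to consumers again once their visibility timeout expires. Deleting only after a batch has been fully yielded matches the "after successfully reading" wording above, but it does not wait for downstream writes, which is the gap the Future Feature note describes.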