source-braintree-native: update transactions to fetch updates #2208
Conversation
```python
queue = asyncio.Queue(maxsize=10)
```

```python
async with asyncio.TaskGroup() as tg:
```
You could probably use `asyncio.gather` here as well.

A possible consideration is memory usage if there are a lot of batches which all get results concurrently before their documents can be emitted. I doubt this will be a problem in practice since emitting documents should be quite quick in comparison to fetching batches.
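For illustration, here is a minimal sketch of the suggested `asyncio.gather` pattern. `fetch_batch` and the batch count are hypothetical stand-ins, not the connector's actual code:

```python
import asyncio

async def fetch_batch(batch_id: int) -> list[str]:
    # Hypothetical stand-in for a Braintree search call.
    await asyncio.sleep(0.1)
    return [f"txn-{batch_id}-{i}" for i in range(3)]

async def main() -> None:
    # asyncio.gather runs all batch fetches concurrently and returns
    # their results in submission order once every task completes.
    results = await asyncio.gather(*(fetch_batch(b) for b in range(9)))
    for batch in results:
        for doc in batch:
            print(doc)

asyncio.run(main())
```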
I'll refactor to the `asyncio.gather` pattern you linked in your other comment.

I agree about memory usage. Theoretically, with the current batch size of `TRANSACTION_SEARCH_LIMIT`, there could be at most 9 batches (the same as the number of transaction search fields) if all of them return unique transaction IDs within the time window. In practice, there's a lot of overlap between some of these (like `created_at` and `submitted_for_settlement_at`), and searches for the non-`created_at` fields usually return fewer IDs than `created_at` searches. I could add some handling to only allow X batches to fetch results at a time, but I suggest we wait & see if that's actually needed.
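If capping in-flight batches ever becomes necessary, one possible approach is an `asyncio.Semaphore`. This is only a sketch; `MAX_CONCURRENT_BATCHES` and `fetch_batch` are made-up names:

```python
import asyncio

MAX_CONCURRENT_BATCHES = 3  # hypothetical cap on concurrent fetches

async def fetch_batch(batch_id: int) -> list[str]:
    # Hypothetical stand-in for a Braintree search call.
    await asyncio.sleep(0.1)
    return [f"txn-{batch_id}-{i}" for i in range(3)]

async def fetch_batch_limited(sem: asyncio.Semaphore, batch_id: int) -> list[str]:
    # Only MAX_CONCURRENT_BATCHES fetches can hold the semaphore at once;
    # the rest wait here before starting.
    async with sem:
        return await fetch_batch(batch_id)

async def main() -> None:
    sem = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)
    results = await asyncio.gather(*(fetch_batch_limited(sem, b) for b in range(9)))
    print(sum(len(r) for r in results), "documents fetched")

asyncio.run(main())
```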
I was able to refactor this one, but I had to omit the `await`. Otherwise, the connector would get stuck putting docs onto the queue but never taking them off.
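A sketch of the deadlock being described, assuming producers put documents onto a bounded queue. Awaiting the gather before draining the queue hangs; starting it un-awaited and awaiting it after the queue is drained does not:

```python
import asyncio

async def producer(queue: asyncio.Queue, n: int) -> None:
    # put() blocks once the bounded queue is full, until the
    # consumer takes items off.
    for i in range(n):
        await queue.put(i)

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    # Awaiting the gather here would deadlock: with nobody draining
    # the queue, put() blocks forever and gather() never returns.
    #   await asyncio.gather(producer(queue, 20), producer(queue, 20))
    # Instead, schedule the producers without awaiting them yet...
    producers = asyncio.gather(producer(queue, 20), producer(queue, 20))
    # ...drain the queue while they run...
    for _ in range(40):
        await queue.get()
    # ...and await the gather afterwards so it is assured to complete.
    await producers

asyncio.run(main())
```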
The change in `_fetch_unique_updated_transaction_ids` looks good 👍

For `fetch_transactions`, I see how the same approach won't work, since we specifically don't want to keep every single full record in memory.

I'm not 100% sure what you've got in `fetch_transactions` right now will work. I think the gather has to be await'd at some point for it to be assured to complete.

As an alternative to using a queue, take a look at `asyncio.as_completed`. I think that would do what we want, which is to eagerly process the results of each task as it becomes available.
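A minimal sketch of the `asyncio.as_completed` suggestion, again with a hypothetical `fetch_batch`:

```python
import asyncio
import random

async def fetch_batch(batch_id: int) -> list[str]:
    # Hypothetical stand-in for a Braintree search call.
    await asyncio.sleep(random.random())
    return [f"txn-{batch_id}-{i}" for i in range(3)]

async def main() -> None:
    coros = [fetch_batch(b) for b in range(9)]
    # as_completed yields each batch's results as soon as that task
    # finishes, so documents can be emitted eagerly instead of
    # waiting for every batch to complete.
    for finished in asyncio.as_completed(coros):
        for doc in await finished:
            print(doc)

asyncio.run(main())
```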
Based on our discussion, I made `fetch_transactions` fetch batches in sequence instead of in parallel. It'll probably be uncommon to have more than 1-2 batches while we use a batch size of 50,000. We can look into reducing the batch size & processing the batches in parallel at a later point.
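A sketch of the sequential approach under these assumptions; `fetch_batch` and the exact signature of `fetch_transactions` are illustrative, not the connector's real code:

```python
import asyncio
from collections.abc import AsyncGenerator

async def fetch_batch(ids: list[str]) -> list[dict]:
    # Hypothetical stand-in for fetching full transaction objects by ID.
    await asyncio.sleep(0.1)
    return [{"id": txn_id} for txn_id in ids]

async def fetch_transactions(ids: list[str], batch_size: int = 50_000) -> AsyncGenerator[dict, None]:
    # Fetch one batch at a time so at most a single batch of full
    # records is held in memory before its documents are yielded.
    for start in range(0, len(ids), batch_size):
        for doc in await fetch_batch(ids[start:start + batch_size]):
            yield doc

async def main() -> None:
    async for doc in fetch_transactions([f"txn-{i}" for i in range(5)], batch_size=2):
        print(doc)

asyncio.run(main())
```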
Force-pushed from 0a47336 to c51c0af
Previously, the transactions stream only incrementally captured creates since Braintree does not expose the `updated_at` field for API searches. The transactions stream has been updated to capture updates. The strategy is:
1. Get the unique ids of all transactions that have been created, voided, settled, etc. (i.e. figuring out what transactions have been updated) in the date window.
2. Request the full transaction objects associated with the unique ids fetched in step 1.
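To make the two-step strategy concrete, here is a rough sketch. The Braintree calls are stand-ins and the field list is an illustrative subset:

```python
import asyncio

SEARCH_FIELDS = ["created_at", "settled_at", "voided_at"]  # illustrative subset

async def search_ids(field: str) -> set[str]:
    # Hypothetical stand-in for a Braintree transaction search on one field.
    await asyncio.sleep(0.1)
    return {f"txn-{field}-{i}" for i in range(3)}

async def fetch_full_transactions(ids: set[str]) -> list[dict]:
    # Hypothetical stand-in for fetching the full transaction objects.
    await asyncio.sleep(0.1)
    return [{"id": txn_id} for txn_id in ids]

async def main() -> None:
    # Step 1: the union of IDs returned by every searchable field gives
    # the set of transactions updated in the date window.
    id_sets = await asyncio.gather(*(search_ids(f) for f in SEARCH_FIELDS))
    unique_ids = set().union(*id_sets)
    # Step 2: request the full transaction objects for those IDs.
    docs = await fetch_full_transactions(unique_ids)
    print(len(docs), "documents")

asyncio.run(main())
```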
Force-pushed from c51c0af to 3d50de7
LGTM
Description:

Previously, the `transactions` stream only incrementally captured creates since Braintree does not expose the `updated_at` field for API searches.

The `transactions` stream has been updated to capture updates, assuming Braintree's `updated_at` field is just the latest of the various searchable `???ed_at` fields. The strategy is:
1. Get the unique ids of all transactions that have been created, voided, settled, etc. (i.e. figuring out what transactions have been updated) in the date window.
2. Request the full transaction objects associated with the unique ids fetched in step 1, putting results onto an `asyncio.Queue` as they arrive.

The `transactions` stream also has a distinct backfill task now. It ignores any documents with an `updated_at` field that's after the cutoff date, meaning that the incremental task will pick up the updated document.
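A sketch of the backfill cutoff check described above; the function and field names are illustrative, not the connector's actual code:

```python
from datetime import datetime, timezone

def should_emit_in_backfill(doc: dict, cutoff: datetime) -> bool:
    # Documents updated after the cutoff are left for the incremental
    # task to pick up; the backfill skips them.
    return doc["updated_at"] <= cutoff

cutoff = datetime(2024, 6, 1, tzinfo=timezone.utc)
doc = {"id": "txn-1", "updated_at": datetime(2024, 7, 1, tzinfo=timezone.utc)}
assert not should_emit_in_backfill(doc, cutoff)
```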
Workflow steps:

(How does one use this feature, and how has it changed)
Documentation links affected:
Docs should be updated to reflect that the `transactions` stream captures updates and does not require regular backfills.

Notes for reviewers:
Tested on a local stack. Confirmed:
- The number of unique `transactions` document IDs in a date window fetched by `_fetch_unique_updated_transaction_ids` equals the number of documents yielded with the scatter/gather strategy within `fetch_transactions`.
- `transactions` backfills complete.