-
Notifications
You must be signed in to change notification settings - Fork 349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1206: add async to aws:s3 and aws:ecr #1192
Conversation
cartography/util.py
Outdated
return asyncio.get_event_loop().run_in_executor(None, call) | ||
|
||
|
||
def to_sync(*awaitables: Awaitable[Any]) -> Any: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def to_sync(*awaitables: Awaitable[Any]) -> Any: | |
def to_synchronous(*awaitables: Awaitable[Any]) -> Any: |
since the word "sync" is used a lot in the project
cartography/util.py
Outdated
@@ -297,3 +301,31 @@ def batch(items: Iterable, size: int = DEFAULT_BATCH_SIZE) -> List[List]: | |||
items[i: i + size] | |||
for i in range(0, len(items), size) | |||
] | |||
|
|||
|
|||
def to_async(func: Callable, *args: Any, **kwargs: Any) -> asyncio.Future: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def to_async(func: Callable, *args: Any, **kwargs: Any) -> asyncio.Future: | |
def to_asynchronous(func: Callable, *args: Any, **kwargs: Any) -> asyncio.Future: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
cartography/intel/aws/ecr.py
Outdated
for repo in repositories: | ||
repo_image_obj = get_ecr_repository_images(boto3_session, region, repo['repositoryName']) | ||
|
||
async def async_get_images(repo: Dict[str, Any]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For someone like me who does not know how async
works, can you briefly describe what's going on here? E.g. how many calls to get_ecr_repository_images
are happening in parallel, is it configurable, etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10 at most becuase of the default threadpool size. checking to see if we can make it configurable
Update.
There was a time when the default was 10. but here are docs
https://docs.python.org/3.8/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
Changed in version 3.8: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.
I think it's better to leave it at the default. On an M1 with 12 cores, it doesn't get noticeably faster than the default, even with max_workers=1000
.
Also: I just played with this and it is indeed very slick 👍 |
Adds docs on - discovering what other nodes are connected to a given node type - discovering what node properties are present on a given node type Updates docs on - roadmap link - making syncmetadata docs more discoverable
…iple intel modules (#1214) See #1210 for full context. #1154 tried to solve this problem by updating the querybuilder but this was too complex and would not generalize well. This solution is simpler where we use different property classes for each API response so that we don't overwrite properties on a node set by another sync job. This PR can be reviewed commit-by-commit: - c0d9ac4 shows a repro of the error with a failing integration test. - facb63b shows the solution using multiple classes. --------- Co-authored-by: Ramon Petgrave <[email protected]>
It's possible for neo4j sessions `read_transaction` in `get_state` to return an empty list in the drift detection module. This PR ensures that there are entries before referencing index 0. ``` File "/code/venvs/venv/lib/python3.8/site-packages/cartography/driftdetect/get_states.py", line 123, in get_query_state get_state(session, state) File "/code/venvs/venv/lib/python3.8/site-packages/cartography/driftdetect/get_states.py", line 148, in get_state state.properties = list(new_results[0].keys()) IndexError: list index out of range ```
Instruct how we plan to use the Discussions feature
c2cc178
to
4c89a35
Compare
6c9afd1
to
c8273f1
Compare
|
||
Calls are also wrapped within a backoff decorator to handle throttling errors. | ||
|
||
example: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good that you included an example, but since this is going to be a core function, I think we should have detailed docs on what each of the params is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
cartography/util.py
Outdated
def wrapper(*args: Any, **kwargs: Any) -> R: | ||
try: | ||
return func(*args, **kwargs) | ||
except botocore.exceptions.ClientError as error: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This func is located in cartography.util. If we decide to use this for other sync jobs (non-AWS), will we add all of the different exception types here too, like will GCP exceptions be here too? Might end up having a lot of excepts
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I've refactored to another is_throttling_exception()
function
cartography/util.py
Outdated
# nest_asyncio.apply() | ||
''' | ||
CartographyThrottlingException = type('CartographyThrottlingException', (Exception,), {}) | ||
throttling_error_codes = ['LimitExceededException', 'Throttling'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This set of functions is located in cartography.util. Other libraries might use different exception names, so it might get unwieldy to have AWS, GCP, Azure names for throttling exceptions all in one place. I wonder if it might make sense to have this in cartography.aws.util (or whatever the equivalent is) first until we have an example of doing this with another module.
Waits for the Awaitable(s) to complete and returns their result(s). | ||
See https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables | ||
|
||
example: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing about explicitly documenting the parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added as the normal :param : spec
CartographyThrottlingException = type('CartographyThrottlingException', (Exception,), {}) | ||
|
||
@wraps(func) | ||
def wrapper(*args: Any, **kwargs: Any) -> R: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nonblock][question] What's the typevar R do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It binds the return type of func
so that mypy can understand that the wrapped function is supposed to still return type R
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome thank you!
…ncf#1192) Add some helper functions for refactoring existing modules to be more async and use them for aws:s3 and aws:ecr. In a test on one of our larger aws accounts, we've seen ~90% reduction in sync time for s3 and ecr. - `to_async` wraps a regular synchronous function so that it can be used within async functions in a non-blocking fashion - `so_sync` 1. takes a series are [Awaitables](https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables) (e.g., container objects returned when invoking async functions) 2. schedules them all to be run simultaneously 3. blocks until they are all finished 4. returns all the results in the same order There is a caveat when running these functions from a Jupyter notebook. You must do a workaround, but homefully in future versions of asyncio this will not be needed ``` # import nest_asyncio # nest_asyncio.apply() ``` ### Testing Existing unit tests mostly cover these changes, but I also did some manual testing against a real AWS account.
Add some helper functions for refactoring existing modules to be more async and use them for aws:s3 and aws:ecr.
In a test on one of our larger aws accounts, we've seen ~90% reduction in sync time for s3 and ecr.
to_async
wraps a regular synchronous function so that it can be used within async functions in a non-blocking fashionso_sync
There is a caveat when running these functions from a Jupyter notebook.
You must do a workaround, but homefully in future versions of asyncio this will not be needed
Testing
Existing unit tests mostly cover these changes, but I also did some manual testing against a real AWS account.