diff --git a/integrations/gcp/CHANGELOG.md b/integrations/gcp/CHANGELOG.md index 40d5c6743d..7df7c280cf 100644 --- a/integrations/gcp/CHANGELOG.md +++ b/integrations/gcp/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.56 (2024-09-15) + + +### Improvements + +- Extracted the subscription from the asset inventory and added specific fetching via the GCP's SubscriberAPI. +- Changed realtime's default non-specific behavior to rely on the asset's data in the feed. + + ## 0.1.55 (2024-09-12) diff --git a/integrations/gcp/examples/blueprints.json b/integrations/gcp/examples/blueprints.json new file mode 100644 index 0000000000..8461b4c349 --- /dev/null +++ b/integrations/gcp/examples/blueprints.json @@ -0,0 +1,51 @@ +{ + "identifier": "googleCloudSubscription", + "description": "This blueprint represents a Google Cloud Subscription", + "title": "Google Cloud Subscription", + "icon": "GoogleCloud", + "schema": { + "properties": { + "pushConfig": { + "type": "object", + "title": "Push Configuration", + "description": "The ingestion configuration for this subscription" + }, + "ackDeadlineSeconds": { + "type": "number", + "title": "Ack Deadline Seconds", + "description": "The maximum time after receiving a message that the subscriber should acknowledge the message" + }, + "labels": { + "type": "object", + "title": "Labels", + "description": "A set of key/value label pairs to assign to this subscription" + }, + "retainAckedMessages": { + "type": "boolean", + "title": "Retain Acked Messages", + "description": "Indicates whether to retain acknowledged messages" + }, + "messageRetentionDuration": { + "type": "boolean", + "title": "Message Retention Duration", + "description": "How long to retain unacknowledged messages in the subscription's backlog" + }, + "filter": { + "type": "string", + "title": "Filter", + "description": "A filter expression that determines which messages should be delivered to the subscriber" + } + }, + "required": [] + }, + "mirrorProperties": {}, + "calculationProperties": {}, + "relations": { + "project": { + "target": "gcpProject", + "title": "Project", + "required": true, + "many": false + } + } +} diff --git a/integrations/gcp/examples/mappings.yaml b/integrations/gcp/examples/mappings.yaml new file mode 100644 index 0000000000..c1d4ea396f --- /dev/null +++ b/integrations/gcp/examples/mappings.yaml @@ -0,0 +1,21 @@ +createMissingRelatedEntities: true +deleteDependentEntities: true +resources: + - kind: pubsub.googleapis.com/Subscription + selector: + query: "true" + port: + entity: + mappings: + identifier: .name + title: '.name | split("/") | last' + blueprint: '"googleCloudSubscription"' + properties: + pushConfig: .pushConfig + ackDeadlineSeconds: .ackDeadlineSeconds + labels: .labels + retainAckedMessages: .retainAckedMessages + messageRetentionDuration: .messageRetentionDuration + filter: .filter + relations: + project: .__project.name diff --git a/integrations/gcp/gcp_core/search/resource_searches.py b/integrations/gcp/gcp_core/search/resource_searches.py index 7302fb4c0f..00877e4202 100644 --- a/integrations/gcp/gcp_core/search/resource_searches.py +++ b/integrations/gcp/gcp_core/search/resource_searches.py @@ -11,6 +11,7 @@ ProjectsAsyncClient, ) from google.pubsub_v1.services.publisher import PublisherAsyncClient +from google.pubsub_v1.services.subscriber import SubscriberAsyncClient from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM from port_ocean.utils.cache import cache_iterator_result @@ -137,6 +138,44 @@ async def list_all_topics_per_project( logger.info(f"Successfully listed all topics within project {project_name}") +async def list_all_subscriptions_per_project( + project: dict[str, Any], **kwargs: Any +) -> ASYNC_GENERATOR_RESYNC_TYPE: + """ + This lists all Topics under a certain project. + The Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory. + The listing is being done via the PublisherAsyncClient, ignoring state in assets inventory + """ + async with SubscriberAsyncClient() as async_subscriber_client: + project_name = project["name"] + logger.info( + f"Searching all {AssetTypesWithSpecialHandling.SUBSCRIPTION}'s in project {project_name}" + ) + try: + async for subscriptions in paginated_query( + async_subscriber_client, + "list_subscriptions", + {"project": project_name}, + lambda response: parse_protobuf_messages(response.subscriptions), + kwargs.get("rate_limiter"), + ): + for subscription in subscriptions: + subscription[EXTRA_PROJECT_FIELD] = project + yield subscriptions + except PermissionDenied: + logger.error( + f"Service account doesn't have permissions to list subscriptions from project {project_name}" + ) + except NotFound: + logger.info( + f"Couldn't perform list_subscriptions on project {project_name} since it's deleted." + ) + else: + logger.info( + f"Successfully listed all subscriptions within project {project_name}" + ) + + @cache_iterator_result() async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE: logger.info("Searching projects") @@ -214,6 +253,19 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM: ) +async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM: + """ + Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing. + Here the SubscriberAsyncClient is used, ignoring state in assets inventory + """ + async with SubscriberAsyncClient() as async_subscriber_client: + return parse_protobuf_message( + await async_subscriber_client.get_subscription( + subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT + ) + ) + + async def search_single_resource( project: dict[str, Any], asset_kind: str, asset_name: str ) -> RAW_ITEM: @@ -235,25 +287,35 @@ async def search_single_resource( async def feed_event_to_resource( - asset_type: str, asset_name: str, project_id: str + asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any] ) -> RAW_ITEM: resource = None - match asset_type: - case AssetTypesWithSpecialHandling.TOPIC: - topic_name = asset_name.replace("//pubsub.googleapis.com/", "") - resource = await get_single_topic(project_id, topic_name) - resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) - case AssetTypesWithSpecialHandling.FOLDER: - folder_id = asset_name.replace("//cloudresourcemanager.googleapis.com/", "") - resource = await get_single_folder(folder_id) - case AssetTypesWithSpecialHandling.ORGANIZATION: - organization_id = asset_name.replace( - "//cloudresourcemanager.googleapis.com/", "" - ) - resource = await get_single_organization(organization_id) - case AssetTypesWithSpecialHandling.PROJECT: - resource = await get_single_project(project_id) - case _: - project = await get_single_project(project_id) - resource = await search_single_resource(project, asset_type, asset_name) + if asset_data.get("deleted") is True: + resource = asset_data["priorAsset"]["resource"]["data"] + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + else: + match asset_type: + case AssetTypesWithSpecialHandling.TOPIC: + topic_name = asset_name.replace("//pubsub.googleapis.com/", "") + resource = await get_single_topic(project_id, topic_name) + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + case AssetTypesWithSpecialHandling.SUBSCRIPTION: + topic_name = asset_name.replace("//pubsub.googleapis.com/", "") + resource = await get_single_subscription(project_id, topic_name) + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) + case AssetTypesWithSpecialHandling.FOLDER: + folder_id = asset_name.replace( + "//cloudresourcemanager.googleapis.com/", "" + ) + resource = await get_single_folder(folder_id) + case AssetTypesWithSpecialHandling.ORGANIZATION: + organization_id = asset_name.replace( + "//cloudresourcemanager.googleapis.com/", "" + ) + resource = await get_single_organization(organization_id) + case AssetTypesWithSpecialHandling.PROJECT: + resource = await get_single_project(project_id) + case _: + resource = asset_data["asset"]["resource"]["data"] + resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id) return resource diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index a6537b7208..b91ff75890 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -60,6 +60,7 @@ def parse_protobuf_messages( class AssetTypesWithSpecialHandling(enum.StrEnum): TOPIC = "pubsub.googleapis.com/Topic" + SUBSCRIPTION = "pubsub.googleapis.com/Subscription" PROJECT = "cloudresourcemanager.googleapis.com/Project" ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization" FOLDER = "cloudresourcemanager.googleapis.com/Folder" @@ -115,7 +116,7 @@ def get_service_account_project_id() -> str: return gcp_project_env else: raise ValueError( - f"Couldn't figure out the service account's project id. You can specify it usign the GCP_PROJECT environment variable. Error: {str(e)}" + f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}" ) except KeyError as e: raise ValueError( @@ -128,32 +129,34 @@ def get_service_account_project_id() -> str: raise ValueError("Couldn't figure out the service account's project id.") -async def resolve_request_controllers( - kind: str, +async def get_quotas_for_project( + project_id: str, kind: str ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: try: - service_account_project_id = get_service_account_project_id() - - if kind == AssetTypesWithSpecialHandling.TOPIC: - topic_rate_limiter = ( - await pubsub_administrator_per_minute_per_project.limiter( - service_account_project_id + match kind: + case ( + AssetTypesWithSpecialHandling.TOPIC + | AssetTypesWithSpecialHandling.SUBSCRIPTION + ): + topic_rate_limiter = ( + await pubsub_administrator_per_minute_per_project.limiter( + project_id + ) ) - ) - topic_semaphore = ( - await pubsub_administrator_per_minute_per_project.semaphore( - service_account_project_id + topic_semaphore = ( + await pubsub_administrator_per_minute_per_project.semaphore( + project_id + ) ) - ) - return (topic_rate_limiter, topic_semaphore) - - asset_rate_limiter = await search_all_resources_qpm_per_project.limiter( - service_account_project_id - ) - asset_semaphore = await search_all_resources_qpm_per_project.semaphore( - service_account_project_id - ) - return (asset_rate_limiter, asset_semaphore) + return (topic_rate_limiter, topic_semaphore) + case _: + asset_rate_limiter = await search_all_resources_qpm_per_project.limiter( + project_id + ) + asset_semaphore = await search_all_resources_qpm_per_project.semaphore( + project_id + ) + return (asset_rate_limiter, asset_semaphore) except Exception as e: logger.warning( f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}" @@ -165,3 +168,10 @@ async def resolve_request_controllers( await search_all_resources_qpm_per_project.default_semaphore() ) return (default_rate_limiter, default_semaphore) + + +async def resolve_request_controllers( + kind: str, +) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: + service_account_project_id = get_service_account_project_id() + return await get_quotas_for_project(service_account_project_id, kind) diff --git a/integrations/gcp/main.py b/integrations/gcp/main.py index 1ef79c56e2..93da63f57e 100644 --- a/integrations/gcp/main.py +++ b/integrations/gcp/main.py @@ -6,19 +6,18 @@ from fastapi import Request, Response from loguru import logger from port_ocean.context.ocean import ocean -from port_ocean.core.models import Entity from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from gcp_core.errors import ( AssetHasNoProjectAncestorError, GotFeedCreatedSuccessfullyMessageError, - ResourceNotFoundError, ) from gcp_core.feed_event import get_project_name_from_ancestors, parse_asset_data from gcp_core.overrides import GCPCloudResourceSelector from gcp_core.search.iterators import iterate_per_available_project from gcp_core.search.resource_searches import ( feed_event_to_resource, + list_all_subscriptions_per_project, list_all_topics_per_project, search_all_folders, search_all_organizations, @@ -44,6 +43,13 @@ async def _resolve_resync_method_for_resource( asset_type=kind, rate_limiter=topic_rate_limiter, ) + case AssetTypesWithSpecialHandling.SUBSCRIPTION: + subscription_rate_limiter, _ = await resolve_request_controllers(kind) + return iterate_per_available_project( + list_all_subscriptions_per_project, + asset_type=kind, + rate_limiter=subscription_rate_limiter, + ) case AssetTypesWithSpecialHandling.FOLDER: return search_all_folders() case AssetTypesWithSpecialHandling.ORGANIZATION: @@ -108,6 +114,17 @@ async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: yield batch +@ocean.on_resync(kind=AssetTypesWithSpecialHandling.SUBSCRIPTION) +async def resync_subscriptions(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: + topic_rate_limiter, _ = await resolve_request_controllers(kind) + async for batch in iterate_per_available_project( + list_all_subscriptions_per_project, + asset_type=kind, + topic_rate_limiter=topic_rate_limiter, + ): + yield batch + + @ocean.on_resync() async def resync_resources(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: if kind in iter(AssetTypesWithSpecialHandling): @@ -160,36 +177,26 @@ async def feed_events_callback(request: Request) -> Response: asset_project = get_project_name_from_ancestors( asset_data["asset"]["ancestors"] ) - with logger.contextualize( - asset_type=asset_type, asset_name=asset_name, asset_project=asset_project - ): - logger.info("Got Real-Time event") - resource = await feed_event_to_resource( - asset_type=asset_type, project_id=asset_project, asset_name=asset_name + logger.info( + f"Got Real-Time event for kind: {asset_type} with name: {asset_name} from project: {asset_project}" + ) + asset_resource_data = await feed_event_to_resource( + asset_type, asset_name, asset_project, asset_data + ) + if asset_data.get("deleted") is True: + logger.info( + f"Resource {asset_type} : {asset_name} has been deleted in GCP, unregistering from port" + ) + await ocean.unregister_raw(asset_type, [asset_resource_data]) + else: + logger.info( + f"Registering creation/update of resource {asset_type} : {asset_name} in project {asset_project} in Port" ) - if asset_data.get("deleted") is True: - logger.info("Registering a deleted resource") - await ocean.unregister_raw(asset_type, [resource]) - else: - logger.info("Registering a change in the data") - await ocean.register_raw(asset_type, [resource]) + await ocean.register_raw(asset_type, [asset_resource_data]) except AssetHasNoProjectAncestorError: logger.exception( f"Couldn't find project ancestor to asset {asset_name}. Other types of ancestors and not supported yet." ) - except ResourceNotFoundError: - logger.warning( - f"Didn't find any {asset_type} resource named: {asset_name}. Deleting ocean entity." - ) - await ocean.unregister( - [ - Entity( - blueprint=asset_type, - identifier=asset_name, - ) - ] - ) - return Response(status_code=http.HTTPStatus.NOT_FOUND) except GotFeedCreatedSuccessfullyMessageError: logger.info("Assets Feed created successfully") except Exception: diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index 17db9ad03a..2f527c8449 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcp" -version = "0.1.55" +version = "0.1.56" description = "A GCP ocean integration" authors = ["Matan Geva "] diff --git a/integrations/gcp/tests/gcp_core/__init__.py b/integrations/gcp/tests/gcp_core/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/gcp/tests/gcp_core/search/__init__.py b/integrations/gcp/tests/gcp_core/search/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/gcp/tests/gcp_core/search/test_resource_searches.py b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py new file mode 100644 index 0000000000..04a90bb29e --- /dev/null +++ b/integrations/gcp/tests/gcp_core/search/test_resource_searches.py @@ -0,0 +1,156 @@ +from typing import Any +from unittest.mock import AsyncMock, patch + +from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE +from google.pubsub_v1.types import pubsub +from google.cloud.resourcemanager_v3.types import Project + + +async def mock_subscription_pages( + *args: Any, **kwargs: Any +) -> ASYNC_GENERATOR_RESYNC_TYPE: + yield [{"name": "subscription_1"}, {"name": "subscription_2"}] # First page + yield [{"name": "subscription_3"}, {"name": "subscription_4"}] # Second page + + +@patch( + "port_ocean.context.ocean.PortOceanContext.integration_config", + return_value={"search_all_resources_per_minute_quota": 100}, +) +@patch("gcp_core.search.paginated_query.paginated_query", new=mock_subscription_pages) +@patch("google.pubsub_v1.services.subscriber.SubscriberAsyncClient", new=AsyncMock) +async def test_list_all_subscriptions_per_project(integration_config_mock: Any) -> None: + # Arrange + from gcp_core.search.resource_searches import list_all_subscriptions_per_project + + expected_subscriptions = [ + {"__project": {"name": "project_name"}, "name": "subscription_1"}, + {"__project": {"name": "project_name"}, "name": "subscription_2"}, + {"__project": {"name": "project_name"}, "name": "subscription_3"}, + {"__project": {"name": "project_name"}, "name": "subscription_4"}, + ] + mock_project = {"name": "project_name"} + + # Act + actual_subscriptions = [] + async for file in list_all_subscriptions_per_project(mock_project): + actual_subscriptions.extend(file) + + # Assert + assert len(actual_subscriptions) == 4 + assert actual_subscriptions == expected_subscriptions + + +@patch( + "port_ocean.context.ocean.PortOceanContext.integration_config", + return_value={"search_all_resources_per_minute_quota": 100}, +) +async def test_get_single_subscription( + integration_config: Any, monkeypatch: Any +) -> None: + # Arrange + subscriber_async_client_mock = AsyncMock + monkeypatch.setattr( + "google.pubsub_v1.services.subscriber.SubscriberAsyncClient", + subscriber_async_client_mock, + ) + subscriber_async_client_mock.get_subscription = AsyncMock() + subscriber_async_client_mock.get_subscription.return_value = pubsub.Subscription( + {"name": "subscription_name"} + ) + + from gcp_core.search.resource_searches import get_single_subscription + + expected_subscription = { + "ack_deadline_seconds": 0, + "detached": False, + "enable_exactly_once_delivery": False, + "enable_message_ordering": False, + "filter": "", + "labels": {}, + "name": "subscription_name", + "retain_acked_messages": False, + "state": 0, + "topic": "", + } + mock_project = "project_name" + + # Act + actual_subscription = await get_single_subscription( + mock_project, "subscription_name" + ) + + # Assert + assert actual_subscription == expected_subscription + + +@patch( + "port_ocean.context.ocean.PortOceanContext.integration_config", + return_value={"search_all_resources_per_minute_quota": 100}, +) +async def test_feed_to_resource(integration_config: Any, monkeypatch: Any) -> None: + # Arrange + + ## Mock project client + projects_async_client_mock = AsyncMock + monkeypatch.setattr( + "google.cloud.resourcemanager_v3.ProjectsAsyncClient", + projects_async_client_mock, + ) + projects_async_client_mock.get_project = AsyncMock() + projects_async_client_mock.get_project.return_value = Project( + {"name": "project_name"} + ) + + ## Mock publisher client + publisher_async_client_mock = AsyncMock + monkeypatch.setattr( + "google.pubsub_v1.services.publisher.PublisherAsyncClient", + publisher_async_client_mock, + ) + publisher_async_client_mock.get_topic = AsyncMock() + publisher_async_client_mock.get_topic.return_value = pubsub.Topic( + {"name": "topic_name"} + ) + + from gcp_core.search.resource_searches import feed_event_to_resource + + mock_asset_name = "projects/project_name/topics/topic_name" + mock_asset_type = "pubsub.googleapis.com/Topic" + mock_asset_project_name = "project_name" + mock_asset_data = { + "asset": { + "name": mock_asset_name, + "asset_type": mock_asset_type, + }, + "event": "google.cloud.audit.log.v1.written", + "project": "project_name", + } + + expected_resource = { + "__project": { + "display_name": "", + "etag": "", + "labels": {}, + "name": "project_name", + "parent": "", + "project_id": "", + "state": 0, + }, + "kms_key_name": "", + "labels": {}, + "name": "topic_name", + "satisfies_pzs": False, + "state": 0, + } + + # Act + actual_resource = await feed_event_to_resource( + asset_type=mock_asset_type, + asset_name=mock_asset_name, + project_id=mock_asset_project_name, + asset_data=mock_asset_data, + ) + + # Assert + assert actual_resource == expected_resource diff --git a/integrations/gcp/tests/test_sample.py b/integrations/gcp/tests/test_sample.py deleted file mode 100644 index dc80e299c8..0000000000 --- a/integrations/gcp/tests/test_sample.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_example() -> None: - assert 1 == 1