Skip to content

Commit

Permalink
[GoogleCloud] Real time improvements + Susbscription as specific reso…
Browse files Browse the repository at this point in the history
…urce (port-labs#1010)

# Description - Real time improvements

What - Changed default resource realtime data + Changed real time
calculation of deleted entities

Why - Remove redundant unreliable calls to the GCP's Asset inventory

How - Use the asset data for the current upserted non-specific kind + in
delete use the prior state of the asset's resource

# Description - Added subscription specific handling

What - Added subscription as specific kind we get from the api and not
from the asset inventory

Why - Inconsistent behavior with asset inventory - Very late updates of
inventory + not full data on real time events

How - added subscriber api that will be able to perform
list_subscriptions + get_subscription

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [x] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Resync finishes successfully
- [x] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [x] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [x] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
matan84 authored Sep 15, 2024
1 parent c424416 commit 119cf98
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 72 deletions.
9 changes: 9 additions & 0 deletions integrations/gcp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.56 (2024-09-15)


### Improvements

- Extracted the subscription from the asset inventory and added specific fetching via the GCP's SubscriberAPI.
- Changed realtime's default non-specific behavior to rely on the asset's data in the feed.


## 0.1.55 (2024-09-12)


Expand Down
51 changes: 51 additions & 0 deletions integrations/gcp/examples/blueprints.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"identifier": "googleCloudSubscription",
"description": "This blueprint represents a Google Cloud Subscription",
"title": "Google Cloud Subscription",
"icon": "GoogleCloud",
"schema": {
"properties": {
"pushConfig": {
"type": "object",
"title": "Push Configuration",
"description": "The ingestion configuration for this subscription"
},
"ackDeadlineSeconds": {
"type": "number",
"title": "Ack Deadline Seconds",
"description": "The maximum time after receiving a message that the subscriber should acknowledge the message"
},
"labels": {
"type": "object",
"title": "Labels",
"description": "A set of key/value label pairs to assign to this subscription"
},
"retainAckedMessages": {
"type": "boolean",
"title": "Retain Acked Messages",
"description": "Indicates whether to retain acknowledged messages"
},
"messageRetentionDuration": {
"type": "boolean",
"title": "Message Retention Duration",
"description": "How long to retain unacknowledged messages in the subscription's backlog"
},
"filter": {
"type": "string",
"title": "Filter",
"description": "A filter expression that determines which messages should be delivered to the subscriber"
}
},
"required": []
},
"mirrorProperties": {},
"calculationProperties": {},
"relations": {
"project": {
"target": "gcpProject",
"title": "Project",
"required": true,
"many": false
}
}
}
21 changes: 21 additions & 0 deletions integrations/gcp/examples/mappings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
createMissingRelatedEntities: true
deleteDependentEntities: true
resources:
- kind: pubsub.googleapis.com/Subscription
selector:
query: "true"
port:
entity:
mappings:
identifier: .name
title: '.name | split("/") | last'
blueprint: '"googleCloudSubscription"'
properties:
pushConfig: .pushConfig
ackDeadlineSeconds: .ackDeadlineSeconds
labels: .labels
retainAckedMessages: .retainAckedMessages
messageRetentionDuration: .messageRetentionDuration
filter: .filter
relations:
project: .__project.name
100 changes: 81 additions & 19 deletions integrations/gcp/gcp_core/search/resource_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ProjectsAsyncClient,
)
from google.pubsub_v1.services.publisher import PublisherAsyncClient
from google.pubsub_v1.services.subscriber import SubscriberAsyncClient
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_ITEM
from port_ocean.utils.cache import cache_iterator_result
Expand Down Expand Up @@ -137,6 +138,44 @@ async def list_all_topics_per_project(
logger.info(f"Successfully listed all topics within project {project_name}")


async def list_all_subscriptions_per_project(
project: dict[str, Any], **kwargs: Any
) -> ASYNC_GENERATOR_RESYNC_TYPE:
"""
This lists all Topics under a certain project.
The Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory.
The listing is being done via the PublisherAsyncClient, ignoring state in assets inventory
"""
async with SubscriberAsyncClient() as async_subscriber_client:
project_name = project["name"]
logger.info(
f"Searching all {AssetTypesWithSpecialHandling.SUBSCRIPTION}'s in project {project_name}"
)
try:
async for subscriptions in paginated_query(
async_subscriber_client,
"list_subscriptions",
{"project": project_name},
lambda response: parse_protobuf_messages(response.subscriptions),
kwargs.get("rate_limiter"),
):
for subscription in subscriptions:
subscription[EXTRA_PROJECT_FIELD] = project
yield subscriptions
except PermissionDenied:
logger.error(
f"Service account doesn't have permissions to list subscriptions from project {project_name}"
)
except NotFound:
logger.info(
f"Couldn't perform list_subscriptions on project {project_name} since it's deleted."
)
else:
logger.info(
f"Successfully listed all subscriptions within project {project_name}"
)


@cache_iterator_result()
async def search_all_projects() -> ASYNC_GENERATOR_RESYNC_TYPE:
logger.info("Searching projects")
Expand Down Expand Up @@ -214,6 +253,19 @@ async def get_single_topic(project_id: str, topic_id: str) -> RAW_ITEM:
)


async def get_single_subscription(project_id: str, subscription_id: str) -> RAW_ITEM:
"""
Subscriptions are handled specifically due to lacks of data in the asset itself within the asset inventory- e.g. some properties missing.
Here the SubscriberAsyncClient is used, ignoring state in assets inventory
"""
async with SubscriberAsyncClient() as async_subscriber_client:
return parse_protobuf_message(
await async_subscriber_client.get_subscription(
subscription=subscription_id, timeout=DEFAULT_REQUEST_TIMEOUT
)
)


async def search_single_resource(
project: dict[str, Any], asset_kind: str, asset_name: str
) -> RAW_ITEM:
Expand All @@ -235,25 +287,35 @@ async def search_single_resource(


async def feed_event_to_resource(
asset_type: str, asset_name: str, project_id: str
asset_type: str, asset_name: str, project_id: str, asset_data: dict[str, Any]
) -> RAW_ITEM:
resource = None
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace("//cloudresourcemanager.googleapis.com/", "")
resource = await get_single_folder(folder_id)
case AssetTypesWithSpecialHandling.ORGANIZATION:
organization_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_organization(organization_id)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id)
case _:
project = await get_single_project(project_id)
resource = await search_single_resource(project, asset_type, asset_name)
if asset_data.get("deleted") is True:
resource = asset_data["priorAsset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
else:
match asset_type:
case AssetTypesWithSpecialHandling.TOPIC:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_topic(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
case AssetTypesWithSpecialHandling.SUBSCRIPTION:
topic_name = asset_name.replace("//pubsub.googleapis.com/", "")
resource = await get_single_subscription(project_id, topic_name)
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
case AssetTypesWithSpecialHandling.FOLDER:
folder_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_folder(folder_id)
case AssetTypesWithSpecialHandling.ORGANIZATION:
organization_id = asset_name.replace(
"//cloudresourcemanager.googleapis.com/", ""
)
resource = await get_single_organization(organization_id)
case AssetTypesWithSpecialHandling.PROJECT:
resource = await get_single_project(project_id)
case _:
resource = asset_data["asset"]["resource"]["data"]
resource[EXTRA_PROJECT_FIELD] = await get_single_project(project_id)
return resource
56 changes: 33 additions & 23 deletions integrations/gcp/gcp_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def parse_protobuf_messages(

class AssetTypesWithSpecialHandling(enum.StrEnum):
TOPIC = "pubsub.googleapis.com/Topic"
SUBSCRIPTION = "pubsub.googleapis.com/Subscription"
PROJECT = "cloudresourcemanager.googleapis.com/Project"
ORGANIZATION = "cloudresourcemanager.googleapis.com/Organization"
FOLDER = "cloudresourcemanager.googleapis.com/Folder"
Expand Down Expand Up @@ -115,7 +116,7 @@ def get_service_account_project_id() -> str:
return gcp_project_env
else:
raise ValueError(
f"Couldn't figure out the service account's project id. You can specify it usign the GCP_PROJECT environment variable. Error: {str(e)}"
f"Couldn't figure out the service account's project id. You can specify it using the GCP_PROJECT environment variable. Error: {str(e)}"
)
except KeyError as e:
raise ValueError(
Expand All @@ -128,32 +129,34 @@ def get_service_account_project_id() -> str:
raise ValueError("Couldn't figure out the service account's project id.")


async def resolve_request_controllers(
kind: str,
async def get_quotas_for_project(
project_id: str, kind: str
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
try:
service_account_project_id = get_service_account_project_id()

if kind == AssetTypesWithSpecialHandling.TOPIC:
topic_rate_limiter = (
await pubsub_administrator_per_minute_per_project.limiter(
service_account_project_id
match kind:
case (
AssetTypesWithSpecialHandling.TOPIC
| AssetTypesWithSpecialHandling.SUBSCRIPTION
):
topic_rate_limiter = (
await pubsub_administrator_per_minute_per_project.limiter(
project_id
)
)
)
topic_semaphore = (
await pubsub_administrator_per_minute_per_project.semaphore(
service_account_project_id
topic_semaphore = (
await pubsub_administrator_per_minute_per_project.semaphore(
project_id
)
)
)
return (topic_rate_limiter, topic_semaphore)

asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
service_account_project_id
)
asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
service_account_project_id
)
return (asset_rate_limiter, asset_semaphore)
return (topic_rate_limiter, topic_semaphore)
case _:
asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
project_id
)
asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
project_id
)
return (asset_rate_limiter, asset_semaphore)
except Exception as e:
logger.warning(
f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
Expand All @@ -165,3 +168,10 @@ async def resolve_request_controllers(
await search_all_resources_qpm_per_project.default_semaphore()
)
return (default_rate_limiter, default_semaphore)


async def resolve_request_controllers(
kind: str,
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
service_account_project_id = get_service_account_project_id()
return await get_quotas_for_project(service_account_project_id, kind)
Loading

0 comments on commit 119cf98

Please sign in to comment.