Skip to content

Commit

Permalink
docs(samples): Update Topic with Kinesis Ingestion Settings
Browse files Browse the repository at this point in the history
Add code to demonstrate updating a topic with Kinesis Ingestion Settings
IngestionDataSourceSettings were added as an attribute to Topic
[here](https://github.com/googleapis/googleapis/blob/65277ddce9caa1cfd1a0eb7ab67980fc73d20b50/google/pubsub/v1/pubsub.proto#L316)
  • Loading branch information
mukund-ananthu committed Mar 14, 2024
1 parent 83dc9ff commit 332ed86
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 2 deletions.
70 changes: 68 additions & 2 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def create_topic_kinesis_ingestion(
gcp_service_account: str,
) -> None:
"""Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings."""
# [START pubsub_quickstart_create_topic_kinesis_ingestion]
# [START pubsub_create_topic_kinesis_ingestion]
from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
Expand Down Expand Up @@ -101,10 +100,59 @@ def create_topic_kinesis_ingestion(
topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")
# [END pubsub_quickstart_create_topic_kinesis_ingestion]
# [END pubsub_create_topic_kinesis_ingestion]


def update_topic_kinesis_ingestion(
project_id: str,
topic_id: str,
stream_arn: str,
consumer_arn: str,
aws_role_arn: str,
gcp_service_account: str,
) -> None:
"""Update Pub/Sub topic with AWS Kinesis Ingestion Settings."""
# [START pubsub_update_topic_kinesis_ingestion]
from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
topic=Topic(
name=topic_path,
ingestion_data_source_settings=IngestionDataSourceSettings(
aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
stream_arn=stream_arn,
consumer_arn=consumer_arn,
aws_role_arn=aws_role_arn,
gcp_service_account=gcp_service_account,
)
),
),
update_mask=field_mask_pb2.FieldMask(
paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")


# [END pubsub_update_topic_kinesis_ingestion]


def delete_topic(project_id: str, topic_id: str) -> None:
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_topic]
Expand Down Expand Up @@ -483,6 +531,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
create_topic_kinesis_ingestion_parser.add_argument("consumer_arn")
create_topic_kinesis_ingestion_parser.add_argument("aws_role_arn")
create_topic_kinesis_ingestion_parser.add_argument("gcp_service_account")

update_topic_kinesis_ingestion_parser = subparsers.add_parser(
"update_kinesis_ingestion", help=update_topic_kinesis_ingestion.__doc__
)
update_topic_kinesis_ingestion_parser.add_argument("topic_id")
update_topic_kinesis_ingestion_parser.add_argument("stream_arn")
update_topic_kinesis_ingestion_parser.add_argument("consumer_arn")
update_topic_kinesis_ingestion_parser.add_argument("aws_role_arn")
update_topic_kinesis_ingestion_parser.add_argument("gcp_service_account")

delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__)
delete_parser.add_argument("topic_id")
Expand Down Expand Up @@ -553,6 +610,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
args.aws_role_arn,
args.gcp_service_account,
)
elif args.command == "update_kinesis_ingestion":
update_topic_kinesis_ingestion(
args.project_id,
args.topic_id,
args.stream_arn,
args.consumer_arn,
args.aws_role_arn,
args.gcp_service_account,
)
elif args.command == "delete":
delete_topic(args.project_id, args.topic_id)
elif args.command == "publish":
Expand Down
45 changes: 45 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def test_create(
out, _ = capsys.readouterr()
assert f"Created topic: {topic_path}" in out

# Clean up resource created for the test.
publisher_client.delete_topic(request={"topic": topic_path})


def test_create_kinesis_ingestion(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
Expand Down Expand Up @@ -155,6 +158,48 @@ def test_create_kinesis_ingestion(
out, _ = capsys.readouterr()
assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out

# Clean up resource created for the test.
publisher_client.delete_topic(request={"topic": topic_path})

def test_update_kinesis_ingestion(
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
) -> None:
# The scope of `topic_path` is limited to this function.
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)

# Outside of automated CI tests, these values must be of actual AWS resources for the test to pass.
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
gcp_service_account = (
"[email protected]"
)

try:
publisher_client.delete_topic(request={"topic": topic_path})
except NotFound:
pass

publisher.create_topic(PROJECT_ID, TOPIC_ID)

out, _ = capsys.readouterr()
assert f"Created topic: {topic_path}" in out

publisher.update_topic_kinesis_ingestion(
PROJECT_ID,
TOPIC_ID,
stream_arn,
consumer_arn,
aws_role_arn,
gcp_service_account,
)

out, _ = capsys.readouterr()
assert f"Updated topic: {topic_path} with AWS Kinesis Ingestion Settings" in out

# Clean up resource created for the test.
publisher_client.delete_topic(request={"topic": topic_path})


def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None:
publisher.list_topics(PROJECT_ID)
Expand Down

0 comments on commit 332ed86

Please sign in to comment.