diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index d2be927b8..7cb7ca223 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -103,6 +103,88 @@ def create_topic_with_kinesis_ingestion( # [END pubsub_create_topic_with_kinesis_ingestion] +def create_topic_with_cloud_storage_ingestion( + project_id: str, + topic_id: str, + bucket: str, + input_format: str, + text_delimiter: str, + match_glob: str, + minimum_object_create_time: str, +) -> None: + """Create a new Pub/Sub topic with Cloud Storage Ingestion Settings.""" + # [START pubsub_create_topic_with_cloud_storage_ingestion] + from google.cloud import pubsub_v1 + from google.protobuf import timestamp_pb2 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # bucket = "your-bucket" + # input_format = "text" (can be one of "text", "avro", "pubsub_avro") + # text_delimiter = "\n" + # match_glob = "**.txt" + # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + cloud_storage_settings = IngestionDataSourceSettings.CloudStorage( + bucket=bucket, + ) + if input_format == "text": + cloud_storage_settings.text_format = ( + IngestionDataSourceSettings.CloudStorage.TextFormat( + delimiter=text_delimiter + ) + ) + elif input_format == "avro": + cloud_storage_settings.avro_format = ( + IngestionDataSourceSettings.CloudStorage.AvroFormat() + ) + elif input_format == "pubsub_avro": + cloud_storage_settings.pubsub_avro_format = ( + IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat() + ) + else: + print( + "Invalid input_format: " + + input_format + + "; must be in ('text', 'avro', 'pubsub_avro')" + ) + return + + if match_glob: + cloud_storage_settings.match_glob = match_glob + + if minimum_object_create_time: + try: + minimum_object_create_time_timestamp = timestamp_pb2.Timestamp() + minimum_object_create_time_timestamp.FromJsonString( + minimum_object_create_time + ) + cloud_storage_settings.minimum_object_create_time = ( + minimum_object_create_time_timestamp + ) + except ValueError: + print("Invalid minimum_object_create_time: " + minimum_object_create_time) + return + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + cloud_storage=cloud_storage_settings, + ), + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings") + # [END pubsub_create_topic_with_cloud_storage_ingestion] + + def update_topic_type( project_id: str, topic_id: str, @@ -615,6 +697,19 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_topic_with_kinesis_ingestion_parser.add_argument("aws_role_arn") create_topic_with_kinesis_ingestion_parser.add_argument("gcp_service_account") + create_topic_with_cloud_storage_ingestion_parser = subparsers.add_parser( + "create_cloud_storage_ingestion", + help=create_topic_with_cloud_storage_ingestion.__doc__, + ) + create_topic_with_cloud_storage_ingestion_parser.add_argument("topic_id") + create_topic_with_cloud_storage_ingestion_parser.add_argument("bucket") + create_topic_with_cloud_storage_ingestion_parser.add_argument("input_format") + create_topic_with_cloud_storage_ingestion_parser.add_argument("text_delimiter") + create_topic_with_cloud_storage_ingestion_parser.add_argument("match_glob") + create_topic_with_cloud_storage_ingestion_parser.add_argument( + "minimum_object_create_time" + ) + update_topic_type_parser = subparsers.add_parser( "update_kinesis_ingestion", help=update_topic_type.__doc__ ) @@ -693,6 +788,16 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: args.aws_role_arn, args.gcp_service_account, ) + elif args.command == "create_cloud_storage_ingestion": + create_topic_with_cloud_storage_ingestion( + args.project_id, + args.topic_id, + args.bucket, + args.input_format, + args.text_delimiter, + args.match_glob, + args.minimum_object_create_time, + ) elif args.command == "update_kinesis_ingestion": update_topic_type( args.project_id, diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index adb015e8a..6f17305cb 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -162,6 +162,40 @@ def test_create_topic_with_kinesis_ingestion( publisher_client.delete_topic(request={"topic": topic_path}) +def test_create_topic_with_cloud_storage_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + bucket = "pubsub-cloud-storage-bucket" + input_format = "text" + text_delimiter = "," + match_glob = "**.txt" + minimum_object_create_time = "1970-01-01T00:00:01Z" + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_with_cloud_storage_ingestion( + PROJECT_ID, + TOPIC_ID, + bucket, + input_format, + text_delimiter, + match_glob, + minimum_object_create_time, + ) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path} with Cloud Storage Ingestion Settings" in out + + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + + def test_update_topic_type( publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] ) -> None: diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 4e86dfa5b..3a16ebc94 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,4 +1,4 @@ -google-cloud-pubsub==2.25.0 +google-cloud-pubsub==2.26.0 avro==1.12.0 protobuf===4.24.4; python_version == '3.7' protobuf==5.28.0; python_version >= '3.8'