From 3dd43d6c9facc59c7c4913cac605aa95176cc857 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Date: Wed, 22 Feb 2023 17:14:44 -0500 Subject: [PATCH] Fix: Port proto changes (#871) feat: Add temporary_failed_ack_ids to ModifyAckDeadlineConfirmation fix: Add service_yaml_parameters to py_gapic_library BUILD.bazel targets docs: Clarify BigQueryConfig PERMISSION_DENIED state docs: Clarify subscription description docs: Replacing HTML code with Markdown docs: Fix PullResponse description docs: Fix Pull description feat: Add google.api.method.signature to update methods docs: Update Pub/Sub topic retention limit from 7 days to 31 days --- .../services/publisher/async_client.py | 36 ++++ google/pubsub_v1/services/publisher/client.py | 35 ++++ .../services/subscriber/async_client.py | 136 ++++++++++--- .../pubsub_v1/services/subscriber/client.py | 133 +++++++++--- .../services/subscriber/transports/base.py | 1 + .../services/subscriber/transports/grpc.py | 28 +-- .../subscriber/transports/grpc_asyncio.py | 28 +-- google/pubsub_v1/types/pubsub.py | 79 +++++--- .../snippet_metadata_google.pubsub.v1.json | 50 ++++- tests/unit/gapic/pubsub_v1/test_publisher.py | 97 +++++++++ .../gapic/pubsub_v1/test_schema_service.py | 7 + tests/unit/gapic/pubsub_v1/test_subscriber.py | 191 ++++++++++++++++++ 12 files changed, 694 insertions(+), 127 deletions(-) diff --git a/google/pubsub_v1/services/publisher/async_client.py b/google/pubsub_v1/services/publisher/async_client.py index b272df768..e749892f5 100644 --- a/google/pubsub_v1/services/publisher/async_client.py +++ b/google/pubsub_v1/services/publisher/async_client.py @@ -46,6 +46,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore from google.protobuf import duration_pb2 # type: ignore +from google.protobuf import field_mask_pb2 # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub from google.pubsub_v1.types import TimeoutType @@ -339,6 +340,8 @@ async def update_topic( self, request: Optional[Union[pubsub.UpdateTopicRequest, dict]] = None, *, + topic: Optional[pubsub.Topic] = None, + update_mask: Optional[field_mask_pb2.FieldMask] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), @@ -378,6 +381,23 @@ async def sample_update_topic(): Args: request (Optional[Union[google.pubsub_v1.types.UpdateTopicRequest, dict]]): The request object. Request for the UpdateTopic method. + topic (:class:`google.pubsub_v1.types.Topic`): + Required. The updated topic object. + This corresponds to the ``topic`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. + update_mask (:class:`google.protobuf.field_mask_pb2.FieldMask`): + Required. Indicates which fields in the provided topic + to update. Must be specified and non-empty. Note that if + ``update_mask`` contains "message_storage_policy" but + the ``message_storage_policy`` is not set in the + ``topic`` provided above, then the updated value is + determined by the policy configured at the project or + organization level. + + This corresponds to the ``update_mask`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. timeout (TimeoutType): @@ -390,8 +410,24 @@ async def sample_update_topic(): A topic resource. """ # Create or coerce a protobuf request object. + # Quick check: If we got a request object, we should *not* have + # gotten any keyword arguments that map to the request. + has_flattened_params = any([topic, update_mask]) + if request is not None and has_flattened_params: + raise ValueError( + "If the `request` argument is set, then none of " + "the individual field arguments should be set." + ) + request = pubsub.UpdateTopicRequest(request) + # If we have keyword arguments corresponding to fields on the + # request, apply these. + if topic is not None: + request.topic = topic + if update_mask is not None: + request.update_mask = update_mask + # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. rpc = gapic_v1.method_async.wrap_method( diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index df459dafd..a9684144f 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -51,6 +51,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore from google.protobuf import duration_pb2 # type: ignore +from google.protobuf import field_mask_pb2 # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub from google.pubsub_v1.types import TimeoutType @@ -610,6 +611,8 @@ def update_topic( self, request: Optional[Union[pubsub.UpdateTopicRequest, dict]] = None, *, + topic: Optional[pubsub.Topic] = None, + update_mask: Optional[field_mask_pb2.FieldMask] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), @@ -649,6 +652,23 @@ def sample_update_topic(): Args: request (Union[google.pubsub_v1.types.UpdateTopicRequest, dict]): The request object. Request for the UpdateTopic method. + topic (google.pubsub_v1.types.Topic): + Required. The updated topic object. + This corresponds to the ``topic`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. + update_mask (google.protobuf.field_mask_pb2.FieldMask): + Required. Indicates which fields in the provided topic + to update. Must be specified and non-empty. Note that if + ``update_mask`` contains "message_storage_policy" but + the ``message_storage_policy`` is not set in the + ``topic`` provided above, then the updated value is + determined by the policy configured at the project or + organization level. + + This corresponds to the ``update_mask`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. timeout (TimeoutType): @@ -661,12 +681,27 @@ def sample_update_topic(): A topic resource. """ # Create or coerce a protobuf request object. + # Quick check: If we got a request object, we should *not* have + # gotten any keyword arguments that map to the request. + has_flattened_params = any([topic, update_mask]) + if request is not None and has_flattened_params: + raise ValueError( + "If the `request` argument is set, then none of " + "the individual field arguments should be set." + ) + # Minor optimization to avoid making a copy if the user passes # in a pubsub.UpdateTopicRequest. # There's no risk of modifying the input as we've already verified # there are no flattened fields. if not isinstance(request, pubsub.UpdateTopicRequest): request = pubsub.UpdateTopicRequest(request) + # If we have keyword arguments corresponding to fields on the + # request, apply these. + if topic is not None: + request.topic = topic + if update_mask is not None: + request.update_mask = update_mask # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. diff --git a/google/pubsub_v1/services/subscriber/async_client.py b/google/pubsub_v1/services/subscriber/async_client.py index e832b0eba..dbe4fd0e7 100644 --- a/google/pubsub_v1/services/subscriber/async_client.py +++ b/google/pubsub_v1/services/subscriber/async_client.py @@ -49,6 +49,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore from google.protobuf import duration_pb2 # type: ignore +from google.protobuf import field_mask_pb2 # type: ignore from google.protobuf import timestamp_pb2 # type: ignore from google.pubsub_v1.services.subscriber import pagers from google.pubsub_v1.types import pubsub @@ -278,7 +279,10 @@ async def sample_create_subscription(): Args: request (Optional[Union[google.pubsub_v1.types.Subscription, dict]]): - The request object. A subscription resource. + The request object. A subscription resource. If none of + `push_config` or `bigquery_config` is set, then the + subscriber will pull and ack messages using API methods. + At most one of these fields may be set. name (:class:`str`): Required. The name of the subscription. It must have the format @@ -304,11 +308,9 @@ async def sample_create_subscription(): on the ``request`` instance; if ``request`` is provided, this should not be set. push_config (:class:`google.pubsub_v1.types.PushConfig`): - If push delivery is used with this subscription, this - field is used to configure it. Either ``pushConfig`` or - ``bigQueryConfig`` can be set, but not both. If both are - empty, then the subscriber will pull and ack messages - using API methods. + If push delivery is used with this + subscription, this field is used to + configure it. This corresponds to the ``push_config`` field on the ``request`` instance; if ``request`` is provided, this @@ -318,7 +320,7 @@ async def sample_create_subscription(): Pub/Sub waits for the subscriber to acknowledge receipt before resending the message. In the interval after the message is delivered and before it is acknowledged, it - is considered to be outstanding. During that time + is considered to be *outstanding*. During that time period, the message will not be redelivered (on a best-effort basis). @@ -350,7 +352,11 @@ async def sample_create_subscription(): Returns: google.pubsub_v1.types.Subscription: - A subscription resource. + A subscription resource. If none of push_config or bigquery_config is + set, then the subscriber will pull and ack messages + using API methods. At most one of these fields may be + set. + """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -467,7 +473,11 @@ async def sample_get_subscription(): Returns: google.pubsub_v1.types.Subscription: - A subscription resource. + A subscription resource. If none of push_config or bigquery_config is + set, then the subscriber will pull and ack messages + using API methods. At most one of these fields may be + set. + """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -528,6 +538,8 @@ async def update_subscription( self, request: Optional[Union[pubsub.UpdateSubscriptionRequest, dict]] = None, *, + subscription: Optional[pubsub.Subscription] = None, + update_mask: Optional[field_mask_pb2.FieldMask] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), @@ -570,6 +582,21 @@ async def sample_update_subscription(): request (Optional[Union[google.pubsub_v1.types.UpdateSubscriptionRequest, dict]]): The request object. Request for the UpdateSubscription method. + subscription (:class:`google.pubsub_v1.types.Subscription`): + Required. The updated subscription + object. + + This corresponds to the ``subscription`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. + update_mask (:class:`google.protobuf.field_mask_pb2.FieldMask`): + Required. Indicates which fields in + the provided subscription to update. + Must be specified and non-empty. + + This corresponds to the ``update_mask`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. timeout (float): The timeout for this request. @@ -578,11 +605,31 @@ async def sample_update_subscription(): Returns: google.pubsub_v1.types.Subscription: - A subscription resource. + A subscription resource. If none of push_config or bigquery_config is + set, then the subscriber will pull and ack messages + using API methods. At most one of these fields may be + set. + """ # Create or coerce a protobuf request object. + # Quick check: If we got a request object, we should *not* have + # gotten any keyword arguments that map to the request. + has_flattened_params = any([subscription, update_mask]) + if request is not None and has_flattened_params: + raise ValueError( + "If the `request` argument is set, then none of " + "the individual field arguments should be set." + ) + request = pubsub.UpdateSubscriptionRequest(request) + # If we have keyword arguments corresponding to fields on the + # request, apply these. + if subscription is not None: + request.subscription = subscription + if update_mask is not None: + request.update_mask = update_mask + # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. rpc = gapic_v1.method_async.wrap_method( @@ -1116,9 +1163,7 @@ async def pull( timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PullResponse: - r"""Pulls messages from the server. The server may return - ``UNAVAILABLE`` if there are too many concurrent pull requests - pending for the given subscription. + r"""Pulls messages from the server. .. code-block:: python @@ -1229,6 +1274,7 @@ async def sample_pull(): multiplier=1.3, predicate=retries.if_exception_type( core_exceptions.Aborted, + core_exceptions.InternalServerError, core_exceptions.ServiceUnavailable, core_exceptions.Unknown, ), @@ -1495,13 +1541,12 @@ async def get_snapshot( timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Snapshot: - r"""Gets the configuration details of a snapshot. - Snapshots are used in Seek - operations, which allow you to manage message - acknowledgments in bulk. That is, you can set the - acknowledgment state of messages in an existing - subscription to the state captured by a snapshot. + r"""Gets the configuration details of a snapshot. Snapshots are used + in + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. .. code-block:: python @@ -1803,9 +1848,10 @@ async def sample_create_snapshot(): name is not provided in the request, the server will assign a random name for this snapshot on the same project as the subscription. Note that for REST API - requests, you must specify a name. See the resource name - rules. Format is - ``projects/{project}/snapshots/{snap}``. + requests, you must specify a name. See the `resource + name + rules `__. + Format is ``projects/{project}/snapshots/{snap}``. This corresponds to the ``name`` field on the ``request`` instance; if ``request`` is provided, this @@ -1898,18 +1944,17 @@ async def update_snapshot( self, request: Optional[Union[pubsub.UpdateSnapshotRequest, dict]] = None, *, + snapshot: Optional[pubsub.Snapshot] = None, + update_mask: Optional[field_mask_pb2.FieldMask] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Snapshot: r"""Updates an existing snapshot. Snapshots are used in - Seek - operations, which allow - you to manage message acknowledgments in bulk. That is, - you can set the acknowledgment state of messages in an - existing subscription to the state captured by a - snapshot. + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. .. code-block:: python @@ -1940,6 +1985,21 @@ async def sample_update_snapshot(): request (Optional[Union[google.pubsub_v1.types.UpdateSnapshotRequest, dict]]): The request object. Request for the UpdateSnapshot method. + snapshot (:class:`google.pubsub_v1.types.Snapshot`): + Required. The updated snapshot + object. + + This corresponds to the ``snapshot`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. + update_mask (:class:`google.protobuf.field_mask_pb2.FieldMask`): + Required. Indicates which fields in + the provided snapshot to update. Must be + specified and non-empty. + + This corresponds to the ``update_mask`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. timeout (float): The timeout for this request. @@ -1957,8 +2017,24 @@ async def sample_update_snapshot(): """ # Create or coerce a protobuf request object. + # Quick check: If we got a request object, we should *not* have + # gotten any keyword arguments that map to the request. + has_flattened_params = any([snapshot, update_mask]) + if request is not None and has_flattened_params: + raise ValueError( + "If the `request` argument is set, then none of " + "the individual field arguments should be set." + ) + request = pubsub.UpdateSnapshotRequest(request) + # If we have keyword arguments corresponding to fields on the + # request, apply these. + if snapshot is not None: + request.snapshot = snapshot + if update_mask is not None: + request.update_mask = update_mask + # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. rpc = gapic_v1.method_async.wrap_method( diff --git a/google/pubsub_v1/services/subscriber/client.py b/google/pubsub_v1/services/subscriber/client.py index eebfb8736..816275ef7 100644 --- a/google/pubsub_v1/services/subscriber/client.py +++ b/google/pubsub_v1/services/subscriber/client.py @@ -53,6 +53,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore from google.protobuf import duration_pb2 # type: ignore +from google.protobuf import field_mask_pb2 # type: ignore from google.protobuf import timestamp_pb2 # type: ignore from google.pubsub_v1.services.subscriber import pagers from google.pubsub_v1.types import pubsub @@ -557,7 +558,10 @@ def sample_create_subscription(): Args: request (Union[google.pubsub_v1.types.Subscription, dict]): - The request object. A subscription resource. + The request object. A subscription resource. If none of + `push_config` or `bigquery_config` is set, then the + subscriber will pull and ack messages using API methods. + At most one of these fields may be set. name (str): Required. The name of the subscription. It must have the format @@ -583,11 +587,9 @@ def sample_create_subscription(): on the ``request`` instance; if ``request`` is provided, this should not be set. push_config (google.pubsub_v1.types.PushConfig): - If push delivery is used with this subscription, this - field is used to configure it. Either ``pushConfig`` or - ``bigQueryConfig`` can be set, but not both. If both are - empty, then the subscriber will pull and ack messages - using API methods. + If push delivery is used with this + subscription, this field is used to + configure it. This corresponds to the ``push_config`` field on the ``request`` instance; if ``request`` is provided, this @@ -597,7 +599,7 @@ def sample_create_subscription(): Pub/Sub waits for the subscriber to acknowledge receipt before resending the message. In the interval after the message is delivered and before it is acknowledged, it - is considered to be outstanding. During that time + is considered to be *outstanding*. During that time period, the message will not be redelivered (on a best-effort basis). @@ -629,7 +631,11 @@ def sample_create_subscription(): Returns: google.pubsub_v1.types.Subscription: - A subscription resource. + A subscription resource. If none of push_config or bigquery_config is + set, then the subscriber will pull and ack messages + using API methods. At most one of these fields may be + set. + """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -735,7 +741,11 @@ def sample_get_subscription(): Returns: google.pubsub_v1.types.Subscription: - A subscription resource. + A subscription resource. If none of push_config or bigquery_config is + set, then the subscriber will pull and ack messages + using API methods. At most one of these fields may be + set. + """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -785,6 +795,8 @@ def update_subscription( self, request: Optional[Union[pubsub.UpdateSubscriptionRequest, dict]] = None, *, + subscription: Optional[pubsub.Subscription] = None, + update_mask: Optional[field_mask_pb2.FieldMask] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), @@ -827,6 +839,21 @@ def sample_update_subscription(): request (Union[google.pubsub_v1.types.UpdateSubscriptionRequest, dict]): The request object. Request for the UpdateSubscription method. + subscription (google.pubsub_v1.types.Subscription): + Required. The updated subscription + object. + + This corresponds to the ``subscription`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. + update_mask (google.protobuf.field_mask_pb2.FieldMask): + Required. Indicates which fields in + the provided subscription to update. + Must be specified and non-empty. + + This corresponds to the ``update_mask`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. timeout (float): The timeout for this request. @@ -835,15 +862,34 @@ def sample_update_subscription(): Returns: google.pubsub_v1.types.Subscription: - A subscription resource. + A subscription resource. If none of push_config or bigquery_config is + set, then the subscriber will pull and ack messages + using API methods. At most one of these fields may be + set. + """ # Create or coerce a protobuf request object. + # Quick check: If we got a request object, we should *not* have + # gotten any keyword arguments that map to the request. + has_flattened_params = any([subscription, update_mask]) + if request is not None and has_flattened_params: + raise ValueError( + "If the `request` argument is set, then none of " + "the individual field arguments should be set." + ) + # Minor optimization to avoid making a copy if the user passes # in a pubsub.UpdateSubscriptionRequest. # There's no risk of modifying the input as we've already verified # there are no flattened fields. if not isinstance(request, pubsub.UpdateSubscriptionRequest): request = pubsub.UpdateSubscriptionRequest(request) + # If we have keyword arguments corresponding to fields on the + # request, apply these. + if subscription is not None: + request.subscription = subscription + if update_mask is not None: + request.update_mask = update_mask # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. @@ -1327,9 +1373,7 @@ def pull( timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PullResponse: - r"""Pulls messages from the server. The server may return - ``UNAVAILABLE`` if there are too many concurrent pull requests - pending for the given subscription. + r"""Pulls messages from the server. .. code-block:: python @@ -1674,13 +1718,12 @@ def get_snapshot( timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Snapshot: - r"""Gets the configuration details of a snapshot. - Snapshots are used in Seek - operations, which allow you to manage message - acknowledgments in bulk. That is, you can set the - acknowledgment state of messages in an existing - subscription to the state captured by a snapshot. + r"""Gets the configuration details of a snapshot. Snapshots are used + in + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. .. code-block:: python @@ -1960,9 +2003,10 @@ def sample_create_snapshot(): name is not provided in the request, the server will assign a random name for this snapshot on the same project as the subscription. Note that for REST API - requests, you must specify a name. See the resource name - rules. Format is - ``projects/{project}/snapshots/{snap}``. + requests, you must specify a name. See the `resource + name + rules `__. + Format is ``projects/{project}/snapshots/{snap}``. This corresponds to the ``name`` field on the ``request`` instance; if ``request`` is provided, this @@ -2046,18 +2090,17 @@ def update_snapshot( self, request: Optional[Union[pubsub.UpdateSnapshotRequest, dict]] = None, *, + snapshot: Optional[pubsub.Snapshot] = None, + update_mask: Optional[field_mask_pb2.FieldMask] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Snapshot: r"""Updates an existing snapshot. Snapshots are used in - Seek - operations, which allow - you to manage message acknowledgments in bulk. That is, - you can set the acknowledgment state of messages in an - existing subscription to the state captured by a - snapshot. + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. .. code-block:: python @@ -2088,6 +2131,21 @@ def sample_update_snapshot(): request (Union[google.pubsub_v1.types.UpdateSnapshotRequest, dict]): The request object. Request for the UpdateSnapshot method. + snapshot (google.pubsub_v1.types.Snapshot): + Required. The updated snapshot + object. + + This corresponds to the ``snapshot`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. + update_mask (google.protobuf.field_mask_pb2.FieldMask): + Required. Indicates which fields in + the provided snapshot to update. Must be + specified and non-empty. + + This corresponds to the ``update_mask`` field + on the ``request`` instance; if ``request`` is provided, this + should not be set. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. timeout (float): The timeout for this request. @@ -2105,12 +2163,27 @@ def sample_update_snapshot(): """ # Create or coerce a protobuf request object. + # Quick check: If we got a request object, we should *not* have + # gotten any keyword arguments that map to the request. + has_flattened_params = any([snapshot, update_mask]) + if request is not None and has_flattened_params: + raise ValueError( + "If the `request` argument is set, then none of " + "the individual field arguments should be set." + ) + # Minor optimization to avoid making a copy if the user passes # in a pubsub.UpdateSnapshotRequest. # There's no risk of modifying the input as we've already verified # there are no flattened fields. if not isinstance(request, pubsub.UpdateSnapshotRequest): request = pubsub.UpdateSnapshotRequest(request) + # If we have keyword arguments corresponding to fields on the + # request, apply these. + if snapshot is not None: + request.snapshot = snapshot + if update_mask is not None: + request.update_mask = update_mask # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. diff --git a/google/pubsub_v1/services/subscriber/transports/base.py b/google/pubsub_v1/services/subscriber/transports/base.py index 2cf93a726..ea2991f39 100644 --- a/google/pubsub_v1/services/subscriber/transports/base.py +++ b/google/pubsub_v1/services/subscriber/transports/base.py @@ -241,6 +241,7 @@ def _prep_wrapped_messages(self, client_info): multiplier=1.3, predicate=retries.if_exception_type( core_exceptions.Aborted, + core_exceptions.InternalServerError, core_exceptions.ServiceUnavailable, core_exceptions.Unknown, ), diff --git a/google/pubsub_v1/services/subscriber/transports/grpc.py b/google/pubsub_v1/services/subscriber/transports/grpc.py index 5a0cf0d8e..4667bf707 100644 --- a/google/pubsub_v1/services/subscriber/transports/grpc.py +++ b/google/pubsub_v1/services/subscriber/transports/grpc.py @@ -452,9 +452,7 @@ def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]: def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]: r"""Return a callable for the pull method over gRPC. - Pulls messages from the server. The server may return - ``UNAVAILABLE`` if there are too many concurrent pull requests - pending for the given subscription. + Pulls messages from the server. Returns: Callable[[~.PullRequest], @@ -543,13 +541,12 @@ def modify_push_config( def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]: r"""Return a callable for the get snapshot method over gRPC. - Gets the configuration details of a snapshot. - Snapshots are used in Seek - operations, which allow you to manage message - acknowledgments in bulk. That is, you can set the - acknowledgment state of messages in an existing - subscription to the state captured by a snapshot. + Gets the configuration details of a snapshot. Snapshots are used + in + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. Returns: Callable[[~.GetSnapshotRequest], @@ -650,13 +647,10 @@ def update_snapshot( r"""Return a callable for the update snapshot method over gRPC. Updates an existing snapshot. Snapshots are used in - Seek - operations, which allow - you to manage message acknowledgments in bulk. That is, - you can set the acknowledgment state of messages in an - existing subscription to the state captured by a - snapshot. + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. Returns: Callable[[~.UpdateSnapshotRequest], diff --git a/google/pubsub_v1/services/subscriber/transports/grpc_asyncio.py b/google/pubsub_v1/services/subscriber/transports/grpc_asyncio.py index edcaf4911..9a266b428 100644 --- a/google/pubsub_v1/services/subscriber/transports/grpc_asyncio.py +++ b/google/pubsub_v1/services/subscriber/transports/grpc_asyncio.py @@ -459,9 +459,7 @@ def acknowledge( def pull(self) -> Callable[[pubsub.PullRequest], Awaitable[pubsub.PullResponse]]: r"""Return a callable for the pull method over gRPC. - Pulls messages from the server. The server may return - ``UNAVAILABLE`` if there are too many concurrent pull requests - pending for the given subscription. + Pulls messages from the server. Returns: Callable[[~.PullRequest], @@ -554,13 +552,12 @@ def get_snapshot( ) -> Callable[[pubsub.GetSnapshotRequest], Awaitable[pubsub.Snapshot]]: r"""Return a callable for the get snapshot method over gRPC. - Gets the configuration details of a snapshot. - Snapshots are used in Seek - operations, which allow you to manage message - acknowledgments in bulk. That is, you can set the - acknowledgment state of messages in an existing - subscription to the state captured by a snapshot. + Gets the configuration details of a snapshot. Snapshots are used + in + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. Returns: Callable[[~.GetSnapshotRequest], @@ -663,13 +660,10 @@ def update_snapshot( r"""Return a callable for the update snapshot method over gRPC. Updates an existing snapshot. Snapshots are used in - Seek - operations, which allow - you to manage message acknowledgments in bulk. That is, - you can set the acknowledgment state of messages in an - existing subscription to the state captured by a - snapshot. + `Seek `__ + operations, which allow you to manage message acknowledgments in + bulk. That is, you can set the acknowledgment state of messages + in an existing subscription to the state captured by a snapshot. Returns: Callable[[~.UpdateSnapshotRequest], diff --git a/google/pubsub_v1/types/pubsub.py b/google/pubsub_v1/types/pubsub.py index 878b1f381..3e2f225ad 100644 --- a/google/pubsub_v1/types/pubsub.py +++ b/google/pubsub_v1/types/pubsub.py @@ -184,7 +184,7 @@ class Topic(proto.Message): timestamp `__ that is up to ``message_retention_duration`` in the past. If this field is not set, message retention is controlled by - settings on individual subscriptions. Cannot be more than 7 + settings on individual subscriptions. Cannot be more than 31 days or less than 10 minutes. """ @@ -581,7 +581,9 @@ class DetachSubscriptionResponse(proto.Message): class Subscription(proto.Message): - r"""A subscription resource. + r"""A subscription resource. If none of ``push_config`` or + ``bigquery_config`` is set, then the subscriber will pull and ack + messages using API methods. At most one of these fields may be set. Attributes: name (str): @@ -601,23 +603,19 @@ class Subscription(proto.Message): field will be ``_deleted-topic_`` if the topic has been deleted. push_config (google.pubsub_v1.types.PushConfig): - If push delivery is used with this subscription, this field - is used to configure it. Either ``pushConfig`` or - ``bigQueryConfig`` can be set, but not both. If both are - empty, then the subscriber will pull and ack messages using - API methods. + If push delivery is used with this + subscription, this field is used to configure + it. bigquery_config (google.pubsub_v1.types.BigQueryConfig): - If delivery to BigQuery is used with this subscription, this - field is used to configure it. Either ``pushConfig`` or - ``bigQueryConfig`` can be set, but not both. If both are - empty, then the subscriber will pull and ack messages using - API methods. + If delivery to BigQuery is used with this + subscription, this field is used to configure + it. ack_deadline_seconds (int): The approximate amount of time (on a best-effort basis) Pub/Sub waits for the subscriber to acknowledge receipt before resending the message. In the interval after the message is delivered and before it is acknowledged, it is - considered to be outstanding. During that time period, the + considered to be *outstanding*. During that time period, the message will not be redelivered (on a best-effort basis). For pull subscriptions, this value is used as the initial @@ -652,9 +650,8 @@ class Subscription(proto.Message): Defaults to 7 days. Cannot be more than 7 days or less than 10 minutes. labels (MutableMapping[str, str]): - See - Creating and managing labels. + See `Creating and managing + labels `__. enable_message_ordering (bool): If true, messages published with the same ``ordering_key`` in ``PubsubMessage`` will be delivered to the subscribers in @@ -977,11 +974,7 @@ class PushConfig(proto.Message): - ``v1`` or ``v1beta2``: uses the push format defined in the v1 Pub/Sub API. - For example: - - .. raw:: html - -
attributes { "x-goog-version": "v1" } 
+ For example: ``attributes { "x-goog-version": "v1" }`` oidc_token (google.pubsub_v1.types.PushConfig.OidcToken): If specified, Pub/Sub will generate and attach an OIDC JWT token as an ``Authorization`` header in the HTTP request for @@ -1047,7 +1040,7 @@ class BigQueryConfig(proto.Message): Attributes: table (str): The name of the table to which to write data, - of the form {projectId}:{datasetId}.{tableId} + of the form {projectId}.{datasetId}.{tableId} use_topic_schema (bool): When true, use the topic's schema as the columns to write to in BigQuery, if it exists. @@ -1081,8 +1074,15 @@ class State(proto.Enum): The subscription can actively send messages to BigQuery PERMISSION_DENIED (2): - Cannot write to the BigQuery table because of - permission denied errors. + Cannot write to the BigQuery table because of permission + denied errors. This can happen if + + - Pub/Sub SA has not been granted the `appropriate BigQuery + IAM + permissions `__ + - bigquery.googleapis.com API is not enabled for the + project + (`instructions `__) NOT_FOUND (3): Cannot write to the BigQuery table because it does not exist. @@ -1347,9 +1347,10 @@ class PullResponse(proto.Message): Attributes: received_messages (MutableSequence[google.pubsub_v1.types.ReceivedMessage]): Received Pub/Sub messages. The list will be empty if there - are no more messages available in the backlog. For JSON, the - response can be entirely empty. The Pub/Sub system may - return fewer than the ``maxMessages`` requested even if + are no more messages available in the backlog, or if no + messages could be returned before the request timeout. For + JSON, the response can be entirely empty. The Pub/Sub system + may return fewer than the ``maxMessages`` requested even if there are more messages available in the backlog. """ @@ -1570,6 +1571,9 @@ class AcknowledgeConfirmation(proto.Message): unordered_ack_ids (MutableSequence[str]): List of acknowledgement IDs that were out of order. + temporary_failed_ack_ids (MutableSequence[str]): + List of acknowledgement IDs that failed + processing with temporary issues. """ ack_ids: MutableSequence[str] = proto.RepeatedField( @@ -1584,6 +1588,10 @@ class AcknowledgeConfirmation(proto.Message): proto.STRING, number=3, ) + temporary_failed_ack_ids: MutableSequence[str] = proto.RepeatedField( + proto.STRING, + number=4, + ) class ModifyAckDeadlineConfirmation(proto.Message): r"""Acknowledgement IDs sent in one or more previous requests to @@ -1596,6 +1604,9 @@ class ModifyAckDeadlineConfirmation(proto.Message): List of acknowledgement IDs that were malformed or whose acknowledgement deadline has expired. + temporary_failed_ack_ids (MutableSequence[str]): + List of acknowledgement IDs that failed + processing with temporary issues. """ ack_ids: MutableSequence[str] = proto.RepeatedField( @@ -1606,6 +1617,10 @@ class ModifyAckDeadlineConfirmation(proto.Message): proto.STRING, number=2, ) + temporary_failed_ack_ids: MutableSequence[str] = proto.RepeatedField( + proto.STRING, + number=3, + ) class SubscriptionProperties(proto.Message): r"""Subscription properties sent as part of the response. @@ -1659,8 +1674,9 @@ class CreateSnapshotRequest(proto.Message): is not provided in the request, the server will assign a random name for this snapshot on the same project as the subscription. Note that for REST API requests, you must - specify a name. See the resource name rules. Format is - ``projects/{project}/snapshots/{snap}``. + specify a name. See the `resource name + rules `__. + Format is ``projects/{project}/snapshots/{snap}``. subscription (str): Required. The subscription whose backlog the snapshot retains. Specifically, the created snapshot is guaranteed to @@ -1673,9 +1689,8 @@ class CreateSnapshotRequest(proto.Message): CreateSnapshot request. Format is ``projects/{project}/subscriptions/{sub}``. labels (MutableMapping[str, str]): - See - Creating and managing labels. + See `Creating and managing + labels `__. """ name: str = proto.Field( diff --git a/samples/generated_samples/snippet_metadata_google.pubsub.v1.json b/samples/generated_samples/snippet_metadata_google.pubsub.v1.json index e0f3623a9..d66015ac4 100644 --- a/samples/generated_samples/snippet_metadata_google.pubsub.v1.json +++ b/samples/generated_samples/snippet_metadata_google.pubsub.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-pubsub", - "version": "2.14.1" + "version": "0.1.0" }, "snippets": [ { @@ -1315,6 +1315,14 @@ "name": "request", "type": "google.pubsub_v1.types.UpdateTopicRequest" }, + { + "name": "topic", + "type": "google.pubsub_v1.types.Topic" + }, + { + "name": "update_mask", + "type": "google.protobuf.field_mask_pb2.FieldMask" + }, { "name": "retry", "type": "google.api_core.retry.Retry" @@ -1391,6 +1399,14 @@ "name": "request", "type": "google.pubsub_v1.types.UpdateTopicRequest" }, + { + "name": "topic", + "type": "google.pubsub_v1.types.Topic" + }, + { + "name": "update_mask", + "type": "google.protobuf.field_mask_pb2.FieldMask" + }, { "name": "retry", "type": "google.api_core.retry.Retry" @@ -5400,6 +5416,14 @@ "name": "request", "type": "google.pubsub_v1.types.UpdateSnapshotRequest" }, + { + "name": "snapshot", + "type": "google.pubsub_v1.types.Snapshot" + }, + { + "name": "update_mask", + "type": "google.protobuf.field_mask_pb2.FieldMask" + }, { "name": "retry", "type": "google.api_core.retry.Retry" @@ -5476,6 +5500,14 @@ "name": "request", "type": "google.pubsub_v1.types.UpdateSnapshotRequest" }, + { + "name": "snapshot", + "type": "google.pubsub_v1.types.Snapshot" + }, + { + "name": "update_mask", + "type": "google.protobuf.field_mask_pb2.FieldMask" + }, { "name": "retry", "type": "google.api_core.retry.Retry" @@ -5553,6 +5585,14 @@ "name": "request", "type": "google.pubsub_v1.types.UpdateSubscriptionRequest" }, + { + "name": "subscription", + "type": "google.pubsub_v1.types.Subscription" + }, + { + "name": "update_mask", + "type": "google.protobuf.field_mask_pb2.FieldMask" + }, { "name": "retry", "type": "google.api_core.retry.Retry" @@ -5629,6 +5669,14 @@ "name": "request", "type": "google.pubsub_v1.types.UpdateSubscriptionRequest" }, + { + "name": "subscription", + "type": "google.pubsub_v1.types.Subscription" + }, + { + "name": "update_mask", + "type": "google.protobuf.field_mask_pb2.FieldMask" + }, { "name": "retry", "type": "google.api_core.retry.Retry" diff --git a/tests/unit/gapic/pubsub_v1/test_publisher.py b/tests/unit/gapic/pubsub_v1/test_publisher.py index 1b86bb735..6badf82d6 100644 --- a/tests/unit/gapic/pubsub_v1/test_publisher.py +++ b/tests/unit/gapic/pubsub_v1/test_publisher.py @@ -24,10 +24,17 @@ import grpc from grpc.experimental import aio +from collections.abc import Iterable +from google.protobuf import json_format +import json import math import pytest from proto.marshal.rules.dates import DurationRule, TimestampRule from proto.marshal.rules import wrappers +from requests import Response +from requests import Request, PreparedRequest +from requests.sessions import Session +from google.protobuf import json_format from google.api_core import client_options from google.api_core import exceptions as core_exceptions @@ -1057,6 +1064,96 @@ async def test_update_topic_field_headers_async(): ) in kw["metadata"] +def test_update_topic_flattened(): + client = PublisherClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.update_topic), "__call__") as call: + # Designate an appropriate return value for the call. + call.return_value = pubsub.Topic() + # Call the method with a truthy value for each flattened field, + # using the keyword arguments to the method. + client.update_topic( + topic=pubsub.Topic(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + # Establish that the underlying call was made with the expected + # request object values. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + arg = args[0].topic + mock_val = pubsub.Topic(name="name_value") + assert arg == mock_val + arg = args[0].update_mask + mock_val = field_mask_pb2.FieldMask(paths=["paths_value"]) + assert arg == mock_val + + +def test_update_topic_flattened_error(): + client = PublisherClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Attempting to call a method with both a request object and flattened + # fields is an error. + with pytest.raises(ValueError): + client.update_topic( + pubsub.UpdateTopicRequest(), + topic=pubsub.Topic(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + +@pytest.mark.asyncio +async def test_update_topic_flattened_async(): + client = PublisherAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.update_topic), "__call__") as call: + # Designate an appropriate return value for the call. + call.return_value = pubsub.Topic() + + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall(pubsub.Topic()) + # Call the method with a truthy value for each flattened field, + # using the keyword arguments to the method. + response = await client.update_topic( + topic=pubsub.Topic(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + # Establish that the underlying call was made with the expected + # request object values. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + arg = args[0].topic + mock_val = pubsub.Topic(name="name_value") + assert arg == mock_val + arg = args[0].update_mask + mock_val = field_mask_pb2.FieldMask(paths=["paths_value"]) + assert arg == mock_val + + +@pytest.mark.asyncio +async def test_update_topic_flattened_error_async(): + client = PublisherAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Attempting to call a method with both a request object and flattened + # fields is an error. + with pytest.raises(ValueError): + await client.update_topic( + pubsub.UpdateTopicRequest(), + topic=pubsub.Topic(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + @pytest.mark.parametrize( "request_type", [ diff --git a/tests/unit/gapic/pubsub_v1/test_schema_service.py b/tests/unit/gapic/pubsub_v1/test_schema_service.py index 4c7ffd5ed..54b8d8ac3 100644 --- a/tests/unit/gapic/pubsub_v1/test_schema_service.py +++ b/tests/unit/gapic/pubsub_v1/test_schema_service.py @@ -24,10 +24,17 @@ import grpc from grpc.experimental import aio +from collections.abc import Iterable +from google.protobuf import json_format +import json import math import pytest from proto.marshal.rules.dates import DurationRule, TimestampRule from proto.marshal.rules import wrappers +from requests import Response +from requests import Request, PreparedRequest +from requests.sessions import Session +from google.protobuf import json_format from google.api_core import client_options from google.api_core import exceptions as core_exceptions diff --git a/tests/unit/gapic/pubsub_v1/test_subscriber.py b/tests/unit/gapic/pubsub_v1/test_subscriber.py index 79fd1bdbe..3be5857f3 100644 --- a/tests/unit/gapic/pubsub_v1/test_subscriber.py +++ b/tests/unit/gapic/pubsub_v1/test_subscriber.py @@ -25,10 +25,17 @@ import grpc from grpc.experimental import aio +from collections.abc import Iterable +from google.protobuf import json_format +import json import math import pytest from proto.marshal.rules.dates import DurationRule, TimestampRule from proto.marshal.rules import wrappers +from requests import Response +from requests import Request, PreparedRequest +from requests.sessions import Session +from google.protobuf import json_format from google.api_core import client_options from google.api_core import exceptions as core_exceptions @@ -1423,6 +1430,100 @@ async def test_update_subscription_field_headers_async(): ) in kw["metadata"] +def test_update_subscription_flattened(): + client = SubscriberClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.update_subscription), "__call__" + ) as call: + # Designate an appropriate return value for the call. + call.return_value = pubsub.Subscription() + # Call the method with a truthy value for each flattened field, + # using the keyword arguments to the method. + client.update_subscription( + subscription=pubsub.Subscription(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + # Establish that the underlying call was made with the expected + # request object values. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + arg = args[0].subscription + mock_val = pubsub.Subscription(name="name_value") + assert arg == mock_val + arg = args[0].update_mask + mock_val = field_mask_pb2.FieldMask(paths=["paths_value"]) + assert arg == mock_val + + +def test_update_subscription_flattened_error(): + client = SubscriberClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Attempting to call a method with both a request object and flattened + # fields is an error. + with pytest.raises(ValueError): + client.update_subscription( + pubsub.UpdateSubscriptionRequest(), + subscription=pubsub.Subscription(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + +@pytest.mark.asyncio +async def test_update_subscription_flattened_async(): + client = SubscriberAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.update_subscription), "__call__" + ) as call: + # Designate an appropriate return value for the call. + call.return_value = pubsub.Subscription() + + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall(pubsub.Subscription()) + # Call the method with a truthy value for each flattened field, + # using the keyword arguments to the method. + response = await client.update_subscription( + subscription=pubsub.Subscription(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + # Establish that the underlying call was made with the expected + # request object values. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + arg = args[0].subscription + mock_val = pubsub.Subscription(name="name_value") + assert arg == mock_val + arg = args[0].update_mask + mock_val = field_mask_pb2.FieldMask(paths=["paths_value"]) + assert arg == mock_val + + +@pytest.mark.asyncio +async def test_update_subscription_flattened_error_async(): + client = SubscriberAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Attempting to call a method with both a request object and flattened + # fields is an error. + with pytest.raises(ValueError): + await client.update_subscription( + pubsub.UpdateSubscriptionRequest(), + subscription=pubsub.Subscription(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + @pytest.mark.parametrize( "request_type", [ @@ -4189,6 +4290,96 @@ async def test_update_snapshot_field_headers_async(): ) in kw["metadata"] +def test_update_snapshot_flattened(): + client = SubscriberClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.update_snapshot), "__call__") as call: + # Designate an appropriate return value for the call. + call.return_value = pubsub.Snapshot() + # Call the method with a truthy value for each flattened field, + # using the keyword arguments to the method. + client.update_snapshot( + snapshot=pubsub.Snapshot(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + # Establish that the underlying call was made with the expected + # request object values. + assert len(call.mock_calls) == 1 + _, args, _ = call.mock_calls[0] + arg = args[0].snapshot + mock_val = pubsub.Snapshot(name="name_value") + assert arg == mock_val + arg = args[0].update_mask + mock_val = field_mask_pb2.FieldMask(paths=["paths_value"]) + assert arg == mock_val + + +def test_update_snapshot_flattened_error(): + client = SubscriberClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Attempting to call a method with both a request object and flattened + # fields is an error. + with pytest.raises(ValueError): + client.update_snapshot( + pubsub.UpdateSnapshotRequest(), + snapshot=pubsub.Snapshot(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + +@pytest.mark.asyncio +async def test_update_snapshot_flattened_async(): + client = SubscriberAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.update_snapshot), "__call__") as call: + # Designate an appropriate return value for the call. + call.return_value = pubsub.Snapshot() + + call.return_value = grpc_helpers_async.FakeUnaryUnaryCall(pubsub.Snapshot()) + # Call the method with a truthy value for each flattened field, + # using the keyword arguments to the method. + response = await client.update_snapshot( + snapshot=pubsub.Snapshot(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + # Establish that the underlying call was made with the expected + # request object values. + assert len(call.mock_calls) + _, args, _ = call.mock_calls[0] + arg = args[0].snapshot + mock_val = pubsub.Snapshot(name="name_value") + assert arg == mock_val + arg = args[0].update_mask + mock_val = field_mask_pb2.FieldMask(paths=["paths_value"]) + assert arg == mock_val + + +@pytest.mark.asyncio +async def test_update_snapshot_flattened_error_async(): + client = SubscriberAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), + ) + + # Attempting to call a method with both a request object and flattened + # fields is an error. + with pytest.raises(ValueError): + await client.update_snapshot( + pubsub.UpdateSnapshotRequest(), + snapshot=pubsub.Snapshot(name="name_value"), + update_mask=field_mask_pb2.FieldMask(paths=["paths_value"]), + ) + + @pytest.mark.parametrize( "request_type", [