Skip to content

Commit

Permalink
feat: add BigQuery configuration for subscriptions (#685)
Browse files Browse the repository at this point in the history
* feat: add BigQuery configuration for subscriptions

PiperOrigin-RevId: 449031535

Source-Link: googleapis/googleapis@feec34d

Source-Link: googleapis/googleapis-gen@89664e9
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiODk2NjRlOTcwOGMxOWQ1MzJjNjNmN2ExNmZkNzljYjYzMWQ4N2FhMSJ9

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
gcf-owl-bot[bot] and gcf-owl-bot[bot] authored May 19, 2022
1 parent 8513f53 commit 6fa03be
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 8 deletions.
2 changes: 2 additions & 0 deletions google/pubsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from google.pubsub_v1.services.subscriber.async_client import SubscriberAsyncClient

from google.pubsub_v1.types.pubsub import AcknowledgeRequest
from google.pubsub_v1.types.pubsub import BigQueryConfig
from google.pubsub_v1.types.pubsub import CreateSnapshotRequest
from google.pubsub_v1.types.pubsub import DeadLetterPolicy
from google.pubsub_v1.types.pubsub import DeleteSnapshotRequest
Expand Down Expand Up @@ -88,6 +89,7 @@
"SubscriberClient",
"SubscriberAsyncClient",
"AcknowledgeRequest",
"BigQueryConfig",
"CreateSnapshotRequest",
"DeadLetterPolicy",
"DeleteSnapshotRequest",
Expand Down
2 changes: 2 additions & 0 deletions google/pubsub_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .services.subscriber import SubscriberAsyncClient

from .types.pubsub import AcknowledgeRequest
from .types.pubsub import BigQueryConfig
from .types.pubsub import CreateSnapshotRequest
from .types.pubsub import DeadLetterPolicy
from .types.pubsub import DeleteSnapshotRequest
Expand Down Expand Up @@ -83,6 +84,7 @@
"SchemaServiceAsyncClient",
"SubscriberAsyncClient",
"AcknowledgeRequest",
"BigQueryConfig",
"CreateSchemaRequest",
"CreateSnapshotRequest",
"DeadLetterPolicy",
Expand Down
5 changes: 3 additions & 2 deletions google/pubsub_v1/services/subscriber/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ async def sample_create_subscription():
should not be set.
push_config (:class:`google.pubsub_v1.types.PushConfig`):
If push delivery is used with this subscription, this
field is used to configure it. An empty ``pushConfig``
signifies that the subscriber will pull and ack messages
field is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages
using API methods.
This corresponds to the ``push_config`` field
Expand Down
5 changes: 3 additions & 2 deletions google/pubsub_v1/services/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,9 @@ def sample_create_subscription():
should not be set.
push_config (google.pubsub_v1.types.PushConfig):
If push delivery is used with this subscription, this
field is used to configure it. An empty ``pushConfig``
signifies that the subscriber will pull and ack messages
field is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages
using API methods.
This corresponds to the ``push_config`` field
Expand Down
2 changes: 2 additions & 0 deletions google/pubsub_v1/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from .pubsub import (
AcknowledgeRequest,
BigQueryConfig,
CreateSnapshotRequest,
DeadLetterPolicy,
DeleteSnapshotRequest,
Expand Down Expand Up @@ -87,6 +88,7 @@
__all__ = (
"TimeoutType",
"AcknowledgeRequest",
"BigQueryConfig",
"CreateSnapshotRequest",
"DeadLetterPolicy",
"DeleteSnapshotRequest",
Expand Down
95 changes: 92 additions & 3 deletions google/pubsub_v1/types/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"DeadLetterPolicy",
"ExpirationPolicy",
"PushConfig",
"BigQueryConfig",
"ReceivedMessage",
"GetSubscriptionRequest",
"UpdateSubscriptionRequest",
Expand Down Expand Up @@ -581,9 +582,16 @@ class Subscription(proto.Message):
deleted.
push_config (google.pubsub_v1.types.PushConfig):
If push delivery is used with this subscription, this field
is used to configure it. An empty ``pushConfig`` signifies
that the subscriber will pull and ack messages using API
methods.
is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages using
API methods.
bigquery_config (google.pubsub_v1.types.BigQueryConfig):
If delivery to BigQuery is used with this subscription, this
field is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages using
API methods.
ack_deadline_seconds (int):
The approximate amount of time (on a best-effort basis)
Pub/Sub waits for the subscriber to acknowledge receipt
Expand Down Expand Up @@ -700,8 +708,18 @@ class Subscription(proto.Message):
subscribers. See the ``message_retention_duration`` field in
``Topic``. This field is set only in responses from the
server; it is ignored if it is set in any requests.
state (google.pubsub_v1.types.Subscription.State):
Output only. An output-only field indicating
whether or not the subscription can receive
messages.
"""

class State(proto.Enum):
r"""Possible states for a subscription."""
STATE_UNSPECIFIED = 0
ACTIVE = 1
RESOURCE_ERROR = 2

name = proto.Field(
proto.STRING,
number=1,
Expand All @@ -715,6 +733,11 @@ class Subscription(proto.Message):
number=4,
message="PushConfig",
)
bigquery_config = proto.Field(
proto.MESSAGE,
number=18,
message="BigQueryConfig",
)
ack_deadline_seconds = proto.Field(
proto.INT32,
number=5,
Expand Down Expand Up @@ -769,6 +792,11 @@ class Subscription(proto.Message):
number=17,
message=duration_pb2.Duration,
)
state = proto.Field(
proto.ENUM,
number=19,
enum=State,
)


class RetryPolicy(proto.Message):
Expand Down Expand Up @@ -980,6 +1008,67 @@ class OidcToken(proto.Message):
)


class BigQueryConfig(proto.Message):
r"""Configuration for a BigQuery subscription.
Attributes:
table (str):
The name of the table to which to write data,
of the form {projectId}:{datasetId}.{tableId}
use_topic_schema (bool):
When true, use the topic's schema as the
columns to write to in BigQuery, if it exists.
write_metadata (bool):
When true, write the subscription name, message_id,
publish_time, attributes, and ordering_key to additional
columns in the table. The subscription name, message_id, and
publish_time fields are put in their own columns while all
other message properties (other than data) are written to a
JSON object in the attributes column.
drop_unknown_fields (bool):
When true and use_topic_schema is true, any fields that are
a part of the topic schema that are not part of the BigQuery
table schema are dropped when writing to BigQuery.
Otherwise, the schemas must be kept in sync and any messages
with extra fields are not written and remain in the
subscription's backlog.
state (google.pubsub_v1.types.BigQueryConfig.State):
Output only. An output-only field that
indicates whether or not the subscription can
receive messages.
"""

class State(proto.Enum):
r"""Possible states for a BigQuery subscription."""
STATE_UNSPECIFIED = 0
ACTIVE = 1
PERMISSION_DENIED = 2
NOT_FOUND = 3
SCHEMA_MISMATCH = 4

table = proto.Field(
proto.STRING,
number=1,
)
use_topic_schema = proto.Field(
proto.BOOL,
number=2,
)
write_metadata = proto.Field(
proto.BOOL,
number=3,
)
drop_unknown_fields = proto.Field(
proto.BOOL,
number=4,
)
state = proto.Field(
proto.ENUM,
number=5,
enum=State,
)


class ReceivedMessage(proto.Message):
r"""A message and its corresponding acknowledgment ID.
Expand Down
2 changes: 1 addition & 1 deletion scripts/fixup_pubsub_v1_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class pubsubCallTransformer(cst.CSTTransformer):
'acknowledge': ('subscription', 'ack_ids', ),
'create_schema': ('parent', 'schema', 'schema_id', ),
'create_snapshot': ('name', 'subscription', 'labels', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', ),
'create_subscription': ('name', 'topic', 'push_config', 'bigquery_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', 'state', ),
'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ),
'delete_schema': ('name', ),
'delete_snapshot': ('snapshot', ),
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/gapic/pubsub_v1/test_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
response = client.create_subscription(request)

Expand All @@ -683,6 +684,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


def test_create_subscription_empty_call():
Expand Down Expand Up @@ -731,6 +733,7 @@ async def test_create_subscription_async(
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
)
response = await client.create_subscription(request)
Expand All @@ -750,6 +753,7 @@ async def test_create_subscription_async(
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


@pytest.mark.asyncio
Expand Down Expand Up @@ -963,6 +967,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
response = client.get_subscription(request)

Expand All @@ -981,6 +986,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


def test_get_subscription_empty_call():
Expand Down Expand Up @@ -1025,6 +1031,7 @@ async def test_get_subscription_async(
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
)
response = await client.get_subscription(request)
Expand All @@ -1044,6 +1051,7 @@ async def test_get_subscription_async(
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


@pytest.mark.asyncio
Expand Down Expand Up @@ -1221,6 +1229,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
response = client.update_subscription(request)

Expand All @@ -1239,6 +1248,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


def test_update_subscription_empty_call():
Expand Down Expand Up @@ -1287,6 +1297,7 @@ async def test_update_subscription_async(
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
)
response = await client.update_subscription(request)
Expand All @@ -1306,6 +1317,7 @@ async def test_update_subscription_async(
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


@pytest.mark.asyncio
Expand Down

0 comments on commit 6fa03be

Please sign in to comment.