Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add BigQuery configuration for subscriptions #685

Merged
merged 2 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/pubsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from google.pubsub_v1.services.subscriber.async_client import SubscriberAsyncClient

from google.pubsub_v1.types.pubsub import AcknowledgeRequest
from google.pubsub_v1.types.pubsub import BigQueryConfig
from google.pubsub_v1.types.pubsub import CreateSnapshotRequest
from google.pubsub_v1.types.pubsub import DeadLetterPolicy
from google.pubsub_v1.types.pubsub import DeleteSnapshotRequest
Expand Down Expand Up @@ -88,6 +89,7 @@
"SubscriberClient",
"SubscriberAsyncClient",
"AcknowledgeRequest",
"BigQueryConfig",
"CreateSnapshotRequest",
"DeadLetterPolicy",
"DeleteSnapshotRequest",
Expand Down
2 changes: 2 additions & 0 deletions google/pubsub_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .services.subscriber import SubscriberAsyncClient

from .types.pubsub import AcknowledgeRequest
from .types.pubsub import BigQueryConfig
from .types.pubsub import CreateSnapshotRequest
from .types.pubsub import DeadLetterPolicy
from .types.pubsub import DeleteSnapshotRequest
Expand Down Expand Up @@ -83,6 +84,7 @@
"SchemaServiceAsyncClient",
"SubscriberAsyncClient",
"AcknowledgeRequest",
"BigQueryConfig",
"CreateSchemaRequest",
"CreateSnapshotRequest",
"DeadLetterPolicy",
Expand Down
5 changes: 3 additions & 2 deletions google/pubsub_v1/services/subscriber/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ async def sample_create_subscription():
should not be set.
push_config (:class:`google.pubsub_v1.types.PushConfig`):
If push delivery is used with this subscription, this
field is used to configure it. An empty ``pushConfig``
signifies that the subscriber will pull and ack messages
field is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages
using API methods.

This corresponds to the ``push_config`` field
Expand Down
5 changes: 3 additions & 2 deletions google/pubsub_v1/services/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,9 @@ def sample_create_subscription():
should not be set.
push_config (google.pubsub_v1.types.PushConfig):
If push delivery is used with this subscription, this
field is used to configure it. An empty ``pushConfig``
signifies that the subscriber will pull and ack messages
field is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages
using API methods.

This corresponds to the ``push_config`` field
Expand Down
2 changes: 2 additions & 0 deletions google/pubsub_v1/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from .pubsub import (
AcknowledgeRequest,
BigQueryConfig,
CreateSnapshotRequest,
DeadLetterPolicy,
DeleteSnapshotRequest,
Expand Down Expand Up @@ -87,6 +88,7 @@
__all__ = (
"TimeoutType",
"AcknowledgeRequest",
"BigQueryConfig",
"CreateSnapshotRequest",
"DeadLetterPolicy",
"DeleteSnapshotRequest",
Expand Down
95 changes: 92 additions & 3 deletions google/pubsub_v1/types/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"DeadLetterPolicy",
"ExpirationPolicy",
"PushConfig",
"BigQueryConfig",
"ReceivedMessage",
"GetSubscriptionRequest",
"UpdateSubscriptionRequest",
Expand Down Expand Up @@ -581,9 +582,16 @@ class Subscription(proto.Message):
deleted.
push_config (google.pubsub_v1.types.PushConfig):
If push delivery is used with this subscription, this field
is used to configure it. An empty ``pushConfig`` signifies
that the subscriber will pull and ack messages using API
methods.
is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages using
API methods.
bigquery_config (google.pubsub_v1.types.BigQueryConfig):
If delivery to BigQuery is used with this subscription, this
field is used to configure it. Either ``pushConfig`` or
``bigQueryConfig`` can be set, but not both. If both are
empty, then the subscriber will pull and ack messages using
API methods.
ack_deadline_seconds (int):
The approximate amount of time (on a best-effort basis)
Pub/Sub waits for the subscriber to acknowledge receipt
Expand Down Expand Up @@ -700,8 +708,18 @@ class Subscription(proto.Message):
subscribers. See the ``message_retention_duration`` field in
``Topic``. This field is set only in responses from the
server; it is ignored if it is set in any requests.
state (google.pubsub_v1.types.Subscription.State):
Output only. An output-only field indicating
whether or not the subscription can receive
messages.
"""

class State(proto.Enum):
r"""Possible states for a subscription."""
STATE_UNSPECIFIED = 0
ACTIVE = 1
RESOURCE_ERROR = 2

name = proto.Field(
proto.STRING,
number=1,
Expand All @@ -715,6 +733,11 @@ class Subscription(proto.Message):
number=4,
message="PushConfig",
)
bigquery_config = proto.Field(
proto.MESSAGE,
number=18,
message="BigQueryConfig",
)
ack_deadline_seconds = proto.Field(
proto.INT32,
number=5,
Expand Down Expand Up @@ -769,6 +792,11 @@ class Subscription(proto.Message):
number=17,
message=duration_pb2.Duration,
)
state = proto.Field(
proto.ENUM,
number=19,
enum=State,
)


class RetryPolicy(proto.Message):
Expand Down Expand Up @@ -980,6 +1008,67 @@ class OidcToken(proto.Message):
)


class BigQueryConfig(proto.Message):
r"""Configuration for a BigQuery subscription.

Attributes:
table (str):
The name of the table to which to write data,
of the form {projectId}:{datasetId}.{tableId}
use_topic_schema (bool):
When true, use the topic's schema as the
columns to write to in BigQuery, if it exists.
write_metadata (bool):
When true, write the subscription name, message_id,
publish_time, attributes, and ordering_key to additional
columns in the table. The subscription name, message_id, and
publish_time fields are put in their own columns while all
other message properties (other than data) are written to a
JSON object in the attributes column.
drop_unknown_fields (bool):
When true and use_topic_schema is true, any fields that are
a part of the topic schema that are not part of the BigQuery
table schema are dropped when writing to BigQuery.
Otherwise, the schemas must be kept in sync and any messages
with extra fields are not written and remain in the
subscription's backlog.
state (google.pubsub_v1.types.BigQueryConfig.State):
Output only. An output-only field that
indicates whether or not the subscription can
receive messages.
"""

class State(proto.Enum):
r"""Possible states for a BigQuery subscription."""
STATE_UNSPECIFIED = 0
ACTIVE = 1
PERMISSION_DENIED = 2
NOT_FOUND = 3
SCHEMA_MISMATCH = 4

table = proto.Field(
proto.STRING,
number=1,
)
use_topic_schema = proto.Field(
proto.BOOL,
number=2,
)
write_metadata = proto.Field(
proto.BOOL,
number=3,
)
drop_unknown_fields = proto.Field(
proto.BOOL,
number=4,
)
state = proto.Field(
proto.ENUM,
number=5,
enum=State,
)


class ReceivedMessage(proto.Message):
r"""A message and its corresponding acknowledgment ID.

Expand Down
2 changes: 1 addition & 1 deletion scripts/fixup_pubsub_v1_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class pubsubCallTransformer(cst.CSTTransformer):
'acknowledge': ('subscription', 'ack_ids', ),
'create_schema': ('parent', 'schema', 'schema_id', ),
'create_snapshot': ('name', 'subscription', 'labels', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', ),
'create_subscription': ('name', 'topic', 'push_config', 'bigquery_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', 'state', ),
'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ),
'delete_schema': ('name', ),
'delete_snapshot': ('snapshot', ),
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/gapic/pubsub_v1/test_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
response = client.create_subscription(request)

Expand All @@ -683,6 +684,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


def test_create_subscription_empty_call():
Expand Down Expand Up @@ -731,6 +733,7 @@ async def test_create_subscription_async(
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
)
response = await client.create_subscription(request)
Expand All @@ -750,6 +753,7 @@ async def test_create_subscription_async(
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


@pytest.mark.asyncio
Expand Down Expand Up @@ -963,6 +967,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
response = client.get_subscription(request)

Expand All @@ -981,6 +986,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


def test_get_subscription_empty_call():
Expand Down Expand Up @@ -1025,6 +1031,7 @@ async def test_get_subscription_async(
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
)
response = await client.get_subscription(request)
Expand All @@ -1044,6 +1051,7 @@ async def test_get_subscription_async(
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


@pytest.mark.asyncio
Expand Down Expand Up @@ -1221,6 +1229,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
response = client.update_subscription(request)

Expand All @@ -1239,6 +1248,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


def test_update_subscription_empty_call():
Expand Down Expand Up @@ -1287,6 +1297,7 @@ async def test_update_subscription_async(
filter="filter_value",
detached=True,
enable_exactly_once_delivery=True,
state=pubsub.Subscription.State.ACTIVE,
)
)
response = await client.update_subscription(request)
Expand All @@ -1306,6 +1317,7 @@ async def test_update_subscription_async(
assert response.filter == "filter_value"
assert response.detached is True
assert response.enable_exactly_once_delivery is True
assert response.state == pubsub.Subscription.State.ACTIVE


@pytest.mark.asyncio
Expand Down