From 81f704a2cdeece8f73d7c09eae730a905afdb870 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 22 Jun 2022 15:26:51 -0700 Subject: [PATCH] feat(pubsub): support bigquery subscriptions (#6119) * feat(pubsub): support bigquery subscriptions * fix formatting and update comments * run gofmt * simplify fake behavior for bqconfig * fix comments * add comments to exported consts * add SubscriptionState field * add comment to exported field * add comment to exported type * fix comment --- pubsub/integration_test.go | 2 + pubsub/pstest/fake.go | 12 ++++ pubsub/pstest/fake_test.go | 58 +++++++++++++++ pubsub/subscription.go | 137 ++++++++++++++++++++++++++++++++++-- pubsub/subscription_test.go | 34 +++++++++ 5 files changed, 239 insertions(+), 4 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 73375286b5cf..e0417d17b832 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -625,6 +625,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) { ServiceAccountEmail: serviceAccountEmail, }, }, + State: SubscriptionStateActive, } opt := cmpopts.IgnoreUnexported(SubscriptionConfig{}) if diff := testutil.Diff(got, want, opt); diff != "" { @@ -658,6 +659,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) { RetentionDuration: 2 * time.Hour, Labels: map[string]string{"label": "value"}, ExpirationPolicy: 25 * time.Hour, + State: SubscriptionStateActive, } if !testutil.Equal(got, want, opt) { diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 7584215010c7..964b692642ae 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -480,6 +480,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p if ps.PushConfig == nil { ps.PushConfig = &pb.PushConfig{} } + if ps.BigqueryConfig == nil { + ps.BigqueryConfig = &pb.BigQueryConfig{} + } else if ps.BigqueryConfig.Table != "" { + ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE + } ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration var deadLetterTopic *topic if ps.DeadLetterPolicy != nil { @@ -579,6 +584,12 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "push_config": sub.proto.PushConfig = req.Subscription.PushConfig + case "bigquery_config": + sub.proto.BigqueryConfig = req.GetSubscription().GetBigqueryConfig() + if sub.proto.GetBigqueryConfig().GetTable() != "" { + sub.proto.GetBigqueryConfig().State = pb.BigQueryConfig_ACTIVE + } + case "ack_deadline_seconds": a := req.Subscription.AckDeadlineSeconds if err := checkAckDeadline(a); err != nil { @@ -788,6 +799,7 @@ func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, dea if at == 0 { at = 10 * time.Second } + ps.State = pb.Subscription_ACTIVE return &subscription{ topic: t, deadLetterTopic: deadLetterTopic, diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 2dce348c5f0a..ee5f604b5747 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1455,3 +1455,61 @@ func TestTopicRetentionAdmin(t *testing.T) { t.Errorf("sub.TopicMessageRetentionDuration mismatch: %s", diff) } } + +func TestSubscriptionPushPull(t *testing.T) { + ctx := context.Background() + pclient, sclient, _, cleanup := newFake(ctx, t) + defer cleanup() + + top := mustCreateTopic(ctx, t, pclient, &pb.Topic{ + Name: "projects/P/topics/T", + }) + + // Create a push subscription. + pc := &pb.PushConfig{ + PushEndpoint: "some-endpoint", + } + got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{ + AckDeadlineSeconds: minAckDeadlineSecs, + Name: "projects/P/subscriptions/S", + Topic: top.Name, + PushConfig: pc, + }) + + if diff := testutil.Diff(got.PushConfig, pc); diff != "" { + t.Errorf("sub.PushConfig mismatch: %s", diff) + } + + // Update the subscription to write to BigQuery instead. + updateSub := got + updateSub.PushConfig = &pb.PushConfig{} + bqc := &pb.BigQueryConfig{ + Table: "some-table", + } + updateSub.BigqueryConfig = bqc + got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ + Subscription: updateSub, + UpdateMask: &field_mask.FieldMask{Paths: []string{"push_config", "bigquery_config"}}, + }) + if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { + t.Errorf("sub.PushConfig should be zero value\n%s", diff) + } + want := bqc + want.State = pb.BigQueryConfig_ACTIVE + if diff := testutil.Diff(got.BigqueryConfig, want); diff != "" { + t.Errorf("sub.BigQueryConfig mismatch: %s", diff) + } + + // Switch back to a pull subscription. + updateSub.BigqueryConfig = &pb.BigQueryConfig{} + got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{ + Subscription: updateSub, + UpdateMask: &field_mask.FieldMask{Paths: []string{"bigquery_config"}}, + }) + if diff := testutil.Diff(got.PushConfig, new(pb.PushConfig)); diff != "" { + t.Errorf("sub.PushConfig should be zero value\n%s", diff) + } + if diff := testutil.Diff(got.BigqueryConfig, new(pb.BigQueryConfig)); diff != "" { + t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff) + } +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 0eb374872388..daaf6748df9e 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -210,14 +210,106 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { } } +// BigQueryConfigState denotes the possible states for a BigQuery Subscription. +type BigQueryConfigState int + +const ( + // BigQueryConfigStateUnspecified is the default value. This value is unused. + BigQueryConfigStateUnspecified = iota + + // BigQueryConfigActive means the subscription can actively send messages to BigQuery. + BigQueryConfigActive + + // BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors. + BigQueryConfigPermissionDenied + + // BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist. + BigQueryConfigNotFound + + // BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch. + BigQueryConfigSchemaMismatch +) + +// BigQueryConfig configures the subscription to deliver to a BigQuery table. +type BigQueryConfig struct { + // The name of the table to which to write data, of the form + // {projectId}:{datasetId}.{tableId} + Table string + + // When true, use the topic's schema as the columns to write to in BigQuery, + // if it exists. + UseTopicSchema bool + + // When true, write the subscription name, message_id, publish_time, + // attributes, and ordering_key to additional columns in the table. The + // subscription name, message_id, and publish_time fields are put in their own + // columns while all other message properties (other than data) are written to + // a JSON object in the attributes column. + WriteMetadata bool + + // When true and use_topic_schema is true, any fields that are a part of the + // topic schema that are not part of the BigQuery table schema are dropped + // when writing to BigQuery. Otherwise, the schemas must be kept in sync and + // any messages with extra fields are not written and remain in the + // subscription's backlog. + DropUnknownFields bool + + // This is an output-only field that indicates whether or not the subscription can + // receive messages. This field is set only in responses from the server; + // it is ignored if it is set in any requests. + State BigQueryConfigState +} + +func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { + if bc == nil { + return nil + } + pbCfg := &pb.BigQueryConfig{ + Table: bc.Table, + UseTopicSchema: bc.UseTopicSchema, + WriteMetadata: bc.WriteMetadata, + DropUnknownFields: bc.DropUnknownFields, + State: pb.BigQueryConfig_State(bc.State), + } + return pbCfg +} + +// SubscriptionState denotes the possible states for a Subscription. +type SubscriptionState int + +const ( + // SubscriptionStateUnspecified is the default value. This value is unused. + SubscriptionStateUnspecified = iota + + // SubscriptionStateActive means the subscription can actively send messages to BigQuery. + SubscriptionStateActive + + // SubscriptionStateResourceError means the subscription receive messages because of an + // error with the resource to which it pushes messages. + // See the more detailed error state in the corresponding configuration. + SubscriptionStateResourceError +) + // SubscriptionConfig describes the configuration of a subscription. type SubscriptionConfig struct { // The fully qualified identifier for the subscription, in the format "projects//subscriptions/" name string - Topic *Topic + // The topic from which this subscription is receiving messages. + Topic *Topic + + // If push delivery is used with this subscription, this field is + // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, + // but not both. If both are empty, then the subscriber will pull and ack + // messages using API methods. PushConfig PushConfig + // If delivery to BigQuery is used with this subscription, this field is + // used to configure it. Either `PushConfig` or `BigQueryConfig` can be set, + // but not both. If both are empty, then the subscriber will pull and ack + // messages using API methods. + BigQueryConfig BigQueryConfig + // The default maximum time after a subscriber receives a message before // the subscriber should acknowledge the message. Note: messages which are // obtained via Subscription.Receive need not be acknowledged within this @@ -290,6 +382,12 @@ type SubscriptionConfig struct { // This is an output only field, meaning it will only appear in responses from the backend // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration + + // State indicates whether or not the subscription can receive messages. + // This is an output-only field that indicates whether or not the subscription can + // receive messages. This field is set only in responses from the server; + // it is ignored if it is set in any requests. + State SubscriptionState } // String returns the globally unique printable name of the subscription config. @@ -317,6 +415,10 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { pbPushConfig = cfg.PushConfig.toProto() } + var pbBigQueryConfig *pb.BigQueryConfig + if cfg.BigQueryConfig.Table != "" { + pbBigQueryConfig = cfg.BigQueryConfig.toProto() + } var retentionDuration *durpb.Duration if cfg.RetentionDuration != 0 { retentionDuration = durpb.New(cfg.RetentionDuration) @@ -333,6 +435,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { Name: name, Topic: cfg.Topic.name, PushConfig: pbPushConfig, + BigqueryConfig: pbBigQueryConfig, AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), RetainAckedMessages: cfg.RetainAckedMessages, MessageRetentionDuration: retentionDuration, @@ -371,11 +474,14 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC RetryPolicy: rp, Detached: pbSub.Detached, TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), + State: SubscriptionState(pbSub.State), } - pc := protoToPushConfig(pbSub.PushConfig) - if pc != nil { + if pc := protoToPushConfig(pbSub.PushConfig); pc != nil { subC.PushConfig = *pc } + if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil { + subC.BigQueryConfig = *bq + } return subC, nil } @@ -398,6 +504,20 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { return pc } +func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig { + if pbBQ == nil { + return nil + } + bq := &BigQueryConfig{ + Table: pbBQ.GetTable(), + UseTopicSchema: pbBQ.GetUseTopicSchema(), + DropUnknownFields: pbBQ.GetDropUnknownFields(), + WriteMetadata: pbBQ.GetWriteMetadata(), + State: BigQueryConfigState(pbBQ.State), + } + return bq +} + // DeadLetterPolicy specifies the conditions for dead lettering messages in // a subscription. type DeadLetterPolicy struct { @@ -641,9 +761,14 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { // SubscriptionConfigToUpdate describes how to update a subscription. type SubscriptionConfigToUpdate struct { - // If non-nil, the push config is changed. + // If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig. + // If currently in push mode, set this value to the zero value to revert to a Pull based subscription. PushConfig *PushConfig + // If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig. + // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, + BigQueryConfig *BigQueryConfig + // If non-zero, the ack deadline is changed. AckDeadline time.Duration @@ -698,6 +823,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update psub.PushConfig = cfg.PushConfig.toProto() paths = append(paths, "push_config") } + if cfg.BigQueryConfig != nil { + psub.BigqueryConfig = cfg.BigQueryConfig.toProto() + paths = append(paths, "bigquery_config") + } if cfg.AckDeadline != 0 { psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) paths = append(paths, "ack_deadline_seconds") diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 7595905152b3..7d4ac9ee28c7 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -190,6 +190,7 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, + State: SubscriptionStateActive, } opt := cmpopts.IgnoreUnexported(SubscriptionConfig{}) if !testutil.Equal(cfg, want, opt) { @@ -226,6 +227,7 @@ func TestUpdateSubscription(t *testing.T) { Audience: "client-12345", }, }, + State: SubscriptionStateActive, } if !testutil.Equal(got, want, opt) { t.Fatalf("\ngot %+v\nwant %+v", got, want) @@ -435,3 +437,35 @@ func TestOrdering_CreateSubscription(t *testing.T) { msg.Ack() }) } + +func TestBigQuerySubscription(t *testing.T) { + ctx := context.Background() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + bqTable := "some-project:some-dataset.some-table" + bqConfig := BigQueryConfig{ + Table: bqTable, + } + + subConfig := SubscriptionConfig{ + Topic: topic, + BigQueryConfig: bqConfig, + } + bqSub, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatal(err) + } + cfg, err := bqSub.Config(ctx) + if err != nil { + t.Fatal(err) + } + + want := bqConfig + want.State = BigQueryConfigActive + if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" { + t.Fatalf("CreateBQSubscription mismatch: \n%s", diff) + } +}