Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): support payload wrapping for push subs #8292

Merged
merged 6 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/vet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ golint ./... 2>&1 | (
grep -vE " executeStreamingSql(Min|Rnd)Time" |
grep -vE " executeSql(Min|Rnd)Time" |
grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" |
grep -vE "pubsub\/subscription\.go.+ type name will be used as pubsub.PubsubWrapper by other packages" |
grep -v "ClusterId" |
grep -v "InstanceId" |
grep -v "firestore.arrayUnion" |
Expand Down
5 changes: 5 additions & 0 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
}
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
} else if ps.PushConfig.Wrapper == nil {
// Wrapper should default to PubsubWrapper.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting to nil is also acceptable and is treated as PubsubWrapper.

Copy link
Member Author

@hongalex hongalex Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just implementing that behavior in the fake server we provide. The client doesn't know about the default and should default to nil

ps.PushConfig.Wrapper = &pb.PushConfig_PubsubWrapper_{
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
}
}
// Consider any table set to mean the config is active.
// We don't convert nil config to empty like with PushConfig above
Expand Down
3 changes: 3 additions & 0 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,9 @@ func TestSubscriptionPushPull(t *testing.T) {
// Create a push subscription.
pc := &pb.PushConfig{
PushEndpoint: "some-endpoint",
Wrapper: &pb.PushConfig_PubsubWrapper_{
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
},
}
got := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
AckDeadlineSeconds: minAckDeadlineSecs,
Expand Down
70 changes: 67 additions & 3 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ type PushConfig struct {
// This field is optional and should be set only by users interested in
// authenticated push.
AuthenticationMethod AuthenticationMethod

// The format of the delivered message to the push endpoint is defined by
// the chosen wrapper. When unset, `PubsubWrapper` is used.
Wrapper Wrapper
}

func (pc *PushConfig) toProto() *pb.PushConfig {
Expand All @@ -165,12 +169,19 @@ func (pc *PushConfig) toProto() *pb.PushConfig {
default: // TODO: add others here when GAIC adds more definitions.
}
}
if w := pc.Wrapper; w != nil {
switch wt := w.(type) {
case *PubsubWrapper:
pbCfg.Wrapper = wt.toProto()
case *NoWrapper:
pbCfg.Wrapper = wt.toProto()
default:
}
}
return pbCfg
}

// AuthenticationMethod is used by push points to verify the source of push requests.
// This interface defines fields that are part of a closed alpha that may not be accessible
// to all users.
// AuthenticationMethod is used by push subscriptions to verify the source of push requests.
type AuthenticationMethod interface {
isAuthMethod() bool
}
Expand Down Expand Up @@ -212,6 +223,49 @@ func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
}
}

// Wrapper defines the format of message delivered to push endpoints.
type Wrapper interface {
isWrapper() bool
}

// PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON
// representation of a PubsubMessage
// (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
type PubsubWrapper struct{}

var _ Wrapper = (*PubsubWrapper)(nil)

func (p *PubsubWrapper) isWrapper() bool { return true }

func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ {
if p == nil {
return nil
}
return &pb.PushConfig_PubsubWrapper_{
PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
}
}

// NoWrapper denotes not wrapping the payload sent to the push endpoint.
type NoWrapper struct {
WriteMetadata bool
}

var _ Wrapper = (*NoWrapper)(nil)

func (n *NoWrapper) isWrapper() bool { return true }

func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ {
if n == nil {
return nil
}
return &pb.PushConfig_NoWrapper_{
NoWrapper: &pb.PushConfig_NoWrapper{
WriteMetadata: n.WriteMetadata,
},
}
}

// BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int

Expand Down Expand Up @@ -648,6 +702,16 @@ func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
}
}
}
if w := pbPc.Wrapper; w != nil {
switch wt := w.(type) {
case *pb.PushConfig_PubsubWrapper_:
pc.Wrapper = &PubsubWrapper{}
case *pb.PushConfig_NoWrapper_:
pc.Wrapper = &NoWrapper{
WriteMetadata: wt.NoWrapper.WriteMetadata,
}
}
}
return pc
}

Expand Down
37 changes: 22 additions & 15 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestListTopicSubscriptions(t *testing.T) {

const defaultRetentionDuration = 168 * time.Hour

func TestUpdateSubscription(t *testing.T) {
func TestSubscriptionConfig(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
Expand Down Expand Up @@ -191,13 +191,14 @@ func TestUpdateSubscription(t *testing.T) {
ServiceAccountEmail: "[email protected]",
Audience: "client-12345",
},
Wrapper: &PubsubWrapper{},
},
EnableExactlyOnceDelivery: false,
State: SubscriptionStateActive,
}
opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
if !testutil.Equal(cfg, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", cfg, want)
if diff := testutil.Diff(cfg, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}

got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
Expand All @@ -206,10 +207,13 @@ func TestUpdateSubscription(t *testing.T) {
Labels: map[string]string{"label": "value"},
ExpirationPolicy: 72 * time.Hour,
PushConfig: &PushConfig{
Endpoint: "https://example.com/push",
Endpoint: "https://example2.com/push",
AuthenticationMethod: &OIDCToken{
ServiceAccountEmail: "[email protected]",
Audience: "client-12345",
ServiceAccountEmail: "[email protected]",
Audience: "client-98765",
},
Wrapper: &NoWrapper{
WriteMetadata: true,
},
},
EnableExactlyOnceDelivery: true,
Expand All @@ -225,17 +229,20 @@ func TestUpdateSubscription(t *testing.T) {
Labels: map[string]string{"label": "value"},
ExpirationPolicy: 72 * time.Hour,
PushConfig: PushConfig{
Endpoint: "https://example.com/push",
Endpoint: "https://example2.com/push",
AuthenticationMethod: &OIDCToken{
ServiceAccountEmail: "[email protected]",
Audience: "client-12345",
ServiceAccountEmail: "[email protected]",
Audience: "client-98765",
},
Wrapper: &NoWrapper{
WriteMetadata: true,
},
},
EnableExactlyOnceDelivery: true,
State: SubscriptionStateActive,
}
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
if diff := testutil.Diff(got, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}

got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
Expand All @@ -247,8 +254,8 @@ func TestUpdateSubscription(t *testing.T) {
}
want.RetentionDuration = 2 * time.Hour
want.Labels = nil
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
if diff := testutil.Diff(got, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}

_, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
Expand All @@ -264,8 +271,8 @@ func TestUpdateSubscription(t *testing.T) {
t.Fatal(err)
}
want.ExpirationPolicy = time.Duration(0)
if !testutil.Equal(got, want, opt) {
t.Fatalf("\ngot %+v\nwant %+v", got, want)
if diff := testutil.Diff(got, want, opt); diff != "" {
t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
}
}

Expand Down