Skip to content

Commit

Permalink
fix(pubsublite): rename publish.Metadata to pscompat.MessageMetadata (#…
Browse files Browse the repository at this point in the history
…3672)

Since publish.Metadata now also applies to message ids, it was renamed to MessageMetadata and duplicated in the internal/wire and pscompat packages.
  • Loading branch information
tmdiep authored Feb 11, 2021
1 parent f66114b commit 6a8d4c5
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 229 deletions.
73 changes: 49 additions & 24 deletions pubsublite/go.sum

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions pubsublite/internal/wire/publish_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"errors"
"fmt"

"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"
"google.golang.org/protobuf/proto"
Expand All @@ -29,7 +28,7 @@ import (
var errPublishQueueEmpty = errors.New("pubsublite: received publish response from server with no batches in flight")

// PublishResultFunc receives the result of a publish.
type PublishResultFunc func(*publish.Metadata, error)
type PublishResultFunc func(*MessageMetadata, error)

// messageHolder stores a message to be published, with associated metadata.
type messageHolder struct {
Expand Down Expand Up @@ -142,8 +141,8 @@ func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error {
batch, _ := frontElem.Value.(*publishBatch)
for i, msgHolder := range batch.msgHolders {
// Messages are ordered, so the offset of each message is firstOffset + i.
pm := &publish.Metadata{Partition: b.partition, Offset: firstOffset + int64(i)}
msgHolder.onResult(pm, nil)
mm := &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)}
msgHolder.onResult(mm, nil)
b.availableBufferBytes += msgHolder.size
}

Expand Down
7 changes: 3 additions & 4 deletions pubsublite/internal/wire/publish_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/publish"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -35,7 +34,7 @@ type testPublishResultReceiver struct {
done chan struct{}
msg string
t *testing.T
got *publish.Metadata
got *MessageMetadata
gotErr error
}

Expand All @@ -47,8 +46,8 @@ func newTestPublishResultReceiver(t *testing.T, msg *pb.PubSubMessage) *testPubl
}
}

func (r *testPublishResultReceiver) set(pm *publish.Metadata, err error) {
r.got = pm
func (r *testPublishResultReceiver) set(mm *MessageMetadata, err error) {
r.got = mm
r.gotErr = err
close(r.done)
}
Expand Down
17 changes: 17 additions & 0 deletions pubsublite/internal/wire/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,20 @@ type subscriptionPartition struct {
func (sp subscriptionPartition) String() string {
return fmt.Sprintf("%s/partitions/%d", sp.Path, sp.Partition)
}

// MessageMetadata holds properties of a message published to the Pub/Sub Lite
// service.
//
// NOTE: This is duplicated in the pscompat package in order to generate nicer
// docs and should be kept consistent.
type MessageMetadata struct {
// The topic partition the message was published to.
Partition int

// The offset the message was assigned.
Offset int64
}

func (m *MessageMetadata) String() string {
return fmt.Sprintf("%d:%d", m.Partition, m.Offset)
}
42 changes: 42 additions & 0 deletions pubsublite/pscompat/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,45 @@ func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
// receiving.
cancel()
}

func ExampleParseMessageMetadata_publisher() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
id, err := result.Get(ctx)
if err != nil {
// TODO: Handle error.
}
metadata, err := pscompat.ParseMessageMetadata(id)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published message to partition %d with offset %d\n", metadata.Partition, metadata.Offset)
}

func ExampleParseMessageMetadata_subscriber() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
if err != nil {
// TODO: Handle error.
}
err = subscriber.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
m.Ack()
metadata, err := pscompat.ParseMessageMetadata(m.ID)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Received message from partition %d with offset %d\n", metadata.Partition, metadata.Offset)
})
if err != nil {
// TODO: Handle error.
}
}
5 changes: 2 additions & 3 deletions pubsublite/pscompat/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
Expand Down Expand Up @@ -207,7 +206,7 @@ func waitForPublishResults(t *testing.T, pubResults []*pubsub.PublishResult) {
if err != nil {
t.Errorf("Publish(%d) got err: %v", i, err)
}
if _, err := publish.ParseMetadata(id); err != nil {
if _, err := ParseMessageMetadata(id); err != nil {
t.Error(err)
}
}
Expand Down Expand Up @@ -242,7 +241,7 @@ func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings Rece
}

// Check message ordering.
metadata, err := publish.ParseMetadata(msg.ID)
metadata, err := ParseMessageMetadata(msg.ID)
if err != nil {
t.Error(err)
} else {
Expand Down
33 changes: 33 additions & 0 deletions pubsublite/pscompat/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"encoding/base64"
"errors"
"fmt"
"strconv"
"strings"

"cloud.google.com/go/pubsub"
"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -128,3 +130,34 @@ func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) err
}
return nil
}

// MessageMetadata holds properties of a message published to the Pub/Sub Lite
// service.
type MessageMetadata struct {
// The topic partition the message was published to.
Partition int

// The offset the message was assigned.
Offset int64
}

func (m *MessageMetadata) String() string {
return fmt.Sprintf("%d:%d", m.Partition, m.Offset)
}

// ParseMessageMetadata creates MessageMetadata from the ID string of a
// pubsub.PublishResult returned by PublisherClient or pubsub.Message.ID
// received from SubscriberClient.
func ParseMessageMetadata(id string) (*MessageMetadata, error) {
parts := strings.Split(id, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("pubsublite: invalid encoded message metadata %q", id)
}

partition, pErr := strconv.ParseInt(parts[0], 10, 64)
offset, oErr := strconv.ParseInt(parts[1], 10, 64)
if pErr != nil || oErr != nil {
return nil, fmt.Errorf("pubsublite: invalid encoded message metadata %q", id)
}
return &MessageMetadata{Partition: int(partition), Offset: offset}, nil
}
48 changes: 48 additions & 0 deletions pubsublite/pscompat/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,51 @@ func TestMessageTransforms(t *testing.T) {
})
}
}

func TestMessageMetadataStringEncoding(t *testing.T) {
for _, tc := range []struct {
desc string
input string
want *MessageMetadata
wantErr bool
}{
{
desc: "valid: zero",
input: "0:0",
want: &MessageMetadata{Partition: 0, Offset: 0},
},
{
desc: "valid: non-zero",
input: "3:1234",
want: &MessageMetadata{Partition: 3, Offset: 1234},
},
{
desc: "invalid: number",
input: "1234",
wantErr: true,
},
{
desc: "invalid: partition",
input: "p:1234",
wantErr: true,
},
{
desc: "invalid: offset",
input: "10:9offset",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
got, gotErr := ParseMessageMetadata(tc.input)
if !testutil.Equal(got, tc.want) || (gotErr != nil) != tc.wantErr {
t.Errorf("ParseMessageMetadata(%q): got (%v, %v), want (%v, err=%v)", tc.input, got, gotErr, tc.want, tc.wantErr)
}

if tc.want != nil {
if got := tc.want.String(); got != tc.input {
t.Errorf("MessageMetadata(%v).String(): got %q, want: %q", tc.want, got, tc.input)
}
}
})
}
}
7 changes: 3 additions & 4 deletions pubsublite/pscompat/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"
Expand Down Expand Up @@ -122,10 +121,10 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub
return result
}

p.wirePub.Publish(msgpb, func(pm *publish.Metadata, err error) {
p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) {
err = translateError(err)
if pm != nil {
ipubsub.SetPublishResult(result, pm.String(), err)
if metadata != nil {
ipubsub.SetPublishResult(result, metadata.String(), err)
} else {
ipubsub.SetPublishResult(result, "", err)
}
Expand Down
5 changes: 2 additions & 3 deletions pubsublite/pscompat/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"

Expand All @@ -44,7 +43,7 @@ func (mp *mockWirePublisher) Publish(msg *pb.PubSubMessage, onResult wire.Publis
onResult(nil, err)
return
}
result := resp.(*publish.Metadata)
result := resp.(*wire.MessageMetadata)
onResult(result, nil)
}

Expand All @@ -68,7 +67,7 @@ func TestPublisherClientTransformMessage(t *testing.T) {
OrderingKey: "ordering_key",
Attributes: map[string]string{"attr": "value"},
}
fakeResponse := &publish.Metadata{
fakeResponse := &wire.MessageMetadata{
Partition: 2,
Offset: 42,
}
Expand Down
3 changes: 1 addition & 2 deletions pubsublite/pscompat/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"google.golang.org/api/option"

ipubsub "cloud.google.com/go/internal/pubsub"
Expand Down Expand Up @@ -136,7 +135,7 @@ func (si *subscriberInstance) transformMessage(in *wire.ReceivedMessage, out *pu
if len(out.ID) > 0 {
return errMessageIDSet
}
metadata := &publish.Metadata{Partition: in.Partition, Offset: in.Msg.GetCursor().GetOffset()}
metadata := &MessageMetadata{Partition: in.Partition, Offset: in.Msg.GetCursor().GetOffset()}
out.ID = metadata.String()
return nil
}
Expand Down
65 changes: 0 additions & 65 deletions pubsublite/publish/example_test.go

This file was deleted.

Loading

0 comments on commit 6a8d4c5

Please sign in to comment.