Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(pubsublite): extra examples and documentation clean up #3663

Merged
merged 6 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsublite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ To publish messages to a topic:
const topic = "projects/project-id/locations/us-central1-b/topics/topic1"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
log.Fatal(err)
log.Fatal(err)
}

// Publish "hello world".
Expand Down
45 changes: 31 additions & 14 deletions pubsublite/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,34 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
More information about Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.

Note: This library is in ALPHA. Backwards-incompatible changes may be made
before stable v1.0.0 is released.


Introduction

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.

Note: This library is in ALPHA. Backwards-incompatible changes may be made
before stable v1.0.0 is released.
The following imports are required for code snippets below:

import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/pscompat"
)

More complete examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.


Creating Topics

Messages are published to topics. Pub/Sub Lite topics may be created like so:

ctx := context.Background()
const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic"
topicConfig := pubsublite.TopicConfig{
Name: topicPath,
Expand All @@ -53,16 +70,15 @@ Messages are published to topics. Pub/Sub Lite topics may be created like so:
if err != nil {
// TODO: Handle error.
}
topic, err = adminClient.CreateTopic(ctx, topicConfig)
if err != nil {
if _, err = adminClient.CreateTopic(ctx, topicConfig); err != nil {
// TODO: Handle error.
}

See https://cloud.google.com/pubsub/lite/docs/topics for more information about
how Pub/Sub Lite topics are configured.

See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions
and zones where Pub/Sub Lite is available.
See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones
where Pub/Sub Lite is available.


Publishing
Expand Down Expand Up @@ -100,8 +116,9 @@ service:
// TODO: Handle error.
}

Once you've finishing publishing, call Stop to flush all messages to the service
and close gRPC streams:
Once you've finishing publishing all messages, call Stop to flush all messages
to the service and close gRPC streams. The PublisherClient can no longer be used
after it has been stopped or has terminated due to a permanent service error.

publisher.Stop()

Expand All @@ -123,8 +140,7 @@ Pub/Sub Lite subscriptions may be created like so:
Topic: topicPath,
DeliveryRequirement: pubsublite.DeliverImmediately,
}
subscription, err = adminClient.CreateSubscription(ctx, subscriptionConfig)
if err != nil {
if _, err = adminClient.CreateSubscription(ctx, subscriptionConfig); err != nil {
// TODO: Handle error.
}

Expand All @@ -138,7 +154,9 @@ To receive messages for a subscription, first create a SubscriberClient:

subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath)

Messages are then consumed from a subscription via callback.
Messages are then consumed from a subscription via callback. The callback may be
invoked concurrently by multiple goroutines (one per partition that the
subscriber client is connected to).

cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
Expand All @@ -149,9 +167,8 @@ Messages are then consumed from a subscription via callback.
// TODO: Handle error.
}

The callback may be invoked concurrently by multiple goroutines (one per
partition that the subscriber client is connected to). To terminate a call to
Receive, cancel its context:
Receive blocks until either the context is canceled or a fatal service error
occurs. To terminate a call to Receive, cancel its context:

cancel()

Expand Down
17 changes: 17 additions & 0 deletions pubsublite/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ import (
"google.golang.org/api/iterator"
)

// This example demonstrates how to create a new topic.
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
// about how Pub/Sub Lite topics are configured.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones
// where Pub/Sub Lite is available.
func ExampleAdminClient_CreateTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -47,6 +53,7 @@ func ExampleAdminClient_CreateTopic() {

func ExampleAdminClient_UpdateTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -67,6 +74,7 @@ func ExampleAdminClient_UpdateTopic() {

func ExampleAdminClient_DeleteTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -80,6 +88,7 @@ func ExampleAdminClient_DeleteTopic() {

func ExampleAdminClient_Topics() {
ctx := context.Background()
// NOTE: region must correspond to the zone below.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -101,6 +110,7 @@ func ExampleAdminClient_Topics() {

func ExampleAdminClient_TopicSubscriptions() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -121,8 +131,12 @@ func ExampleAdminClient_TopicSubscriptions() {
}
}

// This example demonstrates how to create a new subscription for a topic.
// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more
// information about how subscriptions are configured.
func ExampleAdminClient_CreateSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic and subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -143,6 +157,7 @@ func ExampleAdminClient_CreateSubscription() {

func ExampleAdminClient_UpdateSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -162,6 +177,7 @@ func ExampleAdminClient_UpdateSubscription() {

func ExampleAdminClient_DeleteSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -175,6 +191,7 @@ func ExampleAdminClient_DeleteSubscription() {

func ExampleAdminClient_Subscriptions() {
ctx := context.Background()
// NOTE: region must correspond to the zone below.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/pscompat/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
Package pscompat contains clients for publishing and subscribing using the
Pub/Sub Lite service.

The clients in this package are designed to compatible with the Cloud Pub/Sub
library: https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are
defined by the client, PublisherClient and SubscriberClient can be used as
This package is designed to compatible with the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the
client application, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
respectively, from the pubsub package.

Expand Down
40 changes: 36 additions & 4 deletions pubsublite/pscompat/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func ExamplePublisherClient_Publish() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
Expand Down Expand Up @@ -68,7 +68,7 @@ func ExamplePublisherClient_Publish_batchingSettings() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
Expand All @@ -92,13 +92,15 @@ func ExamplePublisherClient_Error() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == pscompat.ErrPublisherStopped {
// Prints the fatal error that caused the publisher to terminate.
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
break
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
Expand Down Expand Up @@ -151,6 +153,36 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

// This example shows how to manually assign which topic partitions a
// SubscriberClient should connect to. If not specified, the SubscriberClient
// will use Pub/Sub Lite's partition assignment service to automatically
// determine which partitions it should connect to.
func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
// NOTE: The corresponding topic must have 2 or more partitions.
settings.Partitions = []int{0, 1}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
// TODO: Handle error.
}

// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}