diff --git a/pubsublite/README.md b/pubsublite/README.md index ab37bd95b1e6..2721e8ffb92c 100644 --- a/pubsublite/README.md +++ b/pubsublite/README.md @@ -27,7 +27,7 @@ To publish messages to a topic: const topic = "projects/project-id/locations/us-central1-b/topics/topic1" publisher, err := pscompat.NewPublisherClient(ctx, topic) if err != nil { - log.Fatal(err) + log.Fatal(err) } // Publish "hello world". diff --git a/pubsublite/doc.go b/pubsublite/doc.go index 4b1def088e5e..c260489b49fd 100644 --- a/pubsublite/doc.go +++ b/pubsublite/doc.go @@ -29,17 +29,34 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite. More information about Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite. +Note: This library is in ALPHA. Backwards-incompatible changes may be made +before stable v1.0.0 is released. + + +Introduction + See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package. -Note: This library is in ALPHA. Backwards-incompatible changes may be made -before stable v1.0.0 is released. +The following imports are required for code snippets below: + + import ( + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite" + "cloud.google.com/go/pubsublite/pscompat" + ) + +More complete examples can be found at +https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples +and +https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples. Creating Topics Messages are published to topics. Pub/Sub Lite topics may be created like so: + ctx := context.Background() const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic" topicConfig := pubsublite.TopicConfig{ Name: topicPath, @@ -53,16 +70,15 @@ Messages are published to topics. Pub/Sub Lite topics may be created like so: if err != nil { // TODO: Handle error. } - topic, err = adminClient.CreateTopic(ctx, topicConfig) - if err != nil { + if _, err = adminClient.CreateTopic(ctx, topicConfig); err != nil { // TODO: Handle error. } See https://cloud.google.com/pubsub/lite/docs/topics for more information about how Pub/Sub Lite topics are configured. -See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions -and zones where Pub/Sub Lite is available. +See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones +where Pub/Sub Lite is available. Publishing @@ -100,8 +116,9 @@ service: // TODO: Handle error. } -Once you've finishing publishing, call Stop to flush all messages to the service -and close gRPC streams: +Once you've finishing publishing all messages, call Stop to flush all messages +to the service and close gRPC streams. The PublisherClient can no longer be used +after it has been stopped or has terminated due to a permanent service error. publisher.Stop() @@ -123,8 +140,7 @@ Pub/Sub Lite subscriptions may be created like so: Topic: topicPath, DeliveryRequirement: pubsublite.DeliverImmediately, } - subscription, err = adminClient.CreateSubscription(ctx, subscriptionConfig) - if err != nil { + if _, err = adminClient.CreateSubscription(ctx, subscriptionConfig); err != nil { // TODO: Handle error. } @@ -138,7 +154,9 @@ To receive messages for a subscription, first create a SubscriberClient: subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath) -Messages are then consumed from a subscription via callback. +Messages are then consumed from a subscription via callback. The callback may be +invoked concurrently by multiple goroutines (one per partition that the +subscriber client is connected to). cctx, cancel := context.WithCancel(ctx) err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) { @@ -149,9 +167,8 @@ Messages are then consumed from a subscription via callback. // TODO: Handle error. } -The callback may be invoked concurrently by multiple goroutines (one per -partition that the subscriber client is connected to). To terminate a call to -Receive, cancel its context: +Receive blocks until either the context is canceled or a fatal service error +occurs. To terminate a call to Receive, cancel its context: cancel() diff --git a/pubsublite/example_test.go b/pubsublite/example_test.go index 8ee45bc1da40..83582725a5dc 100644 --- a/pubsublite/example_test.go +++ b/pubsublite/example_test.go @@ -22,8 +22,14 @@ import ( "google.golang.org/api/iterator" ) +// This example demonstrates how to create a new topic. +// See https://cloud.google.com/pubsub/lite/docs/topics for more information +// about how Pub/Sub Lite topics are configured. +// See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones +// where Pub/Sub Lite is available. func ExampleAdminClient_CreateTopic() { ctx := context.Background() + // NOTE: region must correspond to the zone of the topic. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -47,6 +53,7 @@ func ExampleAdminClient_CreateTopic() { func ExampleAdminClient_UpdateTopic() { ctx := context.Background() + // NOTE: region must correspond to the zone of the topic. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -67,6 +74,7 @@ func ExampleAdminClient_UpdateTopic() { func ExampleAdminClient_DeleteTopic() { ctx := context.Background() + // NOTE: region must correspond to the zone of the topic. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -80,6 +88,7 @@ func ExampleAdminClient_DeleteTopic() { func ExampleAdminClient_Topics() { ctx := context.Background() + // NOTE: region must correspond to the zone below. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -101,6 +110,7 @@ func ExampleAdminClient_Topics() { func ExampleAdminClient_TopicSubscriptions() { ctx := context.Background() + // NOTE: region must correspond to the zone of the topic. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -121,8 +131,12 @@ func ExampleAdminClient_TopicSubscriptions() { } } +// This example demonstrates how to create a new subscription for a topic. +// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more +// information about how subscriptions are configured. func ExampleAdminClient_CreateSubscription() { ctx := context.Background() + // NOTE: region must correspond to the zone of the topic and subscription. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -143,6 +157,7 @@ func ExampleAdminClient_CreateSubscription() { func ExampleAdminClient_UpdateSubscription() { ctx := context.Background() + // NOTE: region must correspond to the zone of the subscription. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -162,6 +177,7 @@ func ExampleAdminClient_UpdateSubscription() { func ExampleAdminClient_DeleteSubscription() { ctx := context.Background() + // NOTE: region must correspond to the zone of the subscription. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. @@ -175,6 +191,7 @@ func ExampleAdminClient_DeleteSubscription() { func ExampleAdminClient_Subscriptions() { ctx := context.Background() + // NOTE: region must correspond to the zone below. admin, err := pubsublite.NewAdminClient(ctx, "region") if err != nil { // TODO: Handle error. diff --git a/pubsublite/pscompat/doc.go b/pubsublite/pscompat/doc.go index ee5662bb4857..87b68b1b4498 100644 --- a/pubsublite/pscompat/doc.go +++ b/pubsublite/pscompat/doc.go @@ -15,9 +15,9 @@ Package pscompat contains clients for publishing and subscribing using the Pub/Sub Lite service. -The clients in this package are designed to compatible with the Cloud Pub/Sub -library: https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are -defined by the client, PublisherClient and SubscriberClient can be used as +This package is designed to compatible with the Cloud Pub/Sub library: +https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the +client application, PublisherClient and SubscriberClient can be used as substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(), respectively, from the pubsub package. diff --git a/pubsublite/pscompat/example_test.go b/pubsublite/pscompat/example_test.go index 67a18c7f4d4a..af02852329e9 100644 --- a/pubsublite/pscompat/example_test.go +++ b/pubsublite/pscompat/example_test.go @@ -36,7 +36,7 @@ func ExamplePublisherClient_Publish() { Data: []byte("hello world"), }) results = append(results, r) - // Do other work ... + // Publish more messages ... for _, r := range results { id, err := r.Get(ctx) if err != nil { @@ -68,7 +68,7 @@ func ExamplePublisherClient_Publish_batchingSettings() { Data: []byte("hello world"), }) results = append(results, r) - // Do other work ... + // Publish more messages ... for _, r := range results { id, err := r.Get(ctx) if err != nil { @@ -92,13 +92,15 @@ func ExamplePublisherClient_Error() { Data: []byte("hello world"), }) results = append(results, r) - // Do other work ... + // Publish more messages ... for _, r := range results { id, err := r.Get(ctx) if err != nil { // TODO: Handle error. if err == pscompat.ErrPublisherStopped { + // Prints the fatal error that caused the publisher to terminate. fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error()) + break } } fmt.Printf("Published a message with a message ID: %s\n", id) @@ -151,6 +153,36 @@ func ExampleSubscriberClient_Receive_maxOutstanding() { // TODO: Handle error. } - // Call cancel from callback, or another goroutine. + // Call cancel from the receiver callback or another goroutine to stop + // receiving. + cancel() +} + +// This example shows how to manually assign which topic partitions a +// SubscriberClient should connect to. If not specified, the SubscriberClient +// will use Pub/Sub Lite's partition assignment service to automatically +// determine which partitions it should connect to. +func ExampleSubscriberClient_Receive_manualPartitionAssignment() { + ctx := context.Background() + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" + settings := pscompat.DefaultReceiveSettings + // NOTE: The corresponding topic must have 2 or more partitions. + settings.Partitions = []int{0, 1} + subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings) + if err != nil { + // TODO: Handle error. + } + cctx, cancel := context.WithCancel(ctx) + err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) { + // TODO: Handle message. + // NOTE: May be called concurrently; synchronize access to shared memory. + m.Ack() + }) + if err != nil { + // TODO: Handle error. + } + + // Call cancel from the receiver callback or another goroutine to stop + // receiving. cancel() }