Skip to content

Commit

Permalink
Test new pubsub api
Browse files Browse the repository at this point in the history
  • Loading branch information
BrunoReboul committed Mar 30, 2020
1 parent 112bf8c commit 7acd487
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions services/getgroupsettings/getgroupsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"google.golang.org/api/groupssettings/v1"
"google.golang.org/api/option"

"cloud.google.com/go/pubsub"
// pubsubold "cloud.google.com/go/pubsub"
pubsub "cloud.google.com/go/pubsub/apiv1"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)

// Global structure for global variables to optimize the cloud function performances
Expand All @@ -34,7 +36,8 @@ type Global struct {
groupsSettingsService *groupssettings.Service
initFailed bool
outputTopicName string
pubSubClient *pubsub.Client
// pubSubClient *pubsubold.Client
pubsubPublisherClient *pubsub.PublisherClient
retryTimeOutSeconds int64
}

Expand Down Expand Up @@ -71,9 +74,15 @@ func Initialize(ctx context.Context, global *Global) {
global.initFailed = true
return
}
global.pubSubClient, err = pubsub.NewClient(ctx, projectID)
// global.pubSubClient, err = pubsubold.NewClient(ctx, projectID)
// if err != nil {
// log.Printf("ERROR - pubsubold.NewClient: %v", err)
// global.initFailed = true
// return
// }
global.pubsubPublisherClient, err = pubsub.NewPublisherClient(global.ctx)
if err != nil {
log.Printf("ERROR - pubsub.NewClient: %v", err)
log.Printf("ERROR - global.pubsubPublisherClient: %v", err)
global.initFailed = true
return
}
Expand Down Expand Up @@ -111,20 +120,37 @@ func EntryPoint(ctxEvent context.Context, PubSubMessage ram.PubSubMessage, globa
feedMessageGroupSettings.Asset.Resource = groupSettings
}

publishRequest := ram.PublishRequest{Topic: global.outputTopicName}
feedMessageGroupSettingsJSON, err := json.Marshal(feedMessageGroupSettings)
if err != nil {
log.Println("ERROR - json.Unmarshal(pubSubMessage.Data, &feedMessageGroup)")
return nil // NO RETRY
}
pubSubMessage := &pubsub.Message{
Data: feedMessageGroupSettingsJSON,
}
id, err := global.pubSubClient.Topic(publishRequest.Topic).Publish(global.ctx, pubSubMessage).Get(global.ctx)

// publishRequest := ram.PublishRequest{Topic: global.outputTopicName}
// pubSubMessage := &pubsubold.Message{
// Data: feedMessageGroupSettingsJSON,
// }
// id, err := global.pubSubClient.Topic(publishRequest.Topic).Publish(global.ctx, pubSubMessage).Get(global.ctx)
// if err != nil {
// return fmt.Errorf("pubSubClient.Topic(publishRequest.Topic).Publish: %v", err) // RETRY
// }
// log.Printf("Group %s %s settings published to pubsub topic %s id %s %s", feedMessageGroup.Asset.Resource.Id, feedMessageGroup.Asset.Resource.Email, global.outputTopicName, id, string(feedMessageGroupSettingsJSON))

var pubSubMessage pubsubpb.PubsubMessage
pubSubMessage.Data = feedMessageGroupSettingsJSON

var pubsubMessages []*pubsubpb.PubsubMessage
pubsubMessages = append(pubsubMessages, &pubSubMessage)

var publishRequestv1 pubsubpb.PublishRequest
publishRequestv1.Topic = global.outputTopicName
publishRequestv1.Messages = pubsubMessages

pubsubResponse, err := global.pubsubPublisherClient.Publish(global.ctx, &publishRequestv1)
if err != nil {
return fmt.Errorf("pubSubClient.Topic(publishRequest.Topic).Publish: %v", err) // RETRY
return fmt.Errorf("global.pubsubPublisherClient.Publish: %v", err) // RETRY
}
log.Printf("Group %s %s settings published to pubsub topic %s id %s %s", feedMessageGroup.Asset.Resource.Id, feedMessageGroup.Asset.Resource.Email, global.outputTopicName, id, string(feedMessageGroupSettingsJSON))
log.Printf("Group %s %s settings published to pubsub topic %s ids %v %s", feedMessageGroup.Asset.Resource.Id, feedMessageGroup.Asset.Resource.Email, global.outputTopicName, pubsubResponse.MessageIds, string(feedMessageGroupSettingsJSON))

return nil
}

0 comments on commit 7acd487

Please sign in to comment.