Skip to content

Commit

Permalink
splitdump uses pubsub apiv1
Browse files Browse the repository at this point in the history
  • Loading branch information
BrunoReboul committed Apr 1, 2020
1 parent db62220 commit 39c755f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 24 deletions.
37 changes: 23 additions & 14 deletions services/splitdump/splitdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
"strings"
"time"

"cloud.google.com/go/pubsub"
pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/storage"
"github.com/BrunoReboul/ram/utilities/ram"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)

// Global structure for global variables to optimize the cloud function performances
Expand All @@ -38,7 +39,8 @@ type Global struct {
initFailed bool
retryTimeOutSeconds int64
iamTopicName string
pubSubClient *pubsub.Client
pubsubPublisherClient *pubsub.PublisherClient
projectID string
splitThresholdLineNumber int64
storageBucket *storage.BucketHandle
}
Expand Down Expand Up @@ -78,12 +80,11 @@ func Initialize(ctx context.Context, global *Global) {
var bucketName string
var err error
var ok bool
var projectID string
var storageClient *storage.Client

bucketName = os.Getenv("CAIEXPORTBUCKETNAME")
global.iamTopicName = os.Getenv("IAMTOPICNAME")
projectID = os.Getenv("GCP_PROJECT")
global.projectID = os.Getenv("GCP_PROJECT")

log.Println("Function COLD START")
if global.retryTimeOutSeconds, ok = ram.GetEnvVarInt64("RETRYTIMEOUTSECONDS"); !ok {
Expand All @@ -99,9 +100,9 @@ func Initialize(ctx context.Context, global *Global) {
return
}
global.storageBucket = storageClient.Bucket(bucketName)
global.pubSubClient, err = pubsub.NewClient(ctx, projectID)
global.pubsubPublisherClient, err = pubsub.NewPublisherClient(global.ctx)
if err != nil {
log.Printf("ERROR - pubsub.NewClient: %v", err)
log.Printf("ERROR - global.pubsubPublisherClient: %v", err)
global.initFailed = true
return
}
Expand Down Expand Up @@ -149,7 +150,7 @@ func EntryPoint(ctxEvent context.Context, gcsEvent ram.GCSEvent, global *Global)
defer storageObjectReader.Close()
teeStorageObjectReader := io.TeeReader(storageObjectReader, &buffer)

topicList, err := ram.GetTopicList(global.ctx, global.pubSubClient)
topicList, err := ram.GetTopicList(global.ctx, global.pubsubPublisherClient, global.projectID)
if err != nil {
return fmt.Errorf("getTopicList: %v", err) // RETRY
}
Expand Down Expand Up @@ -308,22 +309,30 @@ func processDumpLine(dumpline string, global *Global, pointerTopubSubMsgNumber *
topicName = "cai-rces-" + getAssetShortTypeName(asset)
}
// log.Println("topicName", topicName)
if err = ram.CreateTopic(global.ctx, global.pubSubClient, topicList, topicName); err != nil {
if err = ram.CreateTopic(global.ctx, global.pubsubPublisherClient, topicList, topicName, global.projectID); err != nil {
log.Printf("Ignored dump line: no topic %s to publish %s %v", topicName, dumpline, err)
} else {
feedMessageJSON, err := json.Marshal(getFeedMessage(asset, startTime))
if err != nil {
log.Println("Error json.Marshal", err)
return err
}
publishRequest := ram.PublishRequest{Topic: topicName}
pubSubMessage := &pubsub.Message{
Data: feedMessageJSON,
}
_, err = global.pubSubClient.Topic(publishRequest.Topic).Publish(global.ctx, pubSubMessage).Get(global.ctx)
var pubSubMessage pubsubpb.PubsubMessage
pubSubMessage.Data = feedMessageJSON

var pubsubMessages []*pubsubpb.PubsubMessage
pubsubMessages = append(pubsubMessages, &pubSubMessage)

var publishRequest pubsubpb.PublishRequest
publishRequest.Topic = fmt.Sprintf("projects/%s/topics/%s", global.projectID, topicName)
publishRequest.Messages = pubsubMessages

pubsubResponse, err := global.pubsubPublisherClient.Publish(global.ctx, &publishRequest)
if err != nil {
log.Printf("ERROR pubSubClient.Topic(publishRequest.Topic).Publish: %v", err)
log.Printf("ERROR global.pubsubPublisherClient.Publish: %v", err) // NO RETRY
}
// log.Printf("Published to pubsub topic %s ids %v %s", topicName, pubsubResponse.MessageIds, string(feedMessageJSON))
_ = pubsubResponse
*pointerTopubSubMsgNumber++
}
}
Expand Down
30 changes: 20 additions & 10 deletions utilities/ram/ram.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (

"cloud.google.com/go/firestore"
"cloud.google.com/go/functions/metadata"
"cloud.google.com/go/pubsub"
pubsubold "cloud.google.com/go/pubsub"
pubsub "cloud.google.com/go/pubsub/apiv1"

"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jwt"
admin "google.golang.org/api/admin/directory/v1"
Expand All @@ -41,6 +43,7 @@ import (
"google.golang.org/api/iam/v1"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)

// AssetGroup CAI like format
Expand Down Expand Up @@ -204,26 +207,30 @@ func BuildAncestryPath(ancestors []string) string {
}

// CreateTopic check if a topic already exist, if not create it
func CreateTopic(ctx context.Context, pubSubClient *pubsub.Client, topicList []string, topicName string) error {
func CreateTopic(ctx context.Context, pubSubPulisherClient *pubsub.PublisherClient, topicList []string, topicName string, projectID string) error {
if Find(topicList, topicName) {
return nil
}
// refresh topic list
topicList, err := GetTopicList(ctx, pubSubClient)
topicList, err := GetTopicList(ctx, pubSubPulisherClient, projectID)
if err != nil {
return fmt.Errorf("getTopicList: %v", err)
}
if Find(topicList, topicName) {
return nil
}
topic, err := pubSubClient.CreateTopic(ctx, topicName)
var topicRequested pubsubpb.Topic
topicRequested.Name = fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
topicRequested.Labels = map[string]string{"name": topicName}

topic, err := pubSubPulisherClient.CreateTopic(ctx, &topicRequested)
if err != nil {
matched, _ := regexp.Match(`.*AlreadyExists.*`, []byte(err.Error()))
if !matched {
return fmt.Errorf("pubSubClient.CreateTopic: %v", err)
return fmt.Errorf("pubSubPulisherClient.CreateTopic: %v", err)
}
}
log.Println("Created topic:", topic.ID())
log.Println("Created topic:", topic.Name)
return nil
}

Expand Down Expand Up @@ -462,7 +469,7 @@ func GetEnvVarUint64(envVarName string) (uint64, bool) {
}

// GetPublishCallResult func to be used in go routine to scale pubsub event publish
func GetPublishCallResult(ctx context.Context, publishResult *pubsub.PublishResult, waitgroup *sync.WaitGroup, msgInfo string, pubSubErrNumber *uint64, pubSubMsgNumber *uint64, logEventEveryXPubSubMsg uint64) {
func GetPublishCallResult(ctx context.Context, publishResult *pubsubold.PublishResult, waitgroup *sync.WaitGroup, msgInfo string, pubSubErrNumber *uint64, pubSubMsgNumber *uint64, logEventEveryXPubSubMsg uint64) {
defer waitgroup.Done()
id, err := publishResult.Get(ctx)
if err != nil {
Expand All @@ -478,9 +485,12 @@ func GetPublishCallResult(ctx context.Context, publishResult *pubsub.PublishResu
}

// GetTopicList retreive the list of existing pubsub topics
func GetTopicList(ctx context.Context, pubSubClient *pubsub.Client) ([]string, error) {
func GetTopicList(ctx context.Context, pubSubPulisherClient *pubsub.PublisherClient, projectID string) ([]string, error) {
var topicList []string
topicsIterator := pubSubClient.Topics(ctx)
var listTopicRequest pubsubpb.ListTopicsRequest
listTopicRequest.Project = fmt.Sprintf("projects/%s", projectID)

topicsIterator := pubSubPulisherClient.ListTopics(ctx, &listTopicRequest)
for {
topic, err := topicsIterator.Next()
if err == iterator.Done {
Expand All @@ -489,7 +499,7 @@ func GetTopicList(ctx context.Context, pubSubClient *pubsub.Client) ([]string, e
if err != nil {
return topicList, fmt.Errorf("topicsIterator.Next: %v", err)
}
topicList = append(topicList, topic.ID())
topicList = append(topicList, topic.Name)
}
return topicList, nil
}
Expand Down

0 comments on commit 39c755f

Please sign in to comment.