diff --git a/services/splitdump/splitdump.go b/services/splitdump/splitdump.go index 89c9e31..8ddab28 100644 --- a/services/splitdump/splitdump.go +++ b/services/splitdump/splitdump.go @@ -27,9 +27,10 @@ import ( "strings" "time" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/apiv1" "cloud.google.com/go/storage" "github.com/BrunoReboul/ram/utilities/ram" + pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" ) // Global structure for global variables to optimize the cloud function performances @@ -38,7 +39,8 @@ type Global struct { initFailed bool retryTimeOutSeconds int64 iamTopicName string - pubSubClient *pubsub.Client + pubsubPublisherClient *pubsub.PublisherClient + projectID string splitThresholdLineNumber int64 storageBucket *storage.BucketHandle } @@ -78,12 +80,11 @@ func Initialize(ctx context.Context, global *Global) { var bucketName string var err error var ok bool - var projectID string var storageClient *storage.Client bucketName = os.Getenv("CAIEXPORTBUCKETNAME") global.iamTopicName = os.Getenv("IAMTOPICNAME") - projectID = os.Getenv("GCP_PROJECT") + global.projectID = os.Getenv("GCP_PROJECT") log.Println("Function COLD START") if global.retryTimeOutSeconds, ok = ram.GetEnvVarInt64("RETRYTIMEOUTSECONDS"); !ok { @@ -99,9 +100,9 @@ func Initialize(ctx context.Context, global *Global) { return } global.storageBucket = storageClient.Bucket(bucketName) - global.pubSubClient, err = pubsub.NewClient(ctx, projectID) + global.pubsubPublisherClient, err = pubsub.NewPublisherClient(global.ctx) if err != nil { - log.Printf("ERROR - pubsub.NewClient: %v", err) + log.Printf("ERROR - global.pubsubPublisherClient: %v", err) global.initFailed = true return } @@ -149,7 +150,7 @@ func EntryPoint(ctxEvent context.Context, gcsEvent ram.GCSEvent, global *Global) defer storageObjectReader.Close() teeStorageObjectReader := io.TeeReader(storageObjectReader, &buffer) - topicList, err := ram.GetTopicList(global.ctx, global.pubSubClient) + topicList, err := ram.GetTopicList(global.ctx, global.pubsubPublisherClient, global.projectID) if err != nil { return fmt.Errorf("getTopicList: %v", err) // RETRY } @@ -308,7 +309,7 @@ func processDumpLine(dumpline string, global *Global, pointerTopubSubMsgNumber * topicName = "cai-rces-" + getAssetShortTypeName(asset) } // log.Println("topicName", topicName) - if err = ram.CreateTopic(global.ctx, global.pubSubClient, topicList, topicName); err != nil { + if err = ram.CreateTopic(global.ctx, global.pubsubPublisherClient, topicList, topicName, global.projectID); err != nil { log.Printf("Ignored dump line: no topic %s to publish %s %v", topicName, dumpline, err) } else { feedMessageJSON, err := json.Marshal(getFeedMessage(asset, startTime)) @@ -316,14 +317,22 @@ func processDumpLine(dumpline string, global *Global, pointerTopubSubMsgNumber * log.Println("Error json.Marshal", err) return err } - publishRequest := ram.PublishRequest{Topic: topicName} - pubSubMessage := &pubsub.Message{ - Data: feedMessageJSON, - } - _, err = global.pubSubClient.Topic(publishRequest.Topic).Publish(global.ctx, pubSubMessage).Get(global.ctx) + var pubSubMessage pubsubpb.PubsubMessage + pubSubMessage.Data = feedMessageJSON + + var pubsubMessages []*pubsubpb.PubsubMessage + pubsubMessages = append(pubsubMessages, &pubSubMessage) + + var publishRequest pubsubpb.PublishRequest + publishRequest.Topic = fmt.Sprintf("projects/%s/topics/%s", global.projectID, topicName) + publishRequest.Messages = pubsubMessages + + pubsubResponse, err := global.pubsubPublisherClient.Publish(global.ctx, &publishRequest) if err != nil { - log.Printf("ERROR pubSubClient.Topic(publishRequest.Topic).Publish: %v", err) + log.Printf("ERROR global.pubsubPublisherClient.Publish: %v", err) // NO RETRY } + // log.Printf("Published to pubsub topic %s ids %v %s", topicName, pubsubResponse.MessageIds, string(feedMessageJSON)) + _ = pubsubResponse *pointerTopubSubMsgNumber++ } } diff --git a/utilities/ram/ram.go b/utilities/ram/ram.go index 1b05215..381568c 100644 --- a/utilities/ram/ram.go +++ b/utilities/ram/ram.go @@ -31,7 +31,9 @@ import ( "cloud.google.com/go/firestore" "cloud.google.com/go/functions/metadata" - "cloud.google.com/go/pubsub" + pubsubold "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/apiv1" + "golang.org/x/oauth2/google" "golang.org/x/oauth2/jwt" admin "google.golang.org/api/admin/directory/v1" @@ -41,6 +43,7 @@ import ( "google.golang.org/api/iam/v1" "google.golang.org/api/iterator" "google.golang.org/api/option" + pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" ) // AssetGroup CAI like format @@ -204,26 +207,30 @@ func BuildAncestryPath(ancestors []string) string { } // CreateTopic check if a topic already exist, if not create it -func CreateTopic(ctx context.Context, pubSubClient *pubsub.Client, topicList []string, topicName string) error { +func CreateTopic(ctx context.Context, pubSubPulisherClient *pubsub.PublisherClient, topicList []string, topicName string, projectID string) error { if Find(topicList, topicName) { return nil } // refresh topic list - topicList, err := GetTopicList(ctx, pubSubClient) + topicList, err := GetTopicList(ctx, pubSubPulisherClient, projectID) if err != nil { return fmt.Errorf("getTopicList: %v", err) } if Find(topicList, topicName) { return nil } - topic, err := pubSubClient.CreateTopic(ctx, topicName) + var topicRequested pubsubpb.Topic + topicRequested.Name = fmt.Sprintf("projects/%s/topics/%s", projectID, topicName) + topicRequested.Labels = map[string]string{"name": topicName} + + topic, err := pubSubPulisherClient.CreateTopic(ctx, &topicRequested) if err != nil { matched, _ := regexp.Match(`.*AlreadyExists.*`, []byte(err.Error())) if !matched { - return fmt.Errorf("pubSubClient.CreateTopic: %v", err) + return fmt.Errorf("pubSubPulisherClient.CreateTopic: %v", err) } } - log.Println("Created topic:", topic.ID()) + log.Println("Created topic:", topic.Name) return nil } @@ -462,7 +469,7 @@ func GetEnvVarUint64(envVarName string) (uint64, bool) { } // GetPublishCallResult func to be used in go routine to scale pubsub event publish -func GetPublishCallResult(ctx context.Context, publishResult *pubsub.PublishResult, waitgroup *sync.WaitGroup, msgInfo string, pubSubErrNumber *uint64, pubSubMsgNumber *uint64, logEventEveryXPubSubMsg uint64) { +func GetPublishCallResult(ctx context.Context, publishResult *pubsubold.PublishResult, waitgroup *sync.WaitGroup, msgInfo string, pubSubErrNumber *uint64, pubSubMsgNumber *uint64, logEventEveryXPubSubMsg uint64) { defer waitgroup.Done() id, err := publishResult.Get(ctx) if err != nil { @@ -478,9 +485,12 @@ func GetPublishCallResult(ctx context.Context, publishResult *pubsub.PublishResu } // GetTopicList retreive the list of existing pubsub topics -func GetTopicList(ctx context.Context, pubSubClient *pubsub.Client) ([]string, error) { +func GetTopicList(ctx context.Context, pubSubPulisherClient *pubsub.PublisherClient, projectID string) ([]string, error) { var topicList []string - topicsIterator := pubSubClient.Topics(ctx) + var listTopicRequest pubsubpb.ListTopicsRequest + listTopicRequest.Project = fmt.Sprintf("projects/%s", projectID) + + topicsIterator := pubSubPulisherClient.ListTopics(ctx, &listTopicRequest) for { topic, err := topicsIterator.Next() if err == iterator.Done { @@ -489,7 +499,7 @@ func GetTopicList(ctx context.Context, pubSubClient *pubsub.Client) ([]string, e if err != nil { return topicList, fmt.Errorf("topicsIterator.Next: %v", err) } - topicList = append(topicList, topic.ID()) + topicList = append(topicList, topic.Name) } return topicList, nil }