Skip to content

Commit

Permalink
fix(gcp): Fix pubsub runtime integrations caching (#704)
Browse files Browse the repository at this point in the history
* fix(gcp): Add lock to pubsub cache.

* and queue and secret locking.

* stop cache from only populating first requested match.

* stop queues from caching first request queue only.
  • Loading branch information
tjholm authored Dec 5, 2024
1 parent 73055d1 commit 587eff8
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
7 changes: 6 additions & 1 deletion cloud/gcp/runtime/queue/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/nitrictech/nitric/cloud/common/deploy/resources"
"github.com/nitrictech/nitric/cloud/common/deploy/tags"
Expand Down Expand Up @@ -50,12 +51,16 @@ type PubsubQueueService struct {
newSubscriberClient func(ctx context.Context, opts ...option.ClientOption) (ifaces_pubsub.SubscriberClient, error)
projectId string
cache map[string]ifaces_pubsub.Topic
cacheLock sync.Mutex
}

// Retrieves the Nitric "Queue Topic" for the specified queue (PubSub Topic).
//
// This retrieves the default Nitric Queue for the Topic based on tagging conventions.
func (s *PubsubQueueService) getPubsubTopicFromName(queue string) (ifaces_pubsub.Topic, error) {
s.cacheLock.Lock()
defer s.cacheLock.Unlock()

if s.cache == nil {
topics := s.client.Topics(context.Background())
s.cache = make(map[string]ifaces_pubsub.Topic)
Expand All @@ -76,7 +81,7 @@ func (s *PubsubQueueService) getPubsubTopicFromName(queue string) (ifaces_pubsub

resType, hasType := labels[tags.GetResourceTypeKey(stackID)]

if name, ok := labels[tags.GetResourceNameKey(stackID)]; ok && name == queue && hasType && resType == "queue" {
if name, ok := labels[tags.GetResourceNameKey(stackID)]; ok && hasType && resType == "queue" {
s.cache[name] = t
}
}
Expand Down
31 changes: 18 additions & 13 deletions cloud/gcp/runtime/secret/secret_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"strings"
"sync"

"github.com/nitrictech/nitric/cloud/common/deploy/tags"
"github.com/nitrictech/nitric/cloud/gcp/runtime/env"
Expand All @@ -43,6 +44,7 @@ type SecretManagerSecretService struct {
projectId string
stackName string
cache map[string]string
cacheLock sync.Mutex
}

var _ secretpb.SecretManagerServer = &SecretManagerSecretService{}
Expand Down Expand Up @@ -82,38 +84,41 @@ func (s *SecretManagerSecretService) buildSecretVersionName(ctx context.Context,
return "", fmt.Errorf("provide non-blank version")
}

parent, inCache := s.cache[sv.Secret.Name]
if !inCache {
realSec, err := s.getSecret(ctx, sv.Secret)
if err != nil {
return "", err
}

parent = realSec.Name
parent, err := s.getSecret(ctx, sv.Secret)
if err != nil {
return "", err
}

return fmt.Sprintf("%s/versions/%s", parent, sv.Version), nil
}

// ensure a secret container exists for storing secret versions
func (s *SecretManagerSecretService) getSecret(ctx context.Context, sec *secretpb.Secret) (*secretmanagerpb.Secret, error) {
func (s *SecretManagerSecretService) getSecret(ctx context.Context, sec *secretpb.Secret) (string, error) {
s.cacheLock.Lock()
defer s.cacheLock.Unlock()

parent, inCache := s.cache[sec.Name]
if inCache {
return parent, nil
}

iter := s.client.ListSecrets(ctx, &secretmanagerpb.ListSecretsRequest{
Parent: s.getParentName(),
Filter: fmt.Sprintf("labels.%s=%s", tags.GetResourceNameKey(env.GetNitricStackID()), sec.Name),
})

result, err := iter.Next()
if errors.Is(err, iterator.Done) {
return nil, status.Error(grpccodes.NotFound, "secret not found")
return "", status.Error(grpccodes.NotFound, "secret not found")
}

if err != nil {
return nil, err
return "", err
}

s.cache[sec.Name] = result.Name

return result, nil
return result.Name, nil
}

// Put - Creates a new secret if one doesn't exist, or just adds a new secret version
Expand All @@ -139,7 +144,7 @@ func (s *SecretManagerSecretService) Put(ctx context.Context, req *secretpb.Secr
}

verResult, err := s.client.AddSecretVersion(ctx, &secretmanagerpb.AddSecretVersionRequest{
Parent: parentSec.Name,
Parent: parentSec,
Payload: &secretmanagerpb.SecretPayload{
Data: req.Value,
},
Expand Down
9 changes: 7 additions & 2 deletions cloud/gcp/runtime/topic/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -50,12 +51,16 @@ type PubsubEventService struct {
resource.GcpResourceResolver
client ifaces_pubsub.PubsubClient
tasksClient ifaces_cloudtasks.CloudtasksClient
cacheLock sync.Mutex
cache map[string]ifaces_pubsub.Topic
}

var _ topicpb.TopicsServer = &PubsubEventService{}

func (s *PubsubEventService) getPubsubTopicFromName(topic string) (ifaces_pubsub.Topic, error) {
s.cacheLock.Lock()
defer s.cacheLock.Unlock()

if s.cache == nil {
topics := s.client.Topics(context.Background())
s.cache = make(map[string]ifaces_pubsub.Topic)
Expand All @@ -75,7 +80,7 @@ func (s *PubsubEventService) getPubsubTopicFromName(topic string) (ifaces_pubsub

resType, hasType := labels[tags.GetResourceTypeKey(env.GetNitricStackID())]

if name, ok := labels[tags.GetResourceNameKey(env.GetNitricStackID())]; ok && hasType && name == topic && resType == "topic" {
if name, ok := labels[tags.GetResourceNameKey(env.GetNitricStackID())]; ok && hasType && resType == "topic" {
s.cache[name] = t
}
}
Expand All @@ -85,7 +90,7 @@ func (s *PubsubEventService) getPubsubTopicFromName(topic string) (ifaces_pubsub
return topic, nil
}

return nil, fmt.Errorf("topic not found")
return nil, fmt.Errorf("topic %s not found", topic)
}

type httpPubsubMessage struct {
Expand Down

0 comments on commit 587eff8

Please sign in to comment.