From 8dfa4b663e86bfd12b30b63bd95e3a4787036b6c Mon Sep 17 00:00:00 2001 From: syedsadath-17 <90619459+sadath-12@users.noreply.github.com> Date: Thu, 7 Dec 2023 01:19:49 +0530 Subject: [PATCH] GCP Pubsub: cache topics (#3241) Signed-off-by: sadath-12 Signed-off-by: syedsadath-17 <90619459+sadath-12@users.noreply.github.com> Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Bernd Verst Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Bernd Verst --- pubsub/gcp/pubsub/pubsub.go | 116 +++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 15 deletions(-) diff --git a/pubsub/gcp/pubsub/pubsub.go b/pubsub/gcp/pubsub/pubsub.go index 41f6455bcd..38f65db5ab 100644 --- a/pubsub/gcp/pubsub/pubsub.go +++ b/pubsub/gcp/pubsub/pubsub.go @@ -54,9 +54,15 @@ type GCPPubSub struct { metadata *metadata logger logger.Logger - closed atomic.Bool - closeCh chan struct{} - wg sync.WaitGroup + closed atomic.Bool + closeCh chan struct{} + wg sync.WaitGroup + topicCache map[string]cacheEntry + lock *sync.RWMutex +} + +type cacheEntry struct { + LastSync time.Time } type GCPAuthJSON struct { @@ -76,9 +82,39 @@ type WhatNow struct { Type string `json:"type"` } +const topicCacheRefreshInterval = 5 * time.Hour + // NewGCPPubSub returns a new GCPPubSub instance. 
func NewGCPPubSub(logger logger.Logger) pubsub.PubSub { - return &GCPPubSub{logger: logger, closeCh: make(chan struct{})} + client := &GCPPubSub{ + logger: logger, + closeCh: make(chan struct{}), + topicCache: make(map[string]cacheEntry), + lock: &sync.RWMutex{}, + } + return client +} + +func (g *GCPPubSub) periodicCacheRefresh() { + // Run this loop 5 times every topicCacheRefreshInterval, to be able to delete items that are stale + ticker := time.NewTicker(topicCacheRefreshInterval / 5) + defer ticker.Stop() + + for { + select { + case <-g.closeCh: + return + case <-ticker.C: + g.lock.Lock() + for key, entry := range g.topicCache { + // Delete from the cache if the last sync was longer than topicCacheRefreshInterval + if time.Since(entry.LastSync) > topicCacheRefreshInterval { + delete(g.topicCache, key) + } + } + g.lock.Unlock() + } + } } func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { @@ -110,6 +146,12 @@ func (g *GCPPubSub) Init(ctx context.Context, meta pubsub.Metadata) error { return err } + g.wg.Add(1) + go func() { + defer g.wg.Done() + g.periodicCacheRefresh() + }() + pubsubClient, err := g.getPubSubClient(ctx, metadata) if err != nil { return fmt.Errorf("%s error creating pubsub client: %w", errorMessagePrefix, err) @@ -174,12 +216,22 @@ func (g *GCPPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) err if g.closed.Load() { return errors.New("component is closed") } + g.lock.RLock() + _, topicExists := g.topicCache[req.Topic] + g.lock.RUnlock() - if !g.metadata.DisableEntityManagement { + // We are not acquiring a write lock before calling ensureTopic, so there's a chance that ensureTopic is called multiple times + // This is acceptable in our case, even if slightly wasteful, as ensureTopic is idempotent + if !g.metadata.DisableEntityManagement && !topicExists { err := g.ensureTopic(ctx, req.Topic) if err != nil { - return fmt.Errorf("%s could not get valid topic %s, %s", errorMessagePrefix, req.Topic, err) + 
return fmt.Errorf("%s could not get valid topic %s: %w", errorMessagePrefix, req.Topic, err) + } + g.lock.Lock() + g.topicCache[req.Topic] = cacheEntry{ + LastSync: time.Now(), } + g.lock.Unlock() } topic := g.getTopic(req.Topic) @@ -210,12 +262,22 @@ func (g *GCPPubSub) Subscribe(parentCtx context.Context, req pubsub.SubscribeReq if g.closed.Load() { return errors.New("component is closed") } + g.lock.RLock() + _, topicExists := g.topicCache[req.Topic] + g.lock.RUnlock() - if !g.metadata.DisableEntityManagement { + // We are not acquiring a write lock before calling ensureTopic, so there's a chance that ensureTopic is called multiple times + // This is acceptable in our case, even if slightly wasteful, as ensureTopic is idempotent + if !g.metadata.DisableEntityManagement && !topicExists { topicErr := g.ensureTopic(parentCtx, req.Topic) if topicErr != nil { - return fmt.Errorf("%s could not get valid topic - topic:%q, error: %v", errorMessagePrefix, req.Topic, topicErr) + return fmt.Errorf("%s could not get valid topic - topic:%q, error: %w", errorMessagePrefix, req.Topic, topicErr) + } + g.lock.Lock() + g.topicCache[req.Topic] = cacheEntry{ + LastSync: time.Now(), } + g.lock.Unlock() subError := g.ensureSubscription(parentCtx, g.metadata.ConsumerID, req.Topic) if subError != nil { @@ -354,9 +416,24 @@ func (g *GCPPubSub) getTopic(topic string) *gcppubsub.Topic { } func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription string, topic string) error { - err := g.ensureTopic(parentCtx, topic) - if err != nil { - return err + g.lock.RLock() + _, topicOK := g.topicCache[topic] + _, dlTopicOK := g.topicCache[g.metadata.DeadLetterTopic] + g.lock.RUnlock() + if !topicOK { + g.lock.Lock() + // Double-check if the topic still doesn't exist to avoid race condition + if _, ok := g.topicCache[topic]; !ok { + err := g.ensureTopic(parentCtx, topic) + if err != nil { + g.lock.Unlock() + return err + } + g.topicCache[topic] = cacheEntry{ + LastSync: 
time.Now(), + } + } + g.lock.Unlock() } managedSubscription := subscription + "-" + topic @@ -369,11 +446,20 @@ func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription s EnableMessageOrdering: g.metadata.EnableMessageOrdering, } - if g.metadata.DeadLetterTopic != "" { - subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic) - if subErr != nil { - return subErr + if g.metadata.DeadLetterTopic != "" && !dlTopicOK { + g.lock.Lock() + // Double-check if the DeadLetterTopic still doesn't exist to avoid race condition + if _, ok := g.topicCache[g.metadata.DeadLetterTopic]; !ok { + subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic) + if subErr != nil { + g.lock.Unlock() + return subErr + } + g.topicCache[g.metadata.DeadLetterTopic] = cacheEntry{ + LastSync: time.Now(), + } } + g.lock.Unlock() dlTopic := fmt.Sprintf("projects/%s/topics/%s", g.metadata.ProjectID, g.metadata.DeadLetterTopic) subConfig.DeadLetterPolicy = &gcppubsub.DeadLetterPolicy{ DeadLetterTopic: dlTopic,