Skip to content

Commit

Permalink
GCP Pubsub: cache topics (#3241)
Browse files Browse the repository at this point in the history
Signed-off-by: sadath-12 <[email protected]>
Signed-off-by: syedsadath-17 <[email protected]>
Signed-off-by: Alessandro (Ale) Segala <[email protected]>
Signed-off-by: Bernd Verst <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2023
1 parent 79adc56 commit 8dfa4b6
Showing 1 changed file with 101 additions and 15 deletions.
116 changes: 101 additions & 15 deletions pubsub/gcp/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,15 @@ type GCPPubSub struct {
metadata *metadata
logger logger.Logger

closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
topicCache map[string]cacheEntry
lock *sync.RWMutex
}

// cacheEntry records when a topic was last confirmed to exist, so that
// Publish/Subscribe can skip redundant ensureTopic calls until the entry
// is evicted by periodicCacheRefresh.
type cacheEntry struct {
	LastSync time.Time
}

type GCPAuthJSON struct {
Expand All @@ -76,9 +82,39 @@ type WhatNow struct {
Type string `json:"type"`
}

const topicCacheRefreshInterval = 5 * time.Hour

// NewGCPPubSub returns a new GCPPubSub instance.
func NewGCPPubSub(logger logger.Logger) pubsub.PubSub {
	// The topic cache and its guarding lock are initialized here so the
	// component is usable immediately after construction.
	return &GCPPubSub{
		logger:     logger,
		closeCh:    make(chan struct{}),
		topicCache: map[string]cacheEntry{},
		lock:       &sync.RWMutex{},
	}
}

// periodicCacheRefresh evicts stale topic-cache entries in the background
// until the component is closed via closeCh.
func (g *GCPPubSub) periodicCacheRefresh() {
	// Wake up 5 times per topicCacheRefreshInterval so stale entries are
	// removed reasonably soon after they expire.
	ticker := time.NewTicker(topicCacheRefreshInterval / 5)
	defer ticker.Stop()

	for {
		select {
		case <-g.closeCh:
			return
		case <-ticker.C:
			g.evictStaleTopics()
		}
	}
}

// evictStaleTopics removes every cache entry whose last sync happened more
// than topicCacheRefreshInterval ago.
func (g *GCPPubSub) evictStaleTopics() {
	g.lock.Lock()
	defer g.lock.Unlock()
	for topic, entry := range g.topicCache {
		if time.Since(entry.LastSync) > topicCacheRefreshInterval {
			delete(g.topicCache, topic)
		}
	}
}

func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
Expand Down Expand Up @@ -110,6 +146,12 @@ func (g *GCPPubSub) Init(ctx context.Context, meta pubsub.Metadata) error {
return err
}

g.wg.Add(1)
go func() {
defer g.wg.Done()
g.periodicCacheRefresh()
}()

pubsubClient, err := g.getPubSubClient(ctx, metadata)
if err != nil {
return fmt.Errorf("%s error creating pubsub client: %w", errorMessagePrefix, err)
Expand Down Expand Up @@ -174,12 +216,22 @@ func (g *GCPPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) err
if g.closed.Load() {
return errors.New("component is closed")
}
g.lock.RLock()
_, topicExists := g.topicCache[req.Topic]
g.lock.RUnlock()

if !g.metadata.DisableEntityManagement {
	// We are not acquiring a write lock before calling ensureTopic, so there is a chance that ensureTopic will be called multiple times.
	// This is acceptable in our case, even if slightly wasteful, as ensureTopic is idempotent.
if !g.metadata.DisableEntityManagement && !topicExists {
err := g.ensureTopic(ctx, req.Topic)
if err != nil {
return fmt.Errorf("%s could not get valid topic %s, %s", errorMessagePrefix, req.Topic, err)
return fmt.Errorf("%s could not get valid topic %s: %w", errorMessagePrefix, req.Topic, err)
}
g.lock.Lock()
g.topicCache[req.Topic] = cacheEntry{
LastSync: time.Now(),
}
g.lock.Unlock()
}

topic := g.getTopic(req.Topic)
Expand Down Expand Up @@ -210,12 +262,22 @@ func (g *GCPPubSub) Subscribe(parentCtx context.Context, req pubsub.SubscribeReq
if g.closed.Load() {
return errors.New("component is closed")
}
g.lock.RLock()
_, topicExists := g.topicCache[req.Topic]
g.lock.RUnlock()

if !g.metadata.DisableEntityManagement {
	// We are not acquiring a write lock before calling ensureTopic, so there is a chance that ensureTopic will be called multiple times.
	// This is acceptable in our case, even if slightly wasteful, as ensureTopic is idempotent.
if !g.metadata.DisableEntityManagement && !topicExists {
topicErr := g.ensureTopic(parentCtx, req.Topic)
if topicErr != nil {
return fmt.Errorf("%s could not get valid topic - topic:%q, error: %v", errorMessagePrefix, req.Topic, topicErr)
return fmt.Errorf("%s could not get valid topic - topic:%q, error: %w", errorMessagePrefix, req.Topic, topicErr)
}
g.lock.Lock()
g.topicCache[req.Topic] = cacheEntry{
LastSync: time.Now(),
}
g.lock.Unlock()

subError := g.ensureSubscription(parentCtx, g.metadata.ConsumerID, req.Topic)
if subError != nil {
Expand Down Expand Up @@ -354,9 +416,24 @@ func (g *GCPPubSub) getTopic(topic string) *gcppubsub.Topic {
}

func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription string, topic string) error {
err := g.ensureTopic(parentCtx, topic)
if err != nil {
return err
g.lock.RLock()
_, topicOK := g.topicCache[topic]
_, dlTopicOK := g.topicCache[g.metadata.DeadLetterTopic]
g.lock.RUnlock()
if !topicOK {
g.lock.Lock()
// Double-check if the topic still doesn't exist to avoid race condition
if _, ok := g.topicCache[topic]; !ok {
err := g.ensureTopic(parentCtx, topic)
if err != nil {
g.lock.Unlock()
return err
}
g.topicCache[topic] = cacheEntry{
LastSync: time.Now(),
}
}
g.lock.Unlock()
}

managedSubscription := subscription + "-" + topic
Expand All @@ -369,11 +446,20 @@ func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription s
EnableMessageOrdering: g.metadata.EnableMessageOrdering,
}

if g.metadata.DeadLetterTopic != "" {
subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic)
if subErr != nil {
return subErr
if g.metadata.DeadLetterTopic != "" && !dlTopicOK {
g.lock.Lock()
// Double-check if the DeadLetterTopic still doesn't exist to avoid race condition
if _, ok := g.topicCache[g.metadata.DeadLetterTopic]; !ok {
subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic)
if subErr != nil {
g.lock.Unlock()
return subErr
}
g.topicCache[g.metadata.DeadLetterTopic] = cacheEntry{
LastSync: time.Now(),
}
}
g.lock.Unlock()
dlTopic := fmt.Sprintf("projects/%s/topics/%s", g.metadata.ProjectID, g.metadata.DeadLetterTopic)
subConfig.DeadLetterPolicy = &gcppubsub.DeadLetterPolicy{
DeadLetterTopic: dlTopic,
Expand Down

0 comments on commit 8dfa4b6

Please sign in to comment.