Skip to content

Commit

Permalink
minor changes
Browse files Browse the repository at this point in the history
Signed-off-by: sadath-12 <[email protected]>
  • Loading branch information
sadath-12 committed Nov 29, 2023
1 parent 006b841 commit 1101382
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 38 deletions.
10 changes: 7 additions & 3 deletions pubsub/gcp/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type WhatNow struct {
Type string `json:"type"`
}

var topicCacheRefreshInterval = 5 * time.Hour
const topicCacheRefreshInterval = 5 * time.Hour

// NewGCPPubSub returns a new GCPPubSub instance.
func NewGCPPubSub(logger logger.Logger) pubsub.PubSub {
Expand All @@ -96,7 +96,7 @@ func NewGCPPubSub(logger logger.Logger) pubsub.PubSub {
}

func (g *GCPPubSub) periodicCacheRefresh() {
ticker := time.NewTicker(topicCacheRefreshInterval)
ticker := time.NewTicker(topicCacheRefreshInterval / 5)
defer ticker.Stop()

for {
Expand All @@ -106,7 +106,7 @@ func (g *GCPPubSub) periodicCacheRefresh() {
case <-ticker.C:
g.lock.Lock()
for key, entry := range g.topicCache {
if time.Since(entry.LastSync) > topicCacheRefreshInterval {
if time.Since(entry.LastSync) > topicCacheRefreshInterval/5 {
delete(g.topicCache, key)
}
}
Expand Down Expand Up @@ -218,6 +218,8 @@ func (g *GCPPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) err
_, topicExists := g.topicCache[req.Topic]
g.lock.RUnlock()

// We are not acquiring a write lock before calling ensureTopic, so there's the chance that ensureTopic be called multiple time
// This is acceptable in our case, even is slightly wasteful, as ensureTopic is idempotent
if !g.metadata.DisableEntityManagement && !topicExists {
err := g.ensureTopic(ctx, req.Topic)
if err != nil {
Expand Down Expand Up @@ -262,6 +264,8 @@ func (g *GCPPubSub) Subscribe(parentCtx context.Context, req pubsub.SubscribeReq
_, topicExists := g.topicCache[req.Topic]
g.lock.RUnlock()

// We are not acquiring a write lock before calling ensureTopic, so there's the chance that ensureTopic be called multiple times
// This is acceptable in our case, even is slightly wasteful, as ensureTopic is idempotent
if !g.metadata.DisableEntityManagement && !topicExists {
topicErr := g.ensureTopic(parentCtx, req.Topic)
if topicErr != nil {
Expand Down
35 changes: 0 additions & 35 deletions pubsub/gcp/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ package pubsub

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/dapr/kit/logger"

"github.com/dapr/components-contrib/pubsub"
)

Expand Down Expand Up @@ -131,35 +128,3 @@ func TestInit(t *testing.T) {
assert.ErrorContains(t, err, "connectionRecoveryInSec")
})
}

func TestTopicCache(t *testing.T) {
var testLogger logger.Logger
client := NewGCPPubSub(testLogger)
gClient := client.(*GCPPubSub)

topicCacheRefreshInterval = 3 * time.Second

gClient.wg.Add(1)

topicCache := gClient.topicCache

t.Run("test refresh cache", func(t *testing.T) {
topicCache["topic1"] = cacheEntry{
LastSync: time.Now(),
}
go func() {
defer gClient.wg.Done()
gClient.periodicCacheRefresh()
}()
_, exists := topicCache["topic1"]
assert.True(t, exists)

time.Sleep(4 * time.Second)
close(gClient.closeCh)

_, exists = topicCache["topic1"]
assert.False(t, exists)

gClient.wg.Wait()
})
}

0 comments on commit 1101382

Please sign in to comment.