From dfeb78eed95a0d7bad7d8099786bc805db013b94 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Thu, 5 Jan 2023 19:09:51 -0500 Subject: [PATCH 1/6] feat: expire messages from the cache based on last seen time --- blacklist.go | 12 ++--- go.mod | 6 +-- go.sum | 4 +- pubsub.go | 24 +++++++-- timecache/first_seen_cache.go | 69 +++++++++++++++++++++++++ timecache/first_seen_cache_test.go | 29 +++++++++++ timecache/last_seen_cache.go | 83 ++++++++++++++++++++++++++++++ timecache/last_seen_cache_test.go | 74 ++++++++++++++++++++++++++ timecache/time_cache.go | 32 ++++++++++++ 9 files changed, 314 insertions(+), 19 deletions(-) create mode 100644 timecache/first_seen_cache.go create mode 100644 timecache/first_seen_cache_test.go create mode 100644 timecache/last_seen_cache.go create mode 100644 timecache/last_seen_cache_test.go create mode 100644 timecache/time_cache.go diff --git a/blacklist.go b/blacklist.go index ecaafe8c..2d9bf252 100644 --- a/blacklist.go +++ b/blacklist.go @@ -1,11 +1,11 @@ package pubsub import ( - "sync" "time" "github.com/libp2p/go-libp2p/core/peer" - "github.com/whyrusleeping/timecache" + + "github.com/libp2p/go-libp2p-pubsub/timecache" ) // Blacklist is an interface for peer blacklisting. @@ -34,8 +34,7 @@ func (b MapBlacklist) Contains(p peer.ID) bool { // TimeCachedBlacklist is a blacklist implementation using a time cache type TimeCachedBlacklist struct { - sync.RWMutex - tc *timecache.TimeCache + tc timecache.TimeCache } // NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration @@ -46,8 +45,6 @@ func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) { // Add returns a bool saying whether Add of peer was successful func (b *TimeCachedBlacklist) Add(p peer.ID) bool { - b.Lock() - defer b.Unlock() s := p.String() if b.tc.Has(s) { return false @@ -57,8 +54,5 @@ func (b *TimeCachedBlacklist) Add(p peer.ID) bool { } func (b *TimeCachedBlacklist) Contains(p peer.ID) bool { - b.RLock() - defer b.RUnlock() - return b.tc.Has(p.String()) } diff --git a/go.mod b/go.mod index 231497f1..c0695a55 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,15 @@ go 1.17 require ( github.com/benbjohnson/clock v1.3.0 + github.com/emirpasic/gods v1.18.1 github.com/gogo/protobuf v1.3.2 github.com/ipfs/go-log v1.0.5 + github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-libp2p v0.22.0 github.com/libp2p/go-libp2p-testing v0.12.0 github.com/libp2p/go-msgio v0.2.0 github.com/multiformats/go-multiaddr v0.6.0 - github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee + github.com/multiformats/go-varint v0.0.6 ) require ( @@ -36,7 +38,6 @@ require ( github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/klauspost/compress v1.15.1 // indirect github.com/klauspost/cpuid/v2 v2.1.0 // indirect - github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-netroute v0.2.0 // indirect github.com/libp2p/go-openssl v0.1.0 // indirect @@ -62,7 +63,6 @@ require ( github.com/multiformats/go-multicodec v0.5.0 // indirect github.com/multiformats/go-multihash v0.2.1 // indirect github.com/multiformats/go-multistream v0.3.3 // indirect - github.com/multiformats/go-varint v0.0.6 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect diff --git a/go.sum b/go.sum index 84d10209..3b40bead 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -544,8 +546,6 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= -github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pubsub.go b/pubsub.go index b94a1ecd..efeda931 100644 --- a/pubsub.go +++ b/pubsub.go @@ -11,6 +11,7 @@ import ( "time" pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p-pubsub/timecache" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/discovery" @@ -20,7 +21,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" logging "github.com/ipfs/go-log" - "github.com/whyrusleeping/timecache" ) // DefaultMaximumMessageSize is 1mb. @@ -31,6 +31,10 @@ var ( // Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default. TimeCacheDuration = 120 * time.Second + // TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache. + // Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default. + TimeCacheStrategy = timecache.Strategy_LastSeen + // ErrSubscriptionCancelled may be returned when a subscription Next() is called after the // subscription has been cancelled. ErrSubscriptionCancelled = errors.New("subscription cancelled") @@ -148,9 +152,10 @@ type PubSub struct { inboundStreamsMx sync.Mutex inboundStreams map[peer.ID]network.Stream - seenMessagesMx sync.Mutex - seenMessages *timecache.TimeCache - seenMsgTTL time.Duration + seenMessagesMx sync.Mutex + seenMessages timecache.TimeCache + seenMsgTTL time.Duration + seenMsgStrategy timecache.Strategy // generator used to compute the ID for a message idGen *msgIDGenerator @@ -286,6 +291,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option blacklist: NewMapBlacklist(), blacklistPeer: make(chan peer.ID), seenMsgTTL: TimeCacheDuration, + seenMsgStrategy: TimeCacheStrategy, idGen: newMsgIdGenerator(), counter: uint64(time.Now().UnixNano()), } @@ -307,7 +313,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option } } - ps.seenMessages = timecache.NewTimeCache(ps.seenMsgTTL) + ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL) if err := ps.disc.Start(ps); err != nil { return nil, err @@ -533,6 +539,14 @@ func WithSeenMessagesTTL(ttl time.Duration) Option { } } +// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache +func WithSeenMessagesStrategy(strategy timecache.Strategy) Option { + return func(ps *PubSub) error { + ps.seenMsgStrategy = strategy + return nil + } +} + // WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to // processing them. The inspector is invoked on an accepted RPC just before it // is handled. If inspector's error is nil, the RPC is handled. Otherwise, it diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go new file mode 100644 index 00000000..468e6d1a --- /dev/null +++ b/timecache/first_seen_cache.go @@ -0,0 +1,69 @@ +package timecache + +import ( + "container/list" + "sync" + "time" +) + +// FirstSeenTimeCache is a thread-safe copy of https://github.com/whyrusleeping/timecache. +type FirstSeenTimeCache struct { + q *list.List + m map[string]time.Time + span time.Duration + guard *sync.RWMutex +} + +func newFirstSeenCache(span time.Duration) TimeCache { + return &FirstSeenTimeCache{ + q: list.New(), + m: make(map[string]time.Time), + span: span, + guard: new(sync.RWMutex), + } +} + +func (tc FirstSeenTimeCache) Add(s string) { + tc.guard.Lock() + defer tc.guard.Unlock() + + _, ok := tc.m[s] + if ok { + panic("putting the same entry twice not supported") + } + + tc.sweep() + + tc.m[s] = time.Now() + tc.q.PushFront(s) +} + +func (tc FirstSeenTimeCache) sweep() { + for { + back := tc.q.Back() + if back == nil { + return + } + + v := back.Value.(string) + t, ok := tc.m[v] + if !ok { + panic("inconsistent cache state") + } + + if time.Since(t) > tc.span { + tc.q.Remove(back) + delete(tc.m, v) + } else { + return + } + } +} + +func (tc FirstSeenTimeCache) Has(s string) bool { + tc.guard.RLock() + defer tc.guard.RUnlock() + + _, ok := tc.m[s] + return ok +} diff --git a/timecache/first_seen_cache_test.go b/timecache/first_seen_cache_test.go new file mode 100644 index 00000000..c551beb9 --- /dev/null +++ b/timecache/first_seen_cache_test.go @@ -0,0 +1,29 @@ +package timecache + +import ( + "fmt" + "testing" + "time" +) + +func TestFirstSeenCacheEntriesFound(t *testing.T) { + tc := newFirstSeenCache(time.Minute) + + tc.Add("test") + + if !tc.Has("test") { + t.Fatal("should have this key") + } +} + +func TestFirstSeenCacheEntriesExpire(t *testing.T) { + tc := newFirstSeenCache(time.Second) + for i := 0; i < 11; i++ { + tc.Add(fmt.Sprint(i)) + time.Sleep(time.Millisecond * 100) + } + + if tc.Has(fmt.Sprint(0)) { + t.Fatal("should have dropped this from the cache already") + } +} diff --git a/timecache/last_seen_cache.go b/timecache/last_seen_cache.go new file mode 100644 index 00000000..af74f043 --- /dev/null +++ b/timecache/last_seen_cache.go @@ -0,0 +1,83 @@ +package timecache + +import ( + "sync" + "time" + + "github.com/emirpasic/gods/maps/linkedhashmap" +) + +// LastSeenTimeCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has +// elapsed, "old" entries will be purged from the cache. +// +// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This +// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network +// issues that might increase the number of duplicate messages in the network. +// +// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the +// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one +// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache. +// +// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache. +type LastSeenTimeCache struct { + m *linkedhashmap.Map + span time.Duration + guard *sync.Mutex +} + +func newLastSeenCache(span time.Duration) TimeCache { + return &LastSeenTimeCache{ + m: linkedhashmap.New(), + span: span, + guard: new(sync.Mutex), + } +} + +func (tc *LastSeenTimeCache) Add(s string) { + tc.guard.Lock() + defer tc.guard.Unlock() + + tc.add(s) + + // Garbage collect expired entries + tc.gc() +} + +func (tc *LastSeenTimeCache) add(s string) { + // We don't need a lock here because this function is always called with the lock already acquired. + + // If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and + // an accurate sliding window. + tc.m.Remove(s) + now := time.Now() + tc.m.Put(s, &now) +} + +func (tc *LastSeenTimeCache) gc() { + // We don't need a lock here because this function is always called with the lock already acquired. + iter := tc.m.Iterator() + for iter.Next() { + key := iter.Key() + ts := iter.Value().(*time.Time) + // Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all + // entries hereafter will be unexpired. + if time.Since(*ts) <= tc.span { + return + } + tc.m.Remove(key) + } +} + +func (tc *LastSeenTimeCache) Has(s string) bool { + tc.guard.Lock() + defer tc.guard.Unlock() + + // If the entry exists and has not already expired, slide it forward. + if ts, found := tc.m.Get(s); found { + if t := ts.(*time.Time); time.Since(*t) <= tc.span { + tc.add(s) + return true + } + } + return false +} diff --git a/timecache/last_seen_cache_test.go b/timecache/last_seen_cache_test.go new file mode 100644 index 00000000..6ce68940 --- /dev/null +++ b/timecache/last_seen_cache_test.go @@ -0,0 +1,74 @@ +package timecache + +import ( + "fmt" + "testing" + "time" +) + +func TestLastSeenCacheFound(t *testing.T) { + tc := newLastSeenCache(time.Minute) + + tc.Add("test") + + if !tc.Has("test") { + t.Fatal("should have this key") + } +} + +func TestLastSeenCacheExpire(t *testing.T) { + tc := newLastSeenCache(time.Second) + for i := 0; i < 11; i++ { + tc.Add(fmt.Sprint(i)) + time.Sleep(time.Millisecond * 100) + } + + if tc.Has(fmt.Sprint(0)) { + t.Fatal("should have dropped this from the cache already") + } +} + +func TestLastSeenCacheSlideForward(t *testing.T) { + tc := newLastSeenCache(time.Second) + i := 0 + + // T0ms: Add 8 entries with a 100ms sleep after each + for i < 8 { + tc.Add(fmt.Sprint(i)) + time.Sleep(time.Millisecond * 100) + i++ + } + + // T800ms: Lookup the first entry - this should slide the entry forward so that its expiration is a full second + // later. + if !tc.Has(fmt.Sprint(0)) { + t.Fatal("should have this key") + } + + // T800ms: Wait till after the first and second entries would have normally expired (had we not looked the first + // entry up). + time.Sleep(time.Millisecond * 400) + + // T1200ms: The first entry should still be present in the cache - this will also slide the entry forward. + if !tc.Has(fmt.Sprint(0)) { + t.Fatal("should still have this key") + } + + // T1200ms: The second entry should have expired + if tc.Has(fmt.Sprint(1)) { + t.Fatal("should have dropped this from the cache already") + } + + // T1200ms: Sleep till the first entry actually expires + time.Sleep(time.Millisecond * 1100) + + // T2300ms: Now the first entry should have expired + if tc.Has(fmt.Sprint(0)) { + t.Fatal("should have dropped this from the cache already") + } + + // And it should not have been added back + if tc.Has(fmt.Sprint(0)) { + t.Fatal("should have dropped this from the cache already") + } +} diff --git a/timecache/time_cache.go b/timecache/time_cache.go new file mode 100644 index 00000000..f10ff214 --- /dev/null +++ b/timecache/time_cache.go @@ -0,0 +1,32 @@ +package timecache + +import "time" + +type Strategy uint8 + +const ( + Strategy_FirstSeen Strategy = iota + Strategy_LastSeen +) + +type TimeCache interface { + Add(string) + Has(string) bool +} + +// NewTimeCache defaults to the original ("first seen") cache implementation +func NewTimeCache(span time.Duration) TimeCache { + return NewTimeCacheWithStrategy(Strategy_FirstSeen, span) +} + +func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache { + switch strategy { + case Strategy_FirstSeen: + return newFirstSeenCache(span) + case Strategy_LastSeen: + return newLastSeenCache(span) + default: + // Default to the original time cache implementation + return newFirstSeenCache(span) + } +} From e6faa7bbf5448321d1d86168b5824268273f8721 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Thu, 5 Jan 2023 19:26:05 -0500 Subject: [PATCH 2/6] chore: minor renaming --- timecache/first_seen_cache.go | 12 ++++++------ timecache/first_seen_cache_test.go | 4 ++-- timecache/last_seen_cache.go | 16 ++++++++-------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go index 468e6d1a..ec8d2061 100644 --- a/timecache/first_seen_cache.go +++ b/timecache/first_seen_cache.go @@ -6,8 +6,8 @@ import ( "time" ) -// FirstSeenTimeCache is a thread-safe copy of https://github.com/whyrusleeping/timecache. -type FirstSeenTimeCache struct { +// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache. +type FirstSeenCache struct { q *list.List m map[string]time.Time span time.Duration @@ -15,7 +15,7 @@ type FirstSeenTimeCache struct { } func newFirstSeenCache(span time.Duration) TimeCache { - return &FirstSeenTimeCache{ + return &FirstSeenCache{ q: list.New(), m: make(map[string]time.Time), span: span, @@ -23,7 +23,7 @@ func newFirstSeenCache(span time.Duration) TimeCache { } } -func (tc FirstSeenTimeCache) Add(s string) { +func (tc FirstSeenCache) Add(s string) { tc.guard.Lock() defer tc.guard.Unlock() @@ -38,7 +38,7 @@ func (tc FirstSeenTimeCache) Add(s string) { tc.q.PushFront(s) } -func (tc FirstSeenTimeCache) sweep() { +func (tc FirstSeenCache) sweep() { for { back := tc.q.Back() if back == nil { @@ -60,7 +60,7 @@ func (tc FirstSeenTimeCache) sweep() { } } -func (tc FirstSeenTimeCache) Has(s string) bool { +func (tc FirstSeenCache) Has(s string) bool { tc.guard.RLock() defer tc.guard.RUnlock() diff --git a/timecache/first_seen_cache_test.go b/timecache/first_seen_cache_test.go index c551beb9..98ee7501 100644 --- a/timecache/first_seen_cache_test.go +++ b/timecache/first_seen_cache_test.go @@ -6,7 +6,7 @@ import ( "time" ) -func TestFirstSeenCacheEntriesFound(t *testing.T) { +func TestFirstSeenCacheFound(t *testing.T) { tc := newFirstSeenCache(time.Minute) tc.Add("test") @@ -16,7 +16,7 @@ func TestFirstSeenCacheEntriesFound(t *testing.T) { } } -func TestFirstSeenCacheEntriesExpire(t *testing.T) { +func TestFirstSeenCacheExpire(t *testing.T) { tc := newFirstSeenCache(time.Second) for i := 0; i < 11; i++ { tc.Add(fmt.Sprint(i)) diff --git a/timecache/last_seen_cache.go b/timecache/last_seen_cache.go index af74f043..a2995fb0 100644 --- a/timecache/last_seen_cache.go +++ b/timecache/last_seen_cache.go @@ -7,8 +7,8 @@ import ( "github.com/emirpasic/gods/maps/linkedhashmap" ) -// LastSeenTimeCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has -// elapsed, "old" entries will be purged from the cache. +// LastSeenCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed, +// "old" entries will be purged from the cache. // // It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This // keeps frequently occurring entries cached and prevents them from being propagated, especially because of network @@ -19,21 +19,21 @@ import ( // appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache. // // This implementation is heavily inspired by https://github.com/whyrusleeping/timecache. -type LastSeenTimeCache struct { +type LastSeenCache struct { m *linkedhashmap.Map span time.Duration guard *sync.Mutex } func newLastSeenCache(span time.Duration) TimeCache { - return &LastSeenTimeCache{ + return &LastSeenCache{ m: linkedhashmap.New(), span: span, guard: new(sync.Mutex), } } -func (tc *LastSeenTimeCache) Add(s string) { +func (tc *LastSeenCache) Add(s string) { tc.guard.Lock() defer tc.guard.Unlock() @@ -43,7 +43,7 @@ func (tc *LastSeenTimeCache) Add(s string) { tc.gc() } -func (tc *LastSeenTimeCache) add(s string) { +func (tc *LastSeenCache) add(s string) { // We don't need a lock here because this function is always called with the lock already acquired. // If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and @@ -53,7 +53,7 @@ func (tc *LastSeenTimeCache) add(s string) { tc.m.Put(s, &now) } -func (tc *LastSeenTimeCache) gc() { +func (tc *LastSeenCache) gc() { // We don't need a lock here because this function is always called with the lock already acquired. iter := tc.m.Iterator() for iter.Next() { @@ -68,7 +68,7 @@ func (tc *LastSeenTimeCache) gc() { } } -func (tc *LastSeenTimeCache) Has(s string) bool { +func (tc *LastSeenCache) Has(s string) bool { tc.guard.Lock() defer tc.guard.Unlock() From 8571b8a614959944114dee67374bf4beaf382973 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 10 Jan 2023 10:52:25 -0500 Subject: [PATCH 3/6] fix: messages should not be found after expiration --- timecache/first_seen_cache.go | 7 +++++-- timecache/first_seen_cache_test.go | 10 ++++++++++ timecache/last_seen_cache_test.go | 10 ++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go index ec8d2061..7b1a7885 100644 --- a/timecache/first_seen_cache.go +++ b/timecache/first_seen_cache.go @@ -64,6 +64,9 @@ func (tc FirstSeenCache) Has(s string) bool { tc.guard.RLock() defer tc.guard.RUnlock() - _, ok := tc.m[s] - return ok + // Only consider the entry found if it was present in the cache AND hadn't already expired. + if ts, ok := tc.m[s]; ok { + return time.Since(ts) <= tc.span + } + return false } diff --git a/timecache/first_seen_cache_test.go b/timecache/first_seen_cache_test.go index 98ee7501..adacf8a9 100644 --- a/timecache/first_seen_cache_test.go +++ b/timecache/first_seen_cache_test.go @@ -27,3 +27,13 @@ func TestFirstSeenCacheExpire(t *testing.T) { t.Fatal("should have dropped this from the cache already") } } + +func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) { + tc := newFirstSeenCache(time.Second) + tc.Add(fmt.Sprint(0)) + time.Sleep(1100 * time.Millisecond) + + if tc.Has(fmt.Sprint(0)) { + t.Fatal("should have dropped this from the cache already") + } +} diff --git a/timecache/last_seen_cache_test.go b/timecache/last_seen_cache_test.go index 6ce68940..49d0c068 100644 --- a/timecache/last_seen_cache_test.go +++ b/timecache/last_seen_cache_test.go @@ -72,3 +72,13 @@ func TestLastSeenCacheSlideForward(t *testing.T) { t.Fatal("should have dropped this from the cache already") } } + +func TestLastSeenCacheNotFoundAfterExpire(t *testing.T) { + tc := newLastSeenCache(time.Second) + tc.Add(fmt.Sprint(0)) + time.Sleep(1100 * time.Millisecond) + + if tc.Has(fmt.Sprint(0)) { + t.Fatal("should have dropped this from the cache already") + } +} From 04bfcf58514f59bddb650fec5b2edac018c8c406 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Tue, 10 Jan 2023 10:57:24 -0500 Subject: [PATCH 4/6] chore: editorial --- timecache/first_seen_cache.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go index 7b1a7885..7f68f81b 100644 --- a/timecache/first_seen_cache.go +++ b/timecache/first_seen_cache.go @@ -64,9 +64,7 @@ func (tc FirstSeenCache) Has(s string) bool { tc.guard.RLock() defer tc.guard.RUnlock() + ts, ok := tc.m[s] // Only consider the entry found if it was present in the cache AND hadn't already expired. - if ts, ok := tc.m[s]; ok { - return time.Since(ts) <= tc.span - } - return false + return ok && time.Since(ts) <= tc.span } From f7c6da67bdd17bcf9fb10a05bcee8964c6a0c7b3 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 16 Jan 2023 10:34:07 -0500 Subject: [PATCH 5/6] fix: use new time cache strategy consistently --- timecache/time_cache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/timecache/time_cache.go b/timecache/time_cache.go index f10ff214..bf513926 100644 --- a/timecache/time_cache.go +++ b/timecache/time_cache.go @@ -14,9 +14,9 @@ type TimeCache interface { Has(string) bool } -// NewTimeCache defaults to the original ("first seen") cache implementation +// NewTimeCache defaults to the new ("last seen") cache implementation func NewTimeCache(span time.Duration) TimeCache { - return NewTimeCacheWithStrategy(Strategy_FirstSeen, span) + return NewTimeCacheWithStrategy(Strategy_LastSeen, span) } func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache { @@ -26,7 +26,7 @@ func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache { case Strategy_LastSeen: return newLastSeenCache(span) default: - // Default to the original time cache implementation - return newFirstSeenCache(span) + // Default to the new time cache implementation + return newLastSeenCache(span) } } From c3bfcfac95e395286d2742e4bad8feff087b00b2 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 23 Jan 2023 15:41:27 -0500 Subject: [PATCH 6/6] fix: default to old time cache and add todo for background gc --- pubsub.go | 2 +- timecache/first_seen_cache.go | 1 + timecache/last_seen_cache.go | 1 + timecache/time_cache.go | 8 ++++---- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pubsub.go b/pubsub.go index efeda931..9bddf138 100644 --- a/pubsub.go +++ b/pubsub.go @@ -33,7 +33,7 @@ var ( // TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache. // Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default. - TimeCacheStrategy = timecache.Strategy_LastSeen + TimeCacheStrategy = timecache.Strategy_FirstSeen // ErrSubscriptionCancelled may be returned when a subscription Next() is called after the // subscription has been cancelled. diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go index 7f68f81b..f8626aeb 100644 --- a/timecache/first_seen_cache.go +++ b/timecache/first_seen_cache.go @@ -32,6 +32,7 @@ func (tc FirstSeenCache) Add(s string) { panic("putting the same entry twice not supported") } + // TODO(#515): Do GC in the background tc.sweep() tc.m[s] = time.Now() diff --git a/timecache/last_seen_cache.go b/timecache/last_seen_cache.go index a2995fb0..daaa629f 100644 --- a/timecache/last_seen_cache.go +++ b/timecache/last_seen_cache.go @@ -40,6 +40,7 @@ func (tc *LastSeenCache) Add(s string) { tc.add(s) // Garbage collect expired entries + // TODO(#515): Do GC in the background tc.gc() } diff --git a/timecache/time_cache.go b/timecache/time_cache.go index bf513926..f10ff214 100644 --- a/timecache/time_cache.go +++ b/timecache/time_cache.go @@ -14,9 +14,9 @@ type TimeCache interface { Has(string) bool } -// NewTimeCache defaults to the new ("last seen") cache implementation +// NewTimeCache defaults to the original ("first seen") cache implementation func NewTimeCache(span time.Duration) TimeCache { - return NewTimeCacheWithStrategy(Strategy_LastSeen, span) + return NewTimeCacheWithStrategy(Strategy_FirstSeen, span) } func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache { @@ -26,7 +26,7 @@ func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache { case Strategy_LastSeen: return newLastSeenCache(span) default: - // Default to the new time cache implementation - return newLastSeenCache(span) + // Default to the original time cache implementation + return newFirstSeenCache(span) } }