feat: V4.0.0 #9

Merged: 2 commits, May 23, 2024
502 changes: 368 additions & 134 deletions README.md

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions buffer.go
@@ -15,17 +15,17 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn
}

// If we got a perfect batch size, we can refresh the records immediately.
if len(ids) == c.batchSize {
if len(ids) == c.bufferSize {
c.refreshBatch(ids, keyFn, fetchFn)
return
}

c.batchMutex.Lock()

// If the ids are greater than our batch size we'll have to chunk them.
if len(ids) > c.batchSize {
idsToRefresh := ids[:c.batchSize]
overflowingIDs := ids[c.batchSize:]
if len(ids) > c.bufferSize {
idsToRefresh := ids[:c.bufferSize]
overflowingIDs := ids[c.bufferSize:]
c.batchMutex.Unlock()

// These IDs are the size we want, so we'll refresh them immediately.
@@ -102,7 +102,7 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn
c.bufferPermutationIDs[permutationString] = append(c.bufferPermutationIDs[permutationString], additionalIDs...)

// If we haven't reached the batch size yet, we'll wait for more ids.
if len(c.bufferPermutationIDs[permutationString]) < c.batchSize {
if len(c.bufferPermutationIDs[permutationString]) < c.bufferSize {
c.batchMutex.Unlock()
continue
}
@@ -117,8 +117,8 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn
deleteBuffer(c, permutationString)
c.batchMutex.Unlock()

idsToRefresh := permIDs[:c.batchSize]
overflowingIDs := permIDs[c.batchSize:]
idsToRefresh := permIDs[:c.bufferSize]
overflowingIDs := permIDs[c.bufferSize:]

// Refresh the first batch of IDs immediately.
safeGo(func() {
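The buffer.go change above renames batchSize to bufferSize and keeps the chunking behavior: when more ids are scheduled than the buffer holds, one full batch is refreshed immediately and the overflow is handed back to the buffer. Below is a minimal standalone sketch of that split; chunkIDs is an illustrative helper, not part of the sturdyc API.

package main

import "fmt"

// chunkIDs mirrors the splitting logic above: a full buffer's worth of ids is
// refreshed right away and the remainder is re-buffered. Illustrative only.
func chunkIDs(ids []string, bufferSize int) (refreshNow, overflow []string) {
	if len(ids) <= bufferSize {
		return ids, nil
	}
	return ids[:bufferSize], ids[bufferSize:]
}

func main() {
	ids := []string{"1", "2", "3", "4", "5", "6", "7"}
	now, later := chunkIDs(ids, 5)
	fmt.Println(now)   // [1 2 3 4 5] refreshed immediately
	fmt.Println(later) // [6 7] buffered until the size or timeout threshold is hit
}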
15 changes: 10 additions & 5 deletions buffer_test.go
@@ -33,7 +33,8 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)
@@ -97,7 +97,8 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)
@@ -189,7 +191,8 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)
@@ -257,7 +260,8 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
c := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)
@@ -347,7 +351,8 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)
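The test updates above show the configuration change that runs through the rest of this PR: sturdyc.WithStampedeProtection(min, max, retry, storeMisses) is replaced by sturdyc.WithBackgroundRefreshes(min, max, retry) plus an explicit sturdyc.WithMissingRecordStorage(). A minimal migration sketch follows; the capacity, shard count, TTL, and delay values are placeholders.

package main

import (
	"time"

	"github.com/creativecreature/sturdyc"
)

func main() {
	// Placeholder configuration values, mirroring the examples in this PR.
	capacity := 10000
	numShards := 10
	ttl := 2 * time.Hour
	evictionPercentage := 10
	minRefreshDelay := time.Millisecond * 10
	maxRefreshDelay := time.Millisecond * 30
	retryBaseDelay := time.Millisecond * 10

	// v3: a single option enabled background refreshes, with a trailing bool
	// deciding whether missing records were stored:
	//   sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, true)

	// v4: the two concerns are configured independently.
	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
		sturdyc.WithMissingRecordStorage(),
	)
	_ = cacheClient
}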
32 changes: 19 additions & 13 deletions cache.go
@@ -37,22 +37,19 @@ type Config struct {
evictionInterval time.Duration
metricsRecorder MetricsRecorder

refreshesEnabled bool
minRefreshTime time.Duration
maxRefreshTime time.Duration
retryBaseDelay time.Duration
storeMisses bool
refreshInBackground bool
minRefreshTime time.Duration
maxRefreshTime time.Duration
retryBaseDelay time.Duration
storeMissingRecords bool

bufferRefreshes bool
batchMutex sync.Mutex
batchSize int
bufferSize int
bufferTimeout time.Duration
bufferPermutationIDs map[string][]string
bufferPermutationChan map[string]chan<- []string

passthroughPercentage int
passthroughBuffering bool

useRelativeTimeKeyFormat bool
keyTruncation time.Duration
getSize func() int
@@ -84,10 +81,9 @@ func New[T any](capacity, numShards int, ttl time.Duration, evictionPercentage i

// Create a default configuration, and then apply the options.
cfg := &Config{
clock: NewClock(),
passthroughPercentage: 100,
evictionInterval: ttl / time.Duration(numShards),
getSize: client.Size,
clock: NewClock(),
evictionInterval: ttl / time.Duration(numShards),
getSize: client.Size,
}
// Apply the options to the configuration.
client.Config = cfg
@@ -161,6 +157,16 @@ func (c *Client[T]) Get(key string) (T, bool) {
return val, ok && !ignore
}

func (c *Client[T]) GetMany(ids []string, keyFn KeyFn) map[string]T {
records := make(map[string]T, len(ids))
for _, id := range ids {
if value, ok := c.Get(keyFn(id)); ok {
records[id] = value
}
}
return records
}

// SetMissing writes a single value to the cache. Returns true if it triggered an eviction.
func (c *Client[T]) SetMissing(key string, value T, isMissingRecord bool) bool {
shard := c.getShard(key)
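Besides the Config field and option renames, cache.go gains a GetMany helper that runs each ID through the KeyFn and returns only the records that are already cached. A small usage sketch; the "movies" prefix, IDs, and values are illustrative.

package main

import (
	"log"
	"time"

	"github.com/creativecreature/sturdyc"
)

func main() {
	// Placeholder capacity, shard count, TTL, and eviction percentage.
	cacheClient := sturdyc.New[string](10000, 10, time.Hour, 10)

	// BatchKeyFn prefixes every ID so the same IDs can be cached per data source.
	keyFn := cacheClient.BatchKeyFn("movies")
	cacheClient.SetMissing(keyFn("1"), "Heat", false)
	cacheClient.SetMissing(keyFn("2"), "Alien", false)

	// GetMany returns only the IDs that were found; "3" is a cache miss and is
	// simply absent from the returned map.
	hits := cacheClient.GetMany([]string{"1", "2", "3"}, keyFn)
	log.Println(hits) // map[1:Heat 2:Alien]
}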
89 changes: 86 additions & 3 deletions examples/basic/main.go
@@ -1,12 +1,96 @@
package main

import (
"context"
"log"
"math/rand/v2"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/creativecreature/sturdyc"
)

func demonstrateGetFetch(cacheClient *sturdyc.Client[int]) {
// The cache has built-in stampede protection where it tracks in-flight
// requests for every key.
var count atomic.Int32
fetchFn := func(_ context.Context) (int, error) {
count.Add(1)
time.Sleep(time.Second)
return 1337, nil
}

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
// We can ignore the error given the fetchFn we're using.
val, _ := cacheClient.GetFetch(context.Background(), "key2", fetchFn)
log.Printf("got value: %d\n", val)
wg.Done()
}()
}
wg.Wait()

log.Printf("fetchFn was called %d time\n", count.Load())
log.Println(cacheClient.Get("key2"))
log.Println("")
}

func demonstrateGetFetchBatch(cacheClient *sturdyc.Client[int]) {
var count atomic.Int32
fetchFn := func(_ context.Context, ids []string) (map[string]int, error) {
count.Add(1)
time.Sleep(time.Second * 5)

response := make(map[string]int, len(ids))
for _, id := range ids {
num, _ := strconv.Atoi(id)
response[id] = num
}

return response, nil
}

batches := [][]string{
{"1", "2", "3", "4", "5"},
{"6", "7", "8", "9", "10"},
{"11", "12", "13", "14", "15"},
}

// We'll use a cache key function to add a prefix to the IDs. If we only used
// the IDs, we wouldn't be able to fetch the same IDs from multiple data sources.
keyPrefixFn := cacheClient.BatchKeyFn("my-data-source")

// Request the keys for each batch.
for _, batch := range batches {
go func() {
res, _ := cacheClient.GetFetchBatch(context.Background(), batch, keyPrefixFn, fetchFn)
log.Printf("got batch: %v\n", res)
}()
}

// Give the goroutines above a chance to run to ensure that the batches are in-flight.
time.Sleep(time.Second * 3)

// Launch another 5 goroutines that are going to pick two random IDs from any of the batches.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
ids := []string{batches[rand.IntN(2)][rand.IntN(4)], batches[rand.IntN(2)][rand.IntN(4)]}
res, _ := cacheClient.GetFetchBatch(context.Background(), ids, keyPrefixFn, fetchFn)
log.Printf("got batch: %v\n", res)
wg.Done()
}()
}

wg.Wait()
log.Printf("fetchFn was called %d times\n", count.Load())
}

func main() {
// Maximum number of entries in the sturdyc.
capacity := 10000
@@ -25,7 +109,6 @@ func main() {
log.Println(cacheClient.Size())
log.Println(cacheClient.Get("key1"))

cacheClient.Delete("key1")
log.Println(cacheClient.Size())
log.Println(cacheClient.Get("key1"))
demonstrateGetFetch(cacheClient)
demonstrateGetFetchBatch(cacheClient)
}
4 changes: 1 addition & 3 deletions examples/batch/main.go
@@ -56,12 +56,10 @@ func main() {
maxRefreshDelay := time.Second * 2
// The base for exponential backoff when retrying a refresh.
retryBaseDelay := time.Millisecond * 10
// Tell the cache to store missing records.
storeMisses := true

// Create a cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
)

// Create a new API instance with the cache client.
3 changes: 1 addition & 2 deletions examples/buffering/main.go
@@ -60,7 +60,6 @@ func main() {
retryBaseDelay := time.Millisecond * 10
// Whether to store misses in the sturdyc. This can be useful to
// prevent the cache from fetching a non-existent key repeatedly.
storeMisses := true

// With refresh buffering enabled, the cache will buffer refreshes
// until the batch size is reached or the buffer timeout is hit.
@@ -69,7 +68,7 @@

// Create a new cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
)

5 changes: 1 addition & 4 deletions examples/generics/main.go
@@ -56,13 +56,10 @@ func main() {
maxRefreshDelay := time.Second * 2
// The base for exponential backoff when retrying a refresh.
retryBaseDelay := time.Millisecond * 10
// Whether to store misses in the sturdyc. This can be useful to
// prevent the cache from fetching a non-existent key repeatedly.
storeMisses := true

// Create a new cache client with the specified configuration.
cacheClient := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
sturdyc.WithRefreshBuffering(10, time.Second*15),
)

5 changes: 2 additions & 3 deletions examples/missing/main.go
@@ -53,12 +53,11 @@ func main() {
maxRefreshDelay := time.Millisecond * 30
// The base for exponential backoff when retrying a refresh.
retryBaseDelay := time.Millisecond * 10
// Tell the cache to store missing records.
storeMisses := true

// Create a cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
sturdyc.WithMissingRecordStorage(),
)

// Create a new API instance with the cache client.
5 changes: 1 addition & 4 deletions examples/permutations/main.go
@@ -58,13 +58,10 @@ func main() {
maxRefreshDelay := time.Second * 2
// The base for exponential backoff when retrying a refresh.
retryBaseDelay := time.Millisecond * 10
// Whether to store misses in the sturdyc. This can be useful to
// prevent the cache from fetching a non-existent key repeatedly.
storeMisses := true

// Create a new cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
)

// We will fetch these IDs using various option sets, meaning that the ID alone
26 changes: 13 additions & 13 deletions examples/stampede/main.go → examples/refreshes/main.go
@@ -42,34 +42,34 @@ func main() {
evictionPercentage := 10

// ===========================================================
// =================== Stampede protection ===================
// =================== Background refreshes ==================
// ===========================================================
// Set a minimum and maximum refresh delay for the record. This is
// used to spread out the refreshes for our entries evenly over time.
// We don't want our outgoing requests to look like a comb.
// used to spread out the refreshes of our entries evenly over time.
// We don't want our outgoing requests graph to look like a comb.
minRefreshDelay := time.Millisecond * 10
maxRefreshDelay := time.Millisecond * 30
// The base used for exponential backoff when retrying a refresh. Most of the time, we perform
// refreshes well in advance of the records expiry time. Hence, we can help a system that is
// having trouble to get back on its feet by making fewer refreshes. Once we receive a
// successful response, the refreshes return to their original frequency.
// The base used for exponential backoff when retrying a refresh. Most of the
// time, we perform refreshes well in advance of the record's expiry time.
// Hence, we can use this to make it easier for a system that is having
// trouble to get back on its feet by making fewer refreshes when we're
// seeing a lot of errors. Once we receive a successful response, the
// refreshes return to their original frequency. You can set this to 0
// if you don't want this behavior.
retryBaseDelay := time.Millisecond * 10
// NOTE: Ignore this for now, it will be shown in the next example.
storeMisses := false

// Create a cache client with the specified configuration.
cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
)

// Create a new API instance with the cache client.
api := NewAPI(cacheClient)

// We are going to retrieve the values every 10 milliseconds, however the
// logs will reveal that actual refreshes fluctuate randomly within a 10-30
// millisecond range. Even if this loop is executed across multiple
// goroutines, the API call frequency will maintain this variability,
// ensuring we avoid overloading the API with requests.
// millisecond range. Even if this loop is executed across multiple goroutines,
// the API call frequency will maintain this variability.
for i := 0; i < 100; i++ {
val, err := api.Get(context.Background(), "key")
if err != nil {
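The retryBaseDelay comment above describes exponential backoff for failed refreshes: while a data source keeps erroring, the refresh delay grows from the base, and it resets once a response succeeds. A rough sketch of how such a schedule grows, assuming a simple doubling factor (the library's actual growth factor and retry bookkeeping may differ).

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed doubling backoff: delay = retryBaseDelay * 2^attempt.
	// Purely illustrative; this is not sturdyc's internal implementation.
	retryBaseDelay := 10 * time.Millisecond
	for attempt := 0; attempt < 5; attempt++ {
		delay := retryBaseDelay * time.Duration(1<<attempt)
		fmt.Printf("failed refresh #%d: retry after %v\n", attempt+1, delay)
	}
	// Prints 10ms, 20ms, 40ms, 80ms, 160ms. After a successful refresh, the
	// cache returns to the configured min/max background refresh window.
}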