From edadd64fb362d8c07710f456707d6449cb01c70a Mon Sep 17 00:00:00 2001
From: Victor Conner
Date: Wed, 22 May 2024 19:58:13 +0200
Subject: [PATCH 1/2] feat: Tidy up the API

---
 README.md                                | 502 +++++++++++++++++------
 buffer.go                                |  14 +-
 buffer_test.go                           |  15 +-
 cache.go                                 |  32 +-
 examples/basic/main.go                   |  89 +++-
 examples/batch/main.go                   |   4 +-
 examples/buffering/main.go               |   3 +-
 examples/generics/main.go                |   5 +-
 examples/missing/main.go                 |   5 +-
 examples/permutations/main.go            |   5 +-
 examples/{stampede => refreshes}/main.go |  26 +-
 fetch_test.go                            |  28 +-
 inflight.go                              |   4 +-
 options.go                               |  55 +--
 options_test.go                          |  36 +-
 passthrough.go                           | 106 +----
 passthrough_test.go                      | 125 ++----
 refresh.go                               |   6 +-
 shard.go                                 |   4 +-
 19 files changed, 599 insertions(+), 465 deletions(-)
 rename examples/{stampede => refreshes}/main.go (73%)

diff --git a/README.md b/README.md
index 95ac675..416f3dd 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@
 of them build on each other, and many share configurations. Here is a brief
 overview of what the examples are going to cover:
 
 - [**stampede protection**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#stampede-protection)
+- [**background refreshes**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#background-refreshes)
 - [**caching non-existent records**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#non-existent-records)
 - [**caching batch endpoints per record**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#batch-endpoints)
 - [**cache key permutations**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#cache-key-permutations)
@@ -27,6 +28,7 @@
 - [**request passthrough**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#passthrough)
 - [**custom metrics**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#custom-metrics)
 - [**generics**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#generics)
+- [**distributed caching**](https://github.com/creativecreature/sturdyc?tab=readme-ov-file#distributed-caching)
 
 One thing that makes this package unique is that it has great support for
 batchable data sources. The cache takes responses apart, and then caches each
 record individually based on the permutations of the options with which it was
 fetched. The options could be query params, SQL filters, or anything else that
 could affect the data.
 
-Records can be configured to refresh either based on time or at a certain rate
-of requests. All refreshes occur in the background, ensuring that users never
-have to wait for a record to be updated, resulting in _very low latency_
-applications while also allowing unused keys to expire.
-
 The cache can also help you significantly reduce your traffic to the underlying
-data sources by creating buffers for each unique option set, and then delaying
-the refreshes of the data until it has gathered enough IDs.
+data sources by performing background refreshes with buffers for each unique
+option set, and then delaying the refreshes of the data until it has gathered
+enough IDs.
 
 Below is a screenshot showing the latency improvements we've observed after
 replacing our old cache with this package:
 
@@ -113,41 +112,188 @@
 particular piece of data, which has just expired or been evicted from the
 cache, come in at once. Preventing this has been one of the key objectives for
 this package.
We do not
-want to cause a significant load on the underlying data source every time a
-record expires.
+want to cause a significant load on the underlying data source every time a key
+is missing or a record expires.
+
+The `GetFetch` function takes a key and a function for retrieving the data if
+it's not in the cache. The cache is going to ensure that we never have more
+than a single request per key. It achieves this by tracking all of the
+in-flight requests:
+
+```go
+	var count atomic.Int32
+	fetchFn := func(_ context.Context) (int, error) {
+		count.Add(1)
+		time.Sleep(time.Second)
+		return 1337, nil
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			// We can ignore the error given the fetchFn we're using.
+			val, _ := cacheClient.GetFetch(context.Background(), "key2", fetchFn)
+			log.Printf("got value: %d\n", val)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
 
-`sturdyc` handles this by scheduling background refreshes which prevents the
-records from ever expiring. A refresh gets scheduled if a key is **requested
-again** after a certain amount of time has passed. This is an important
-distinction because it means that it doesn't just naively refresh every key
-it's ever seen. Instead, it only refreshes the records that are actually in
-rotation, while allowing the unused keys to expire.
+	log.Printf("fetchFn was called %d time\n", count.Load())
+	log.Println(cacheClient.Get("key2"))
 
-To get this functionality, we can enable **stampede protection**:
+```
+
+Running this program, we'll see that our requests for "key2" were deduplicated,
+and that the fetchFn only got called once:
+
+```sh
+❯ go run .
+2024/05/21 08:06:29 got value: 1337
+2024/05/21 08:06:29 got value: 1337
+2024/05/21 08:06:29 got value: 1337
+2024/05/21 08:06:29 got value: 1337
+2024/05/21 08:06:29 got value: 1337
+2024/05/21 08:06:29 fetchFn was called 1 time
+2024/05/21 08:06:29 1337 true
+```
+
+We can use the `GetFetchBatch` function for data sources that support batching.
+To demonstrate this, I'll create a mock function that sleeps for `5` seconds,
+and then returns a map with a numerical value for every ID:
+
+```go
+	var count atomic.Int32
+	fetchFn := func(_ context.Context, ids []string) (map[string]int, error) {
+		count.Add(1)
+		time.Sleep(time.Second * 5)
+
+		response := make(map[string]int, len(ids))
+		for _, id := range ids {
+			num, _ := strconv.Atoi(id)
+			response[id] = num
+		}
+
+		return response, nil
+	}
+```
+
+Next, we'll need some batches to test with, so I'll create three batches with 5
+IDs each:
+
+```go
+	batches := [][]string{
+		{"1", "2", "3", "4", "5"},
+		{"6", "7", "8", "9", "10"},
+		{"11", "12", "13", "14", "15"},
+	}
+```
+
+IDs can often be fetched from multiple data sources. Hence, we'll want to
+prefix the ID in order to make the cache key unique. The package provides more
+functionality for this that we'll see later on, but for now we'll use the
+simplest version, which adds a string prefix to every ID:
+
+```go
+	keyPrefixFn := cacheClient.BatchKeyFn("my-data-source")
+```
+
+We can now request each batch in a separate goroutine:
+
+```go
+	for _, batch := range batches {
+		go func() {
+			res, _ := cacheClient.GetFetchBatch(context.Background(), batch, keyPrefixFn, fetchFn)
+			log.Printf("got batch: %v\n", res)
+		}()
+	}
+
+	// Give the goroutines above a chance to run to ensure that the batches are in-flight.
+	time.Sleep(time.Second * 3)
+```
+
+At this point, the cache should have in-flight requests for IDs 1-15.
Knowing
+this, we'll test the stampede protection by launching another five goroutines.
+Each goroutine is going to request two random IDs from our batches:
+
+```go
+	// Launch another 5 goroutines that are going to pick two random IDs from any of the batches.
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			ids := []string{batches[rand.IntN(2)][rand.IntN(4)], batches[rand.IntN(2)][rand.IntN(4)]}
+			res, _ := cacheClient.GetFetchBatch(context.Background(), ids, keyPrefixFn, fetchFn)
+			log.Printf("got batch: %v\n", res)
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+	log.Printf("fetchFn was called %d times\n", count.Load())
+```
+
+Running this program, and looking at the logs, we'll see that the cache is able
+to pick IDs from different in-flight batches:
+
+```sh
+❯ go run .
+2024/05/21 09:14:23 got batch: map[8:8 9:9]
+2024/05/21 09:14:23 got batch: map[4:4 9:9] <---- NOTE: ID 4 and 9 are part of different batches
+2024/05/21 09:14:23 got batch: map[11:11 12:12 13:13 14:14 15:15]
+2024/05/21 09:14:23 got batch: map[1:1 7:7] <---- NOTE: ID 1 and 7 are part of different batches
+2024/05/21 09:14:23 got batch: map[10:10 6:6 7:7 8:8 9:9]
+2024/05/21 09:14:23 got batch: map[3:3 9:9] <---- NOTE: ID 3 and 9 are part of different batches
+2024/05/21 09:14:23 got batch: map[1:1 2:2 3:3 4:4 5:5]
+2024/05/21 09:14:23 got batch: map[4:4 9:9] <---- NOTE: ID 4 and 9 are part of different batches
+2024/05/21 09:14:23 fetchFn was called 3 times <---- NOTE: We only generated 3 outgoing requests.
+```
+
+And on the last line, we can see that we only generated 3 outgoing requests. The
+entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/basic)
+
+# Background refreshes
+
+It's fairly common to consume a data source where you have a rough idea of how
+often the data might change, or where it is acceptable for the data to be a
+couple of milliseconds old. It could also be that the data source has a rate
+limit, and that you're only allowed to query it once every second.
+
+For these types of use cases, you can configure the cache to perform background
+refreshes. A refresh is scheduled if a key is **requested again** after a
+configurable amount of time has passed. This is an important distinction
+because it means that the cache doesn't just naively refresh every key it's
+ever seen. Instead, it only refreshes the records that are actually in
+rotation, while allowing unused keys to be deleted once their TTL expires.
+
+Below is an example configuration:
 
 ```go
 func main() {
-	// Set a minimum and maximum refresh delay for the records. This is
-	// used to spread out the refreshes for our entries evenly over time.
-	// We don't want our outgoing requests to look like a comb.
+	// Set a minimum and maximum refresh delay for the record. This is
+	// used to spread out the refreshes of our entries evenly over time.
+	// We don't want our outgoing requests graph to look like a comb.
 	minRefreshDelay := time.Millisecond * 10
 	maxRefreshDelay := time.Millisecond * 30
-	// The base used for exponential backoff when retrying a refresh. Most of the time, we perform
-	// refreshes well in advance of the records expiry time. Hence, we can help a system that is
-	// having trouble to get back on its feet by making fewer refreshes. Once we receive a
-	// successful response, the refreshes return to their original frequency.
+	// The base used for exponential backoff when retrying a refresh.
Most of the
+	// time, we perform refreshes well in advance of the record's expiry time.
+	// Hence, we can use this to make it easier for a system that is having
+	// trouble to get back on its feet by making fewer refreshes when we're
+	// seeing a lot of errors. Once we receive a successful response, the
+	// refreshes return to their original frequency. You can set this to 0
+	// if you don't want this behavior.
 	retryBaseDelay := time.Millisecond * 10
-	// NOTE: Ignore this for now, it will be shown in the next example.
-	storeMisses := false
+
 	// Create a cache client with the specified configuration.
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 	)
 }
 ```
 
-To test that our configuration works as intended, we'll create a simple API
-client:
+To test that it works as intended, we'll create a simple API client that
+embeds the cache:
 
 ```go
 type API struct {
@@ -159,10 +305,8 @@ func NewAPI(c *sturdyc.Client[string]) *API {
 }
 
 func (a *API) Get(ctx context.Context, key string) (string, error) {
-	// This could be an API call, a database query, etc. The only requirement is
-	// that the function adheres to the `sturdyc.FetchFn` signature. Remember
-	// that you can use closures to capture additional values.
-	fetchFn := func(_ context.Context) (string, error) {
+	// This could be an API call, a database query, etc.
+	fetchFn := func(_ context.Context) (string, error) {
 		log.Printf("Fetching value for key: %s\n", key)
 		return "value", nil
 	}
@@ -170,7 +314,8 @@ func (a *API) Get(ctx context.Context, key string) (string, error) {
 	return a.GetFetch(ctx, key, fetchFn)
 }
 ```
 
-and then create an instance of it within our `main` function:
+and then return to our `main` function to create an instance of it and call the
+`Get` method in a loop:
 
 ```go
 func main() {
@@ -212,8 +357,21 @@ go run .
 ...
 ```
 
+This is going to reduce your response times significantly because none of your
+consumers will have to wait for the I/O operation that refreshes the data. It's
+always performed in the background.
+
+Additionally, you'll be able to provide a degraded experience by continuously
+serving the most recent data you have cached even when an upstream system
+encounters issues and the refreshes begin to fail. This data will never be
+older than the defined TTL. Therefore, the values for `minRefreshDelay` and
+`maxRefreshDelay` that we pass to `sturdyc.WithBackgroundRefreshes` should
+specify how fresh we'd like the data to be under normal conditions. The `TTL`
+should be set to a duration where the data is considered too outdated to be
+useful.
+
 Now what if the record was deleted? Our cache might use a 2-hour-long TTL, and
-we definitely don't want to have a deleted record stay around for that long.
+we definitely don't want it to take that long for the deletion to propagate.
 However, if we were to modify our client so that it returns an error after the
 first request:
 
@@ -287,11 +445,15 @@ for every refresh, but the value is still being printed:
 ```
 
 This is a bit tricky because this is actually the functionality we want. If an
-upstream goes down, we want to be able to serve stale and reduce the frequency
-of the refreshes to make it easier for them to recover.
+upstream system goes down, we want to be able to serve stale data for the
+duration of the TTL, while reducing the frequency of our refreshes to make it
+easier for the system to recover.
 
 How you determine if a record has been deleted is going to vary based on your
-data source it there is no way for the cache to figure this out automatically.
+data source too. It could be a status code, zero value, empty list, specific
+error message, etc. There is no way for the cache to figure this out
+implicitly.
+
 Therefore, if a record is deleted, we'll have to explicitly inform the cache
 about it by returning a custom error:
 
 ```go
 fetchFn := func(_ context.Context) (string, error) {
@@ -307,7 +469,7 @@ fetchFn := func(_ context.Context) (string, error) {
 ```
 
 If we run this application again, we'll see that it works, and that we're no
-longer getting any cache hits which leads to outgoing requests for every
+longer getting any cache hits. This leads to outgoing requests for every
 iteration:
 
 ```sh
 2024/05/09 13:40:47 Fetching value for key: key
 2024/05/09 13:40:47 Failed to retrieve the record from the cache.
 2024/05/09 13:40:47 Fetching value for key: key
 2024/05/09 13:40:47 Failed to retrieve the record from the cache.
 2024/05/09 13:40:47 Fetching value for key: key
 2024/05/09 13:40:47 Failed to retrieve the record from the cache.
 2024/05/09 13:40:47 Fetching value for key: key
 2024/05/09 13:40:47 Failed to retrieve the record from the cache.
 2024/05/09 13:40:47 Fetching value for key: key
 2024/05/09 13:40:47 Failed to retrieve the record from the cache.
 ```
 
+**Please note** that we only have to return the `sturdyc.ErrDeleteRecord` when
+we're using `GetFetch`. For `GetFetchBatch`, we'll simply omit the key from the
+map we're returning. I think this inconsistency is a little unfortunate, but it
+was the best API I could think of. Having to return an error like this even if
+the call was successful felt much worse:
+
+```go
+	batchFetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
+		response, err := myDataSource(cacheMisses)
+		if err != nil {
+			return nil, err
+		}
+		for _, id := range cacheMisses {
+			// NOTE: Don't do this, it's just an example.
+			if _, ok := response[id]; !ok {
+				return response, sturdyc.ErrDeleteRecord
+			}
+		}
+		return response, nil
+	}
+```
+
+The entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/refreshes)
 
 # Non-existent records
 
-In the example above, we can see that asking for keys that don't exist leads to
-a continuous stream of outgoing requests, as we're never able to get a cache
-hit and serve from memory. If this happens frequently, it's going to
-significantly increase our system's latency.
+In the example above, we saw that once we deleted the key, the following
+iterations led to a continuous stream of outgoing requests. This will happen
+for every ID that never existed as well. If we can't retrieve it, we can't
+cache it. If we can't cache it, we can't serve it from memory. If this happens
+frequently, we'll experience a lot of I/O operations, which will significantly
+increase our system's latency.
 
 The reasons why someone might request IDs that don't exist can vary. It could
 be due to a faulty CMS configuration, or perhaps it's caused by a slow
 ingestion process where it takes time for a new entity to propagate through a
 distributed system. Regardless, this will negatively impact our system's
 performance.
 
-To address this issue, we can instruct the cache to store these IDs as missing
+To address this issue, we can instruct the cache to mark these IDs as missing
 records. Missing records are refreshed at the same frequency as regular
 records. Hence, if an ID is continuously requested, and the upstream eventually
-returns a valid response, the record will no longer be marked by the cache as
-missing.
+returns a valid response, we'll see it propagate to our cache.
-To illustrate, we'll make some small modifications to the code from the
-previous example:
+To illustrate, I'll make some small modifications to the code from the previous
+example. The only thing I'm going to change is to make the API client return an
+`ErrStoreMissingRecord` error for the first three requests. This error informs
+the cache that it should mark this key as missing.
 
 ```go
 type API struct {
@@ -366,31 +550,26 @@ func (a *API) Get(ctx context.Context, key string) (string, error) {
 		if a.count > 3 {
 			return "value", nil
 		}
+		// This error tells the cache that the data does not exist at the source.
 		return "", sturdyc.ErrStoreMissingRecord
 	}
 	return a.GetFetch(ctx, key, fetchFn)
 }
 ```
 
-The only thing that's changed, is that we're now returning `ErrStoreMissingRecord`
-for the first 3 request. This error informs the cache that it should mark this
-key as missing.
-
-Next, we'll just have to tell the cache that it should store missing records:
+Next, we'll just have to enable this functionality, and check for the
+`ErrMissingRecord` error, which the cache returns when a record has been marked
+as missing:
 
 ```go
 func main() {
 	// ...
 
-	// Tell the cache to store missing records.
-	storeMisses := true
-
-	// Create a cache client with the specified configuration.
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
+		sturdyc.WithMissingRecordStorage(),
 	)
 
-	// Create a new API instance with the cache client.
 	api := NewAPI(cacheClient)
 
 	// ...
@@ -430,6 +609,18 @@ refreshes and then transitions into having a value:
 ...
 ```
 
+**Please note** that this functionality is _implicit_ for `GetFetchBatch`:
+
+```go
+	batchFetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
+		// The cache will check if every ID in cacheMisses is present in the response.
+		// If it finds any IDs that are missing, it will proceed to mark them as missing
+		// if missing record storage is enabled.
+		response, err := myDataSource(cacheMisses)
+		return response, err
+	}
+```
+
 The entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/missing)
 
 # Batch endpoints
@@ -477,7 +668,7 @@ func NewAPI(c *sturdyc.Client[string]) *API {
 }
 
 func (a *API) GetBatch(ctx context.Context, ids []string) (map[string]string, error) {
-	// We are going to pass the cache a key function that prefixes each id.
+	// We are going to use a cache key function that prefixes each id.
 	// This makes it possible to save the same id for different data sources.
 	cacheKeyFn := a.BatchKeyFn("some-prefix")
 
@@ -485,8 +676,6 @@ func (a *API) GetBatch(ctx context.Context, ids []string) (map[string]string, er
 	fetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
 		log.Printf("Cache miss. Fetching ids: %s\n", strings.Join(cacheMisses, ", "))
 		// Batch functions should return a map where the key is the id of the record.
-		// If you have store missing records enabled, any ID that isn't present
-		// in this map is going to be stored as a cache miss.
 		response := make(map[string]string, len(cacheMisses))
 		for _, id := range cacheMisses {
 			response[id] = "value"
@@ -508,7 +697,7 @@ func main() {
 
 	// Create a new API instance with the cache client.
 	api := NewAPI(cacheClient)
 
-	// Seed the cache with ids 1-10.
+	// Make an initial call to make sure that IDs 1-10 are retrieved and cached.
 	log.Println("Seeding ids 1-10")
 	ids := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
 	api.GetBatch(context.Background(), ids)
@@ -538,7 +727,6 @@ while continuously getting cache hits for IDs 1-10, regardless of what the
 batch looks like:
 
 ```sh
-...
 2024/04/07 11:09:58 Seed completed
 2024/04/07 11:09:58 Cache miss. Fetching ids: 173
 2024/04/07 11:09:58 map[1:value 173:value 2:value 3:value 4:value]
 ...
 ```
 
-I've already mentioned this in a previous comment, but I think it's important,
-so I'll mention it again. Deleting and marking records as missing is
-**implicit** for batch operations (client.GetFetchBatch and
-client.PassthroughBatch). The cache will examine the keys of the map you
-return, and if the option to store missing records is enabled, the record will
-be marked as missing. If this option is not enabled and the record exists in
-the cache, it will be deleted. You do not need to return any custom errors.
-
 The entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/batch)
 
 # Cache key permutations
@@ -571,16 +751,21 @@ ways.
 
 Consider this:
 
 ```sh
-curl https://movie-api/movies?ids=1,2,3&filterUpcoming=true
-curl https://movie-api/movies?ids=1,2,3&filterUpcoming=false
+curl https://movie-api/movies?ids=1,2,3&filterUpcoming=true&includeTrailers=false
+curl https://movie-api/movies?ids=1,2,3&filterUpcoming=false&includeTrailers=true
 ```
 
-These responses might look completely different depending on whether any of
-these movies are upcoming. Hence, it's important to store these records once
-for each unique option set.
+The IDs might be enough to uniquely identify these records in a database.
+However, when you're consuming them through another system, they will probably
+appear completely different as transformations are applied based on the options
+you pass. Hence, it's important that we store these records once for each
+unique option set.
+
+The options don't have to be query parameters either. The data source you're
+consuming could still be a database, and the options that you want to make part
+of the cache key could be different types of filters.
 
-The best way to showcase this, is to create a simple API client. This client is
-going to be used to interact with a service for retrieving order statuses:
+Below is a small example application to showcase this functionality:
 
 ```go
 type OrderOptions struct {
@@ -598,8 +783,7 @@ func NewOrderAPI(c *sturdyc.Client[string]) *OrderAPI {
 }
 
 func (a *OrderAPI) OrderStatus(ctx context.Context, ids []string, opts OrderOptions) (map[string]string, error) {
 	// We use the PermutatedBatchKeyFn when an ID isn't enough to uniquely identify a
-	// record. The cache is going to store each id once per set of options. In a more
-	// realistic scenario, the opts would be query params or arguments to a DB query.
+	// record. The cache is going to store each id once per set of options.
 	cacheKeyFn := a.PermutatedBatchKeyFn("key", opts)
 
 	// We'll create a fetchFn with a closure that captures the options. For this
@@ -619,8 +803,8 @@ func (a *OrderAPI) OrderStatus(ctx context.Context, ids []string, opts OrderOpti
 
 The main difference from the previous example is that we're using
 `PermutatedBatchKeyFn` instead of `BatchKeyFn`.
Internally, the cache will use -reflection to extract the names and values of every exported field in the opts -struct, and then include them when it constructs the cache keys. +reflection to extract the names and values of every **exported** field in the +opts struct, and then include them when it constructs the cache keys. Now, let's try to use this client: @@ -630,10 +814,10 @@ func main() { // Create a new cache client with the specified configuration. cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay), ) - // We will fetch these IDs using various option sets. + // We will fetch these IDs using three different option sets. ids := []string{"id1", "id2", "id3"} optionSetOne := OrderOptions{CarrierName: "FEDEX", LatestDeliveryTime: "2024-04-06"} optionSetTwo := OrderOptions{CarrierName: "DHL", LatestDeliveryTime: "2024-04-07"} @@ -642,7 +826,8 @@ func main() { orderClient := NewOrderAPI(cacheClient) ctx := context.Background() - // Next, we'll seed our cache by fetching the entire list of IDs for all options sets. + // Next, we'll call the orderClient to make sure that we've retrieved and cached + // these IDs for all of our option sets. log.Println("Filling the cache with all IDs for all option sets") orderClient.OrderStatus(ctx, ids, optionSetOne) orderClient.OrderStatus(ctx, ids, optionSetTwo) @@ -654,10 +839,12 @@ func main() { At this point, the cache has stored each record individually for each option set. We can imagine that the keys might look something like this: -- FEDEX-2024-04-06-id1 -- DHL-2024-04-07-id1 -- UPS-2024-04-08-id1 -- etc.. +``` +FEDEX-2024-04-06-id1 +DHL-2024-04-07-id1 +UPS-2024-04-08-id1 +etc.. +``` Next, we'll add a sleep to make sure that all of the records are due for a refresh, and then request the ids individually for each set of options: @@ -692,15 +879,15 @@ go run . 2024/04/07 13:33:56 Fetching: [id1 id2 id3], carrier: DHL, delivery time: 2024-04-07 2024/04/07 13:33:56 Fetching: [id1 id2 id3], carrier: UPS, delivery time: 2024-04-08 2024/04/07 13:33:56 Cache filled -2024/04/07 13:33:58 Fetching: [id3], carrier: UPS, delivery time: 2024-04-08 2024/04/07 13:33:58 Fetching: [id1], carrier: FEDEX, delivery time: 2024-04-06 2024/04/07 13:33:58 Fetching: [id1], carrier: UPS, delivery time: 2024-04-08 +2024/04/07 13:33:58 Fetching: [id1], carrier: DHL, delivery time: 2024-04-07 2024/04/07 13:33:58 Fetching: [id2], carrier: UPS, delivery time: 2024-04-08 2024/04/07 13:33:58 Fetching: [id2], carrier: FEDEX, delivery time: 2024-04-06 -2024/04/07 13:33:58 Fetching: [id3], carrier: FEDEX, delivery time: 2024-04-06 2024/04/07 13:33:58 Fetching: [id2], carrier: DHL, delivery time: 2024-04-07 +2024/04/07 13:33:58 Fetching: [id3], carrier: FEDEX, delivery time: 2024-04-06 +2024/04/07 13:33:58 Fetching: [id3], carrier: UPS, delivery time: 2024-04-08 2024/04/07 13:33:58 Fetching: [id3], carrier: DHL, delivery time: 2024-04-07 -2024/04/07 13:33:58 Fetching: [id1], carrier: DHL, delivery time: 2024-04-07 ``` The entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/permutations) @@ -730,7 +917,7 @@ func main() { // Create a new cache client with the specified configuration. 
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 		sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
 	)
 
@@ -753,38 +940,15 @@ go run .
 2024/04/07 13:45:44 Fetching: [id1 id2 id3], carrier: UPS, delivery time: 2024-04-08
 ```
 
+The number of outgoing requests for the refreshes went from **9** to **3**.
+
 The entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/buffering)
 
 # Passthrough
-
-Time-based refreshes work really well for most use cases. However, there are
-scenarios where you might want to allow a certain amount of traffic to hit the
-underlying data source. For example, you might achieve a 99.99% cache hit rate,
-and even though you refresh the data every 1-2 seconds, it results in only a
-handful of requests. This could cause the other system to scale down
-excessively.
-
-To solve this problem, you can use `client.Passthrough` and
-`client.PassthroughBatchs`. These functions are functionally equivalent to
-`client.GetFetch` and `client.GetFetchBatch`, but they differ in that they
-allow a certain percentage of requests to pass through and refresh the records,
-instead of refreshing records based solely on time.
-
-```go
-capacity := 5
-numShards := 2
-ttl := time.Minute
-evictionPercentage := 10
-c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-	// Allow 50% of the requests to pass-through. Default is 100%.
-	sturdyc.WithPassthroughPercentage(50),
-	// Buffer the batchable pass-throughs. Default is false.
-	sturdyc.WithPassthroughBuffering(),
-)
-
-res, err := c.Passthrough(ctx, "id", fetchFn)
-batchRes, batchErr := c.PassthroughBatch(ctx, idBatch, c.BatchKeyFn("item"), batchFetchFn)
-```
+There are times when you want to always retrieve the latest data from the
+source and only use the in-memory cache as a fallback. In such scenarios, you
+can use the `Passthrough` and `PassthroughBatch` functions. The cache will
+still perform in-flight request tracking and deduplicate your requests.
 
 # Custom metrics
 
@@ -837,31 +1001,35 @@
 Below are a few images where these metrics have been visualized in Grafana:
 Screenshot 2024-05-04 at 12 38 20
 
 # Generics
+Personally, I tend to create caches based on how frequently the data needs to
+be refreshed rather than what type of data it stores. I'll often have one
+transient cache which refreshes the data every 2-5 milliseconds, and another
+cache where I'm fine if the data is up to a minute old.
 
-There are scenarios, where you might want to use the same cache for a data
-source that could return multiples types. Personally, I tend to create caches
-based on how frequently the data needs to be refreshed. I'll often have one
-transient cache which refreshes the data every 2-5 milliseconds. Hence, I'll
-use `any` as the type:
+Hence, I don't want to tie the cache to any specific type, so I'll often just
+use `any`:
 
 ```go
 cacheClient := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
-	sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+	sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 	sturdyc.WithRefreshBuffering(10, time.Second*15),
 )
 ```
 
-However, if this data source has more than a handful of types, the type
-assertions may quickly feel like too much boilerplate.
If that is the case, you
-can use any of these package level functions:
+However, having all client methods return `any` can quickly add a lot of
+boilerplate if you're storing more than a handful of types and need to make
+type assertions.
+
+If you want to avoid this, you can use any of the package level exports:
 
 - [`GetFetch`](https://pkg.go.dev/github.com/creativecreature/sturdyc#GetFetch)
 - [`GetFetchBatch`](https://pkg.go.dev/github.com/creativecreature/sturdyc#GetFetchBatch)
 - [`Passthrough`](https://pkg.go.dev/github.com/creativecreature/sturdyc#Passthrough)
 - [`PassthroughBatch`](https://pkg.go.dev/github.com/creativecreature/sturdyc#PassthroughBatch)
 
-They will perform the type conversions for you, and if any of them were to fail,
-you'll get a [`ErrInvalidType`](https://pkg.go.dev/github.com/creativecreature/sturdyc#pkg-variables) error.
+They will take the cache as their first argument, call the function for you,
+and perform the type conversions internally. If a type conversion fails, you'll
+get an [`ErrInvalidType`](https://pkg.go.dev/github.com/creativecreature/sturdyc#pkg-variables) error.
 
 Below is an example of what an API client that uses these functions could look like:
 
@@ -901,3 +1069,69 @@ func (a *OrderAPI) DeliveryTime(ctx context.Context, ids []string) (map[string]t
 ```
 
 The entire example is available [here.](https://github.com/creativecreature/sturdyc/tree/main/examples/generics)
+
+# Distributed caching
+I've thought about adding this functionality internally because it would be
+really fun to build. However, there are already a lot of projects that are
+doing this exceptionally well.
+
+Therefore, I've tried to design the API so that this package is easy to use in
+**combination** with a distributed key-value store.
+
+Let's use this function as an example:
+
+```go
+func (o *OrderAPI) OrderStatus(ctx context.Context, id string) (string, error) {
+	fetchFn := func(ctx context.Context) (string, error) {
+		var response OrderStatusResponse
+		err := requests.URL(o.baseURL).
+			Param("id", id).
+			ToJSON(&response).
+			Fetch(ctx)
+		if err != nil {
+			return "", err
+		}
+
+		return response.OrderStatus, nil
+	}
+
+	return o.GetFetch(ctx, id, fetchFn)
+}
+```
+
+The only modification you would have to make is to check the distributed storage
+first, and then write to it if the key is missing:
+
+```go
+func (o *OrderAPI) OrderStatus(ctx context.Context, id string) (string, error) {
+	fetchFn := func(ctx context.Context) (string, error) {
+		// Check the Redis cache first.
+		if orderStatus, ok := o.redisClient.Get(id); ok {
+			return orderStatus, nil
+		}
+
+		// Fetch the order status from the underlying data source.
+		var response OrderStatusResponse
+		err := requests.URL(o.baseURL).
+			Param("id", id).
+			ToJSON(&response).
+			Fetch(ctx)
+		if err != nil {
+			return "", err
+		}
+
+		// Add the order status to the Redis cache.
+		go func() { o.redisClient.Set(id, response.OrderStatus, time.Hour) }()
+
+		return response.OrderStatus, nil
+	}
+
+	return o.GetFetch(ctx, id, fetchFn)
+}
+```
+
+I've used this setup a lot, where I let `sturdyc` handle request deduplication,
+refresh buffering, and cache key permutations. This has often allowed me to use
+a **much cheaper** database cluster, as the efficiency gains from batching the
+refreshes, and often being able to serve entirely from memory, significantly
+reduce the traffic to the distributed key-value store.
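+
+The passthrough functions described earlier fit this setup as well. Below is a
+minimal sketch of what that could look like. It reuses the `OrderAPI` from the
+example above; the `OrderStatusFresh` method name is just an illustrative
+assumption, and `Passthrough` is called exactly like `GetFetch`:
+
+```go
+func (o *OrderAPI) OrderStatusFresh(ctx context.Context, id string) (string, error) {
+	fetchFn := func(ctx context.Context) (string, error) {
+		var response OrderStatusResponse
+		err := requests.URL(o.baseURL).
+			Param("id", id).
+			ToJSON(&response).
+			Fetch(ctx)
+		if err != nil {
+			return "", err
+		}
+
+		return response.OrderStatus, nil
+	}
+
+	// Passthrough always invokes fetchFn, while still deduplicating concurrent
+	// requests for the same key and keeping the cached value as a fallback.
+	return o.Passthrough(ctx, id, fetchFn)
+}
+```
+
+`PassthroughBatch` mirrors `GetFetchBatch` in the same way, taking the same key
+function and batch fetch function.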
diff --git a/buffer.go b/buffer.go index 061dbad..917273d 100644 --- a/buffer.go +++ b/buffer.go @@ -15,7 +15,7 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn } // If we got a perfect batch size, we can refresh the records immediately. - if len(ids) == c.batchSize { + if len(ids) == c.bufferSize { c.refreshBatch(ids, keyFn, fetchFn) return } @@ -23,9 +23,9 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn c.batchMutex.Lock() // If the ids are greater than our batch size we'll have to chunk them. - if len(ids) > c.batchSize { - idsToRefresh := ids[:c.batchSize] - overflowingIDs := ids[c.batchSize:] + if len(ids) > c.bufferSize { + idsToRefresh := ids[:c.bufferSize] + overflowingIDs := ids[c.bufferSize:] c.batchMutex.Unlock() // These IDs are the size we want, so we'll refresh them immediately. @@ -102,7 +102,7 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn c.bufferPermutationIDs[permutationString] = append(c.bufferPermutationIDs[permutationString], additionalIDs...) // If we haven't reached the batch size yet, we'll wait for more ids. - if len(c.bufferPermutationIDs[permutationString]) < c.batchSize { + if len(c.bufferPermutationIDs[permutationString]) < c.bufferSize { c.batchMutex.Unlock() continue } @@ -117,8 +117,8 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn deleteBuffer(c, permutationString) c.batchMutex.Unlock() - idsToRefresh := permIDs[:c.batchSize] - overflowingIDs := permIDs[c.batchSize:] + idsToRefresh := permIDs[:c.bufferSize] + overflowingIDs := permIDs[c.bufferSize:] // Refresh the first batch of IDs immediately. safeGo(func() { diff --git a/buffer_test.go b/buffer_test.go index d22cf1a..3bbe820 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -33,7 +33,8 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) { // 1. The number of scheduled refreshes exceeds the specified 'batchSize'. // 2. The 'batchBufferTimeout' threshold is exceeded. client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), ) @@ -97,7 +98,8 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) { // 1. The number of scheduled refreshes exceeds the specified 'batchSize'. // 2. The 'batchBufferTimeout' threshold is exceeded. client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), ) @@ -189,7 +191,8 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) { // 1. The number of scheduled refreshes exceeds the specified 'batchSize'. // 2. The 'batchBufferTimeout' threshold is exceeded. 
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), ) @@ -257,7 +260,8 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) { // 1. The number of scheduled refreshes exceeds the specified 'batchSize'. // 2. The 'batchBufferTimeout' threshold is exceeded. c := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), ) @@ -347,7 +351,8 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) { // 1. The number of scheduled refreshes exceeds the specified 'batchSize'. // 2. The 'batchBufferTimeout' threshold is exceeded. client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout), sturdyc.WithClock(clock), ) diff --git a/cache.go b/cache.go index 5d23402..3cd692b 100644 --- a/cache.go +++ b/cache.go @@ -37,22 +37,19 @@ type Config struct { evictionInterval time.Duration metricsRecorder MetricsRecorder - refreshesEnabled bool - minRefreshTime time.Duration - maxRefreshTime time.Duration - retryBaseDelay time.Duration - storeMisses bool + refreshInBackground bool + minRefreshTime time.Duration + maxRefreshTime time.Duration + retryBaseDelay time.Duration + storeMissingRecords bool bufferRefreshes bool batchMutex sync.Mutex - batchSize int + bufferSize int bufferTimeout time.Duration bufferPermutationIDs map[string][]string bufferPermutationChan map[string]chan<- []string - passthroughPercentage int - passthroughBuffering bool - useRelativeTimeKeyFormat bool keyTruncation time.Duration getSize func() int @@ -84,10 +81,9 @@ func New[T any](capacity, numShards int, ttl time.Duration, evictionPercentage i // Create a default configuration, and then apply the options. cfg := &Config{ - clock: NewClock(), - passthroughPercentage: 100, - evictionInterval: ttl / time.Duration(numShards), - getSize: client.Size, + clock: NewClock(), + evictionInterval: ttl / time.Duration(numShards), + getSize: client.Size, } // Apply the options to the configuration. client.Config = cfg @@ -161,6 +157,16 @@ func (c *Client[T]) Get(key string) (T, bool) { return val, ok && !ignore } +func (c *Client[T]) GetMany(ids []string, keyFn KeyFn) map[string]T { + records := make(map[string]T, len(ids)) + for _, id := range ids { + if value, ok := c.Get(keyFn(id)); ok { + records[id] = value + } + } + return records +} + // SetMissing writes a single value to the cache. Returns true if it triggered an eviction. 
func (c *Client[T]) SetMissing(key string, value T, isMissingRecord bool) bool { shard := c.getShard(key) diff --git a/examples/basic/main.go b/examples/basic/main.go index 7f37bdb..df48163 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -1,12 +1,96 @@ package main import ( + "context" "log" + "math/rand/v2" + "strconv" + "sync" + "sync/atomic" "time" "github.com/creativecreature/sturdyc" ) +func demonstrateGetFetch(cacheClient *sturdyc.Client[int]) { + // The cache has built-in stampede protection where it tracks in-flight + // requests for every key. + var count atomic.Int32 + fetchFn := func(_ context.Context) (int, error) { + count.Add(1) + time.Sleep(time.Second) + return 1337, nil + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + // We can ignore the error given the fetchFn we're using. + val, _ := cacheClient.GetFetch(context.Background(), "key2", fetchFn) + log.Printf("got value: %d\n", val) + wg.Done() + }() + } + wg.Wait() + + log.Printf("fetchFn was called %d time\n", count.Load()) + log.Println(cacheClient.Get("key2")) + log.Println("") +} + +func demonstrateGetFetchBatch(cacheClient *sturdyc.Client[int]) { + var count atomic.Int32 + fetchFn := func(_ context.Context, ids []string) (map[string]int, error) { + count.Add(1) + time.Sleep(time.Second * 5) + + response := make(map[string]int, len(ids)) + for _, id := range ids { + num, _ := strconv.Atoi(id) + response[id] = num + } + + return response, nil + } + + batches := [][]string{ + {"1", "2", "3", "4", "5"}, + {"6", "7", "8", "9", "10"}, + {"11", "12", "13", "14", "15"}, + } + + // We'll use a cache key function to add a prefix to the IDs. If we only used + // the IDs, we wouldn't be able to fetch the same IDs from multiple data sources. + keyPrefixFn := cacheClient.BatchKeyFn("my-data-source") + + // Request the keys for each batch. + for _, batch := range batches { + go func() { + res, _ := cacheClient.GetFetchBatch(context.Background(), batch, keyPrefixFn, fetchFn) + log.Printf("got batch: %v\n", res) + }() + } + + // Give the goroutines above a chance to run to ensure that the batches are in-flight. + time.Sleep(time.Second * 3) + + // Launch another 5 goroutines that are going to pick two random IDs from any of the batches. + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + ids := []string{batches[rand.IntN(2)][rand.IntN(4)], batches[rand.IntN(2)][rand.IntN(4)]} + res, _ := cacheClient.GetFetchBatch(context.Background(), ids, keyPrefixFn, fetchFn) + log.Printf("got batch: %v\n", res) + wg.Done() + }() + } + + wg.Wait() + log.Printf("fetchFn was called %d times\n", count.Load()) +} + func main() { // Maximum number of entries in the sturdyc. capacity := 10000 @@ -25,7 +109,6 @@ func main() { log.Println(cacheClient.Size()) log.Println(cacheClient.Get("key1")) - cacheClient.Delete("key1") - log.Println(cacheClient.Size()) - log.Println(cacheClient.Get("key1")) + demonstrateGetFetch(cacheClient) + demonstrateGetFetchBatch(cacheClient) } diff --git a/examples/batch/main.go b/examples/batch/main.go index bd48ede..df11690 100644 --- a/examples/batch/main.go +++ b/examples/batch/main.go @@ -56,12 +56,10 @@ func main() { maxRefreshDelay := time.Second * 2 // The base for exponential backoff when retrying a refresh. retryBaseDelay := time.Millisecond * 10 - // Tell the cache to store missing records. - storeMisses := true // Create a cache client with the specified configuration. 
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 	)
 
 	// Create a new API instance with the cache client.
diff --git a/examples/buffering/main.go b/examples/buffering/main.go
index eea3d92..3c6a993 100644
--- a/examples/buffering/main.go
+++ b/examples/buffering/main.go
@@ -60,7 +60,4 @@ func main() {
 	retryBaseDelay := time.Millisecond * 10
-	// Whether to store misses in the sturdyc. This can be useful to
-	// prevent the cache from fetching a non-existent key repeatedly.
-	storeMisses := true
 
 	// With refresh buffering enabled, the cache will buffer refreshes
 	// until the batch size is reached or the buffer timeout is hit.
@@ -69,7 +68,7 @@
 	// Create a new cache client with the specified configuration.
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 		sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
 	)
 
diff --git a/examples/generics/main.go b/examples/generics/main.go
index b00e533..dd7c4e8 100644
--- a/examples/generics/main.go
+++ b/examples/generics/main.go
@@ -56,13 +56,10 @@ func main() {
 	maxRefreshDelay := time.Second * 2
 	// The base for exponential backoff when retrying a refresh.
 	retryBaseDelay := time.Millisecond * 10
-	// Whether to store misses in the sturdyc. This can be useful to
-	// prevent the cache from fetching a non-existent key repeatedly.
-	storeMisses := true
 
 	// Create a new cache client with the specified configuration.
 	cacheClient := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 		sturdyc.WithRefreshBuffering(10, time.Second*15),
 	)
 
diff --git a/examples/missing/main.go b/examples/missing/main.go
index 4dbbc4e..edd87a9 100644
--- a/examples/missing/main.go
+++ b/examples/missing/main.go
@@ -53,12 +53,11 @@ func main() {
 	maxRefreshDelay := time.Millisecond * 30
 	// The base for exponential backoff when retrying a refresh.
 	retryBaseDelay := time.Millisecond * 10
-	// Tell the cache to store missing records.
-	storeMisses := true
 
 	// Create a cache client with the specified configuration.
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
+		sturdyc.WithMissingRecordStorage(),
 	)
 
 	// Create a new API instance with the cache client.
diff --git a/examples/permutations/main.go b/examples/permutations/main.go
index cf3a177..f0feb3f 100644
--- a/examples/permutations/main.go
+++ b/examples/permutations/main.go
@@ -58,13 +58,10 @@ func main() {
 	maxRefreshDelay := time.Second * 2
 	// The base for exponential backoff when retrying a refresh.
 	retryBaseDelay := time.Millisecond * 10
-	// Whether to store misses in the sturdyc. This can be useful to
-	// prevent the cache from fetching a non-existent key repeatedly.
-	storeMisses := true
 
 	// Create a new cache client with the specified configuration.
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 	)
 
 	// We will fetch these IDs using various option sets, meaning that the ID alone
diff --git a/examples/stampede/main.go b/examples/refreshes/main.go
similarity index 73%
rename from examples/stampede/main.go
rename to examples/refreshes/main.go
index 40ebeea..7fcd1fd 100644
--- a/examples/stampede/main.go
+++ b/examples/refreshes/main.go
@@ -42,24 +42,25 @@ func main() {
 	evictionPercentage := 10
 
 	// ===========================================================
-	// =================== Stampede protection ===================
+	// =================== Background refreshes ==================
 	// ===========================================================
 	// Set a minimum and maximum refresh delay for the record. This is
-	// used to spread out the refreshes for our entries evenly over time.
-	// We don't want our outgoing requests to look like a comb.
+	// used to spread out the refreshes of our entries evenly over time.
+	// We don't want our outgoing requests graph to look like a comb.
 	minRefreshDelay := time.Millisecond * 10
 	maxRefreshDelay := time.Millisecond * 30
-	// The base used for exponential backoff when retrying a refresh. Most of the time, we perform
-	// refreshes well in advance of the records expiry time. Hence, we can help a system that is
-	// having trouble to get back on its feet by making fewer refreshes. Once we receive a
-	// successful response, the refreshes return to their original frequency.
+	// The base used for exponential backoff when retrying a refresh. Most of the
+	// time, we perform refreshes well in advance of the record's expiry time.
+	// Hence, we can use this to make it easier for a system that is having
+	// trouble to get back on its feet by making fewer refreshes when we're
+	// seeing a lot of errors. Once we receive a successful response, the
+	// refreshes return to their original frequency. You can set this to 0
+	// if you don't want this behavior.
 	retryBaseDelay := time.Millisecond * 10
-	// NOTE: Ignore this for now, it will be shown in the next example.
-	storeMisses := false
 
 	// Create a cache client with the specified configuration.
 	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryBaseDelay, storeMisses),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
 	)
 
 	// Create a new API instance with the cache client.
@@ -67,9 +68,8 @@ func main() {
 
 	// We are going to retrieve the values every 10 milliseconds, however the
 	// logs will reveal that actual refreshes fluctuate randomly within a 10-30
-	// millisecond range. Even if this loop is executed across multiple
-	// goroutines, the API call frequency will maintain this variability,
-	// ensuring we avoid overloading the API with requests.
+	// millisecond range. Even if this loop is executed across multiple goroutines,
+	// the API call frequency will maintain this variability.
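+	// The refreshes themselves run in the background, so no iteration of this
+	// loop has to wait for the I/O of an outgoing refresh once the key has
+	// been cached.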
for i := 0; i < 100; i++ { val, err := api.Get(context.Background(), "key") if err != nil { diff --git a/fetch_test.go b/fetch_test.go index b64d249..18d4ee6 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -63,7 +63,8 @@ func TestGetFetchStampedeProtection(t *testing.T) { // The cache is going to have a 2 second TTL, and the first refresh should happen within a second. c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -112,7 +113,8 @@ func TestGetFetchRefreshRetries(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -165,7 +167,8 @@ func TestGetFetchMissingRecord(t *testing.T) { clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, sturdyc.WithClock(clock), - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithMissingRecordStorage(), ) fetchObserver := NewFetchObserver(1) @@ -261,7 +264,8 @@ func TestBatchGetFetchNilMapMissingRecords(t *testing.T) { retryInterval := time.Second clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) @@ -307,7 +311,8 @@ func TestGetFetchBatchRetries(t *testing.T) { retryInterval := time.Second clock := sturdyc.NewTestClock(time.Now()) c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, retryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, retryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), ) fetchObserver := NewFetchObserver(6) @@ -413,7 +418,8 @@ func TestGetFetchBatchStampedeProtection(t *testing.T) { maxRefreshDelay := time.Millisecond * 1000 refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, shards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), + sturdyc.WithMissingRecordStorage(), sturdyc.WithClock(clock), sturdyc.WithMetrics(newTestMetricsRecorder(shards)), ) @@ -486,7 +492,7 @@ func TestGetFetchDeletesRecordsThatHaveBeenRemovedAtTheSource(t *testing.T) { refreshRetryInterval := time.Millisecond * 10 c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, false), + sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval), sturdyc.WithClock(clock), ) @@ -531,7 +537,8 @@ func 
TestGetFetchConvertsDeletedRecordsToMissingRecords(t *testing.T) {
 	refreshRetryInterval := time.Millisecond * 10
 
 	c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
+		sturdyc.WithMissingRecordStorage(),
 		sturdyc.WithClock(clock),
 	)
 
@@ -582,7 +589,7 @@ func TestGetFetchBatchDeletesRecordsThatHaveBeenRemovedAtTheSource(t *testing.T)
 	refreshRetryInterval := time.Millisecond * 10
 
 	c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, false),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
 		sturdyc.WithClock(clock),
 	)
 
@@ -628,7 +635,8 @@ func TestGetFetchBatchConvertsDeletedRecordsToMissingRecords(t *testing.T) {
 	refreshRetryInterval := time.Millisecond * 10
 
 	c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
-		sturdyc.WithStampedeProtection(minRefreshDelay, maxRefreshDelay, refreshRetryInterval, true),
+		sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
+		sturdyc.WithMissingRecordStorage(),
 		sturdyc.WithClock(clock),
 	)
 
diff --git a/inflight.go b/inflight.go
index 07d5727..5dc27ac 100644
--- a/inflight.go
+++ b/inflight.go
@@ -69,7 +69,7 @@ func makeBatchCall[T, V any](ctx context.Context, c *Client[T], opts makeBatchCa
 	}
 
 	// Check if we should store any of these IDs as a missing record.
-	if c.storeMisses && len(response) < len(opts.ids) {
+	if c.storeMissingRecords && len(response) < len(opts.ids) {
 		for _, id := range opts.ids {
 			if _, ok := response[id]; !ok {
 				var zero T
@@ -102,7 +102,7 @@ func callAndCache[V, T any](ctx context.Context, c *Client[T], key string, fn Fe
 	c.inFlightMutex.Unlock()
 
 	response, err := fn(ctx)
-	if err != nil && c.storeMisses && errors.Is(err, ErrStoreMissingRecord) {
+	if err != nil && c.storeMissingRecords && errors.Is(err, ErrStoreMissingRecord) {
 		c.SetMissing(key, *new(T), true)
 		return response, c.endErrorFlight(call, key, ErrMissingRecord)
 	}
diff --git a/options.go b/options.go
index 2af0f07..b6ba6c7 100644
--- a/options.go
+++ b/options.go
@@ -26,52 +26,37 @@ func WithEvictionInterval(interval time.Duration) Option {
 	}
 }
 
-// WithStampedeProtection makes the cache shield the underlying data sources from
-// cache stampedes. Cache stampedes occur when many requests for a particular piece
-// of data (which has just expired or been evicted from the cache) come in at once.
-// This can cause all requests to fetch the data concurrently, which may result in
-// a significant burst of outgoing requests to the underlying data source.
-func WithStampedeProtection(minRefreshTime, maxRefreshTime, retryBaseDelay time.Duration, storeMisses bool) Option {
+// WithBackgroundRefreshes makes the cache refresh records in the background when they are requested again after a configurable amount of time has passed.
+func WithBackgroundRefreshes(minRefreshTime, maxRefreshTime, retryBaseDelay time.Duration) Option {
 	return func(c *Config) {
-		c.refreshesEnabled = true
+		c.refreshInBackground = true
 		c.minRefreshTime = minRefreshTime
 		c.maxRefreshTime = maxRefreshTime
 		c.retryBaseDelay = retryBaseDelay
-		c.storeMisses = storeMisses
+	}
+}
+
+// WithMissingRecordStorage allows the cache to mark keys as missing from the underlying data source.
+func WithMissingRecordStorage() Option { + return func(c *Config) { + c.storeMissingRecords = true } } // WithRefreshBuffering will make the cache refresh data from batchable // endpoints more efficiently. It is going to create a buffer for each cache -// key permutation, and gather IDs until the "batchSize" is reached, or the -// "maxBufferTime" has passed. -func WithRefreshBuffering(batchSize int, maxBufferTime time.Duration) Option { +// key permutation, and gather IDs until the bufferSize is reached, or the +// bufferDuration has passed. +func WithRefreshBuffering(bufferSize int, bufferDuration time.Duration) Option { return func(c *Config) { c.bufferRefreshes = true - c.batchSize = batchSize - c.bufferTimeout = maxBufferTime + c.bufferSize = bufferSize + c.bufferTimeout = bufferDuration c.bufferPermutationIDs = make(map[string][]string) c.bufferPermutationChan = make(map[string]chan<- []string) } } -// WithPassthroughPercentage controls the rate at which requests are allowed through -// by the passthrough caching functions. For example, setting the percentage parameter -// to 50 would allow half of the requests to through. -func WithPassthroughPercentage(percentage int) Option { - return func(c *Config) { - c.passthroughPercentage = percentage - } -} - -// WithPassthroughBuffering allows you to decide if the batchable passthrough -// requests should be buffered and batched more efficiently. -func WithPassthroughBuffering() Option { - return func(c *Config) { - c.passthroughBuffering = true - } -} - // WithRelativeTimeKeyFormat allows you to control the truncation of time.Time // values that are being passed in to the cache key functions. func WithRelativeTimeKeyFormat(truncation time.Duration) Option { @@ -99,11 +84,11 @@ func validateConfig(capacity, numShards int, ttl time.Duration, evictionPercenta panic("evictionPercentage must be between 0 and 100") } - if !cfg.refreshesEnabled && cfg.bufferRefreshes { - panic("refresh buffering requires stampede protection to be enabled") + if !cfg.refreshInBackground && cfg.bufferRefreshes { + panic("refresh buffering requires background refreshes to be enabled") } - if cfg.bufferRefreshes && cfg.batchSize < 1 { - panic("batchSize must be greater than 0") + if cfg.bufferRefreshes && cfg.bufferSize < 1 { + panic("bufferSize must be greater than 0") } @@ -122,8 +107,4 @@ func validateConfig(capacity, numShards int, ttl time.Duration, evictionPercenta if cfg.retryBaseDelay < 0 { panic("retryBaseDelay must be greater than or equal to 0") } - - if cfg.passthroughPercentage < 1 || cfg.passthroughPercentage > 100 { - panic("passthroughPercentage must be between 1 and 100") - } } diff --git a/options_test.go b/options_test.go index f9edcae..2c67895 100644 --- a/options_test.go +++ b/options_test.go @@ -91,7 +91,7 @@ func TestPanicsIfTheRefreshBufferSizeIsLessThanOne(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithStampedeProtection(time.Minute, time.Hour, time.Second, true), + sturdyc.WithBackgroundRefreshes(time.Minute, time.Hour, time.Second), sturdyc.WithRefreshBuffering(0, time.Minute), ) } @@ -106,7 +106,7 @@ func TestPanicsIfTheRefreshBufferTimeoutIsLessThanOne(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithStampedeProtection(time.Minute, time.Hour, time.Second, true), + sturdyc.WithBackgroundRefreshes(time.Minute, time.Hour, time.Second), sturdyc.WithRefreshBuffering(10, 0), ) } @@ -135,7 +135,7 @@ func TestPanicsIfTheMinRefreshTimeIsGreaterThanTheMaxRefreshTime(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute,
5, - sturdyc.WithStampedeProtection(time.Hour, time.Minute, time.Second, true), + sturdyc.WithBackgroundRefreshes(time.Hour, time.Minute, time.Second), ) } @@ -149,34 +149,6 @@ func TestPanicsIfTheRetryBaseDelayIsLessThanZero(t *testing.T) { } }() sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithStampedeProtection(time.Minute, time.Hour, -1, true), - ) -} - -func TestPanicsIfPassthroughPercentageIsLessThanOne(t *testing.T) { - t.Parallel() - - defer func() { - err := recover() - if err == nil { - t.Error("expected a panic when trying to use -1 as passthrough percentage") - } - }() - sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithPassthroughPercentage(-1), - ) -} - -func TestPanicsIfPassthroughPercentageIsGreaterThanOneHundred(t *testing.T) { - t.Parallel() - - defer func() { - err := recover() - if err == nil { - t.Error("expected a panic when trying to use 101 as passthrough percentage") - } - }() - sturdyc.New[string](100, 10, time.Minute, 5, - sturdyc.WithPassthroughPercentage(101), + sturdyc.WithBackgroundRefreshes(time.Minute, time.Hour, -1), ) } diff --git a/passthrough.go b/passthrough.go index 58054a5..1d06696 100644 --- a/passthrough.go +++ b/passthrough.go @@ -2,47 +2,21 @@ package sturdyc import ( "context" - "errors" - "math/rand/v2" ) -func passthroughAndCache[V, T any](ctx context.Context, c *Client[T], key string, fetchFn FetchFn[V]) (V, error) { - response, err := fetchFn(ctx) - if err != nil && c.storeMisses && errors.Is(err, ErrStoreMissingRecord) { - c.SetMissing(key, *new(T), true) - return response, ErrMissingRecord - } - - if err != nil { - return response, err - } - - res, ok := any(response).(T) - if !ok { - return response, ErrInvalidType +// Passthrough is always going to try and retrieve the latest data by calling the +// fetchFn. The cache is used as a fallback if the fetchFn returns an error. +func (c *Client[T]) Passthrough(ctx context.Context, key string, fetchFn FetchFn[T]) (T, error) { + res, err := callAndCache(ctx, c, key, fetchFn) + if err == nil { + return res, nil } - c.SetMissing(key, res, false) - return response, nil -} - -// Passthrough attempts to retrieve the id from the cache. If looking up the ID -// results in a cache miss, it will fetch the record using the fetchFn. If the -// record was found in the cache, it will perform another check to determine if -// it should allow a request to passthrough to the underlying data source. This -// is performed in the background, and the cache cached record will be updated -// with the response. -func (c *Client[T]) Passthrough(ctx context.Context, key string, fetchFn FetchFn[T]) (T, error) { - if value, ok, _, _ := c.get(key); ok { - // Check if we should do a passthrough. - if c.passthroughPercentage >= 100 || rand.IntN(100) >= c.passthroughPercentage { - safeGo(func() { - c.refresh(key, fetchFn) - }) - } + if value, ok := c.Get(key); ok { return value, nil } - return passthroughAndCache(ctx, c, key, fetchFn) + + return res, err } // Passthrough is a convenience function that performs type assertion on the result of client.Passthrough. @@ -51,64 +25,20 @@ func Passthrough[T, V any](ctx context.Context, c *Client[T], key string, fetchF return unwrap[V](value, err) } -func passthroughAndCacheBatch[V, T any](ctx context.Context, c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[V]) (map[string]V, error) { - response, err := fetchFn(ctx, ids) - if err != nil { - return response, err - } - - // Check if we should store any of these IDs as a missing record. 
- if c.storeMisses && len(response) < len(ids) { - for _, id := range ids { - if _, ok := response[id]; !ok { - c.SetMissing(keyFn(id), *new(T), true) - } - } - } - - // Store the records in the cache. - for id, record := range response { - v, ok := any(record).(T) - if !ok { - continue - } - c.SetMissing(keyFn(id), v, false) - } - - return response, nil -} - -// PassthroughBatch attempts to retrieve the ids from the cache. If looking up -// any of the IDs results in a cache miss, it will fetch the batch using the -// fetchFn. If all of the ID's are found in the cache, it will perform another -// check to determine if it should allow a request to passthrough to the -// underlying data source. This is performed in the background, and the cache -// will be updated with the response. +// PassthroughBatch is always going to try and retrieve the latest data by calling +// the fetchFn. The cache is used as a fallback if the fetchFn returns an error. func (c *Client[T]) PassthroughBatch(ctx context.Context, ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) (map[string]T, error) { - cachedRecords, cacheMisses, _ := c.groupIDs(ids, keyFn) - - // If we have cache misses, we're going to perform an outgoing refresh - // regardless. We'll utilize this to perform a passthrough for all IDs. - if len(cacheMisses) > 0 { - res, err := passthroughAndCacheBatch(ctx, c, ids, keyFn, fetchFn) - if err != nil && len(cachedRecords) > 0 { - return cachedRecords, ErrOnlyCachedRecords - } - return res, err + res, err := callAndCacheBatch(ctx, c, callBatchOpts[T, T]{ids, keyFn, fetchFn}) + if err == nil { + return res, nil } - // Check if we should do a passthrough. - if c.passthroughPercentage >= 100 || rand.IntN(100) >= c.passthroughPercentage { - safeGo(func() { - if c.passthroughBuffering { - bufferBatchRefresh(c, ids, keyFn, fetchFn) - return - } - c.refreshBatch(ids, keyFn, fetchFn) - }) + values := c.GetMany(ids, keyFn) + if len(values) > 0 { + return values, nil } - return cachedRecords, nil + return res, err } // Passthrough is a convenience function that performs type assertion on the result of client.PassthroughBatch. 
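The reworked passthrough functions are worth a quick illustration. The following sketch is not part of the patch; it's a minimal, hypothetical program (the key and fetch functions are made up) showing how `Passthrough` now always invokes the fetch function first, and only falls back to a previously cached value if that call fails:

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/creativecreature/sturdyc"
)

func main() {
	cacheClient := sturdyc.New[string](1000, 10, time.Hour, 5)

	// The first call succeeds, so the response gets written to the cache.
	value, err := cacheClient.Passthrough(context.Background(), "key", func(_ context.Context) (string, error) {
		return "fresh value", nil
	})
	log.Println(value, err) // fresh value <nil>

	// The second call fails, so the value that was cached above is returned instead.
	value, err = cacheClient.Passthrough(context.Background(), "key", func(_ context.Context) (string, error) {
		return "", errors.New("the data source is unavailable")
	})
	log.Println(value, err) // fresh value <nil>
}
```

`PassthroughBatch` follows the same pattern, using `GetMany` to serve whatever cached records it can when the fetch function returns an error.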
diff --git a/passthrough_test.go b/passthrough_test.go index 497bf47..c1becc4 100644 --- a/passthrough_test.go +++ b/passthrough_test.go @@ -2,6 +2,7 @@ package sturdyc_test import ( "context" + "errors" "testing" "time" @@ -9,7 +10,7 @@ import ( "github.com/google/go-cmp/cmp" ) -func TestFullPassthrough(t *testing.T) { +func TestPassthrough(t *testing.T) { t.Parallel() ctx := context.Background() @@ -33,9 +34,9 @@ func TestFullPassthrough(t *testing.T) { } for i := 0; i < numPassthroughs; i++ { - res, err := sturdyc.Passthrough(ctx, c, id, fetchObserver.Fetch) - if err != nil { - t.Fatalf("expected no error, got %v", err) + res, passthroughErr := sturdyc.Passthrough(ctx, c, id, fetchObserver.Fetch) + if passthroughErr != nil { + t.Fatalf("expected no error, got %v", passthroughErr) } if res != "value1" { @@ -48,60 +49,21 @@ func TestFullPassthrough(t *testing.T) { } fetchObserver.AssertFetchCount(t, numPassthroughs+1) -} - -func TestHalfPassthrough(t *testing.T) { - t.Parallel() - ctx := context.Background() - capacity := 5 - numShards := 2 - ttl := time.Minute - evictionPercentage := 10 - c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(time.Hour, time.Hour*2, time.Minute, true), - sturdyc.WithPassthroughPercentage(50), - ) - - id := "1" - numPassthroughs := 100 - fetchObserver := NewFetchObserver(numPassthroughs + 1) - fetchObserver.Response(id) - - res, err := sturdyc.Passthrough(ctx, c, id, fetchObserver.Fetch) + fetchObserver.Clear() + fetchObserver.Err(errors.New("error")) + cachedRes, err := sturdyc.Passthrough(ctx, c, id, fetchObserver.Fetch) + <-fetchObserver.FetchCompleted if err != nil { - t.Fatalf("expected no error, got %v", err) + t.Fatal(err) } - if res != "value1" { - t.Errorf("expected value1, got %v", res) + if cachedRes != "value1" { + t.Errorf("expected value1, got %v", cachedRes) } - - for i := 0; i < numPassthroughs; i++ { - res, err := sturdyc.Passthrough(ctx, c, id, fetchObserver.Fetch) - if err != nil { - t.Fatalf("expected no error, got %v", err) - } - - if res != "value1" { - t.Errorf("expected value1, got %v", res) - } - } - - // It's not possible to know how many requests we'll let through. We expect - // half, which would be 50, but let's use 15 as a margin of safety because - // we can't control the randomness. - safetyMargin := 15 - for i := 0; i < (numPassthroughs/2)-safetyMargin; i++ { - <-fetchObserver.FetchCompleted - } - - // We'll also do a short sleep just to ensure there are no more refreshes happening. 
- time.Sleep(time.Millisecond * 50) - fetchObserver.AssertMinFetchCount(t, (numPassthroughs/2)-safetyMargin) - fetchObserver.AssertMaxFetchCount(t, (numPassthroughs/2)+safetyMargin) + fetchObserver.AssertFetchCount(t, numPassthroughs+2) } -func TestFullPassthroughBatch(t *testing.T) { +func TestPassthroughBatch(t *testing.T) { t.Parallel() ctx := context.Background() @@ -125,9 +87,9 @@ func TestFullPassthroughBatch(t *testing.T) { } for i := 0; i < numPassthroughs; i++ { - res, err := sturdyc.PassthroughBatch(ctx, c, idBatch, c.BatchKeyFn("item"), fetchObserver.FetchBatch) - if err != nil { - t.Fatalf("expected no error, got %v", err) + res, passthroughErr := sturdyc.PassthroughBatch(ctx, c, idBatch, c.BatchKeyFn("item"), fetchObserver.FetchBatch) + if passthroughErr != nil { + t.Fatalf("expected no error, got %v", passthroughErr) } if !cmp.Equal(res, map[string]string{"1": "value1", "2": "value2", "3": "value3"}) { t.Errorf("expected value1, value2, value3, got %v", res) @@ -139,53 +101,16 @@ func TestFullPassthroughBatch(t *testing.T) { } fetchObserver.AssertFetchCount(t, numPassthroughs+1) -} - -func TestHalfPassthroughBatch(t *testing.T) { - t.Parallel() - ctx := context.Background() - capacity := 5 - numShards := 2 - ttl := time.Minute - evictionPercentage := 10 - c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage, - sturdyc.WithStampedeProtection(time.Hour, time.Hour*2, time.Minute, true), - sturdyc.WithPassthroughPercentage(50), - ) - - idBatch := []string{"1", "2", "3"} - numPassthroughs := 100 - fetchObserver := NewFetchObserver(numPassthroughs + 1) - fetchObserver.BatchResponse(idBatch) - - res, err := sturdyc.PassthroughBatch(ctx, c, idBatch, c.BatchKeyFn("item"), fetchObserver.FetchBatch) + fetchObserver.Clear() + fetchObserver.Err(errors.New("error")) + cachedRes, err := sturdyc.PassthroughBatch(ctx, c, idBatch, c.BatchKeyFn("item"), fetchObserver.FetchBatch) + <-fetchObserver.FetchCompleted if err != nil { - t.Fatalf("expected no error, got %v", err) - } - if !cmp.Equal(res, map[string]string{"1": "value1", "2": "value2", "3": "value3"}) { - t.Errorf("expected value1, value2, value3, got %v", res) - } - - for i := 0; i < numPassthroughs; i++ { - res, err := sturdyc.PassthroughBatch(ctx, c, idBatch, c.BatchKeyFn("item"), fetchObserver.FetchBatch) - if err != nil { - t.Fatalf("expected no error, got %v", err) - } - if !cmp.Equal(res, map[string]string{"1": "value1", "2": "value2", "3": "value3"}) { - t.Errorf("expected value1, value2, value3, got %v", res) - } + t.Fatal(err) } - - // It's not possible to know how many requests we'll let through. We expect - // half, which would be 50, but let's use 15 as a margin of safety. - safetyMargin := 15 - for i := 0; i < (numPassthroughs/2)-safetyMargin; i++ { - <-fetchObserver.FetchCompleted + if !cmp.Equal(cachedRes, map[string]string{"1": "value1", "2": "value2", "3": "value3"}) { + t.Errorf("expected value1, value2, value3, got %v", cachedRes) } - - // We'll also do a short sleep just to ensure there are no more refreshes happening. 
- time.Sleep(time.Millisecond * 50) - fetchObserver.AssertMinFetchCount(t, (numPassthroughs/2)-safetyMargin) - fetchObserver.AssertMaxFetchCount(t, (numPassthroughs/2)+safetyMargin) + fetchObserver.AssertFetchCount(t, numPassthroughs+2) } diff --git a/refresh.go b/refresh.go index 4f0ea3d..bc2d5a4 100644 --- a/refresh.go +++ b/refresh.go @@ -8,7 +8,7 @@ import ( func (c *Client[T]) refresh(key string, fetchFn FetchFn[T]) { response, err := fetchFn(context.Background()) if err != nil { - if c.storeMisses && errors.Is(err, ErrStoreMissingRecord) { + if c.storeMissingRecords && errors.Is(err, ErrStoreMissingRecord) { c.SetMissing(key, response, true) } if errors.Is(err, ErrDeleteRecord) { @@ -38,11 +38,11 @@ func (c *Client[T]) refreshBatch(ids []string, keyFn KeyFn, fetchFn BatchFetchFn continue } - if !c.storeMisses && !okResponse && okCache { + if !c.storeMissingRecords && !okResponse && okCache { c.Delete(keyFn(id)) } - if c.storeMisses && !okResponse { + if c.storeMissingRecords && !okResponse { c.SetMissing(keyFn(id), v, true) } } diff --git a/shard.go b/shard.go index 5381f56..8ccfc7c 100644 --- a/shard.go +++ b/shard.go @@ -87,7 +87,7 @@ func (s *shard[T]) get(key string) (val T, exists, ignore, refresh bool) { return val, false, false, false } - shouldRefresh := s.refreshesEnabled && s.clock.Now().After(item.refreshAt) + shouldRefresh := s.refreshInBackground && s.clock.Now().After(item.refreshAt) if shouldRefresh { // Release the read lock, and switch to a write lock. s.RUnlock() @@ -140,7 +140,7 @@ func (s *shard[T]) set(key string, value T, isMissingRecord bool) bool { isMissingRecord: isMissingRecord, } - if s.refreshesEnabled { + if s.refreshInBackground { // If there is a difference between the min- and maxRefreshTime we'll use that to // set a random padding so that the refreshes get spread out evenly over time. var padding time.Duration From 21ef3d9d678557193492f154fd17d775d2bb7e28 Mon Sep 17 00:00:00 2001 From: Victor Conner Date: Thu, 23 May 2024 08:06:35 +0200 Subject: [PATCH 2/2] fix: Improve the cache keys --- keys.go | 63 ++++++++++++++++++++++++++++++---------------------- keys_test.go | 25 ++++++++++++++++++++- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/keys.go b/keys.go index 9ea3b6c..67ccb81 100644 --- a/keys.go +++ b/keys.go @@ -8,6 +8,8 @@ import ( "time" ) +const idPrefix = "STURDYC_ID" + func handleSlice(v reflect.Value) string { // If the value is a pointer to a slice, get the actual slice if v.Kind() == reflect.Ptr { @@ -27,7 +29,7 @@ func handleSlice(v reflect.Value) string { } func extractPermutation(cacheKey string) string { - idIndex := strings.LastIndex(cacheKey, "ID-") + idIndex := strings.LastIndex(cacheKey, idPrefix+"-") // "ID-" not found, return the original cache key. if idIndex == -1 { @@ -76,17 +78,9 @@ func (c *Client[T]) handleTime(v reflect.Value) string { return "empty-time" } -// PermutatedKey is a helper function for creating a cache key from a struct of -// options. Passing anything but a struct for "permutationStruct" will result -// in a panic. 
-func (c *Client[T]) PermutatedKey(prefix string, permutationStruct interface{}) string { - var sb strings.Builder - sb.WriteString(prefix) - sb.WriteString("-") - - // Get the value of the interface +func (c *Client[T]) handleStruct(permutationStruct interface{}) string { + str := "" v := reflect.ValueOf(permutationStruct) - if v.Kind() == reflect.Ptr { v = v.Elem() } @@ -104,57 +98,72 @@ func (c *Client[T]) PermutatedKey(prefix string, permutationStruct interface{}) } if i > 0 { - sb.WriteString("-") + str += "-" } if field.Kind() == reflect.Ptr { if field.IsNil() { - sb.WriteString("nil") + str += "nil" continue } // If it's not nil we'll dereference the pointer to handle its value. field = field.Elem() } - //nolint:exhaustive // We only need special logic for slices and time.Time values. + //nolint:exhaustive // We don't need special logic for every kind. switch field.Kind() { + // All of these types make for bad keys. + case reflect.Map, reflect.Func, reflect.Chan, reflect.Interface, reflect.UnsafePointer: + continue case reflect.Slice: if field.IsNil() { - sb.WriteString("nil") + str += "nil" } else { - sliceString := handleSlice(field) - sb.WriteString(sliceString) + str += handleSlice(field) } case reflect.Struct: + // Only handle time.Time structs. if field.Type() == reflect.TypeOf(time.Time{}) { - sb.WriteString(c.handleTime(field)) - continue + str += c.handleTime(field) } - sb.WriteString(fmt.Sprintf("%v", field.Interface())) + continue default: - sb.WriteString(fmt.Sprintf("%v", field.Interface())) + str += fmt.Sprintf("%v", field.Interface()) } } + return str +} - return sb.String() +// PermutatedKey takes a prefix, and a struct where the fields are +// concatenated in order to make a unique cache key. Passing +// anything but a struct for "permutationStruct" will result in a panic. +// +// The cache will only use the EXPORTED fields of the struct to construct the key. +// The permutation struct should be FLAT, with no nested structs. The fields can +// be any of the basic types, as well as slices and time.Time values. +func (c *Client[T]) PermutatedKey(prefix string, permutationStruct interface{}) string { + return prefix + "-" + c.handleStruct(permutationStruct) } -// BatchKeyFn provides a function for that can be used in conjunction with "GetFetchBatch". +// BatchKeyFn provides a function that can be used in conjunction with "GetFetchBatch". // It takes in a prefix, and returns a function that will append an ID suffix for each item. func (c *Client[T]) BatchKeyFn(prefix string) KeyFn { return func(id string) string { - return fmt.Sprintf("%s-ID-%s", prefix, id) + return fmt.Sprintf("%s-%s-%s", prefix, idPrefix, id) } } // PermutatedBatchKeyFn provides a function that can be used in conjunction // with GetFetchBatch. It takes a prefix, and a struct where the fields are -// concatenated with the id in order to make a unique key. Passing anything but -// a struct for "permutationStruct" will result in a panic. This function is useful -// when the id isn't enough in itself to uniquely identify a record. +// concatenated with the id in order to make a unique cache key. Passing +// anything but a struct for "permutationStruct" will result in a panic. +// +// The cache will only use the EXPORTED fields of the struct to construct the key. +// The permutation struct should be FLAT, with no nested structs. The fields can +// be any of the basic types, as well as slices and time.Time values.
func (c *Client[T]) PermutatedBatchKeyFn(prefix string, permutationStruct interface{}) KeyFn { return func(id string) string { key := c.PermutatedKey(prefix, permutationStruct) - return fmt.Sprintf("%s-ID-%s", key, id) + return fmt.Sprintf("%s-%s-%s", key, idPrefix, id) } } diff --git a/keys_test.go b/keys_test.go index e3b1a0b..df1daf7 100644 --- a/keys_test.go +++ b/keys_test.go @@ -175,7 +175,7 @@ func TestPermutatedBatchKeyFn(t *testing.T) { IncludeUpcoming: true, limit: 2, }) - want := "cache-key-true-ID-1" + want := "cache-key-true-STURDYC_ID-1" got := cacheKeyFunc("1") if got != want { @@ -209,3 +209,26 @@ func TestTimePointers(t *testing.T) { t.Errorf("got: %s wanted: %s", got, want) } } + +func TestFunctionAndMapsAreIgnored(t *testing.T) { + t.Parallel() + + c := sturdyc.New[any](100, 1, time.Hour, 5) + type queryParams struct { + BoolValues []bool + Fn func() bool + Map map[string]string + } + params := queryParams{ + BoolValues: []bool{true, false}, + Fn: func() bool { return true }, + Map: map[string]string{"key1": "value1"}, + } + + want := "cache-key-true,false--" + got := c.PermutatedKey("cache-key", params) + + if got != want { + t.Errorf("got: %s wanted: %s", got, want) + } +}
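To see what the improved cache keys look like in practice, here is a small sketch that is not part of the patch. It uses a made-up `queryParams` struct to show what `BatchKeyFn`, `PermutatedKey`, and `PermutatedBatchKeyFn` produce after this change:

```go
package main

import (
	"log"
	"time"

	"github.com/creativecreature/sturdyc"
)

// queryParams is a flat permutation struct. Only the exported
// fields are used when the cache key is constructed.
type queryParams struct {
	IncludeUpcoming bool
	Limit           int
}

func main() {
	c := sturdyc.New[string](1000, 10, time.Hour, 5)
	params := queryParams{IncludeUpcoming: true, Limit: 2}

	// BatchKeyFn joins the prefix, the new STURDYC_ID marker, and the id.
	log.Println(c.BatchKeyFn("users")("42")) // users-STURDYC_ID-42

	// PermutatedKey concatenates the prefix with the exported struct fields.
	log.Println(c.PermutatedKey("users", params)) // users-true-2

	// PermutatedBatchKeyFn combines the permutations with the id.
	log.Println(c.PermutatedBatchKeyFn("users", params)("42")) // users-true-2-STURDYC_ID-42
}
```

Note that maps and functions are skipped when the key is built (only their field separators remain), which is why `TestFunctionAndMapsAreIgnored` expects `cache-key-true,false--`.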