Skip to content

Commit

Permalink
fix: Added more observability
Browse files Browse the repository at this point in the history
  • Loading branch information
viccon committed May 28, 2024
1 parent 2f9f2af commit d68ade3
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 79 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
[![codecov](https://codecov.io/gh/creativecreature/sturdyc/graph/badge.svg?token=CYSKW3Z7E6)](https://codecov.io/gh/creativecreature/sturdyc)

`Sturdyc` is a highly concurrent cache that supports non-blocking reads and has
a configurable number of shards that makes it possible to achieve parallel
writes without any lock contention. The [xxhash](https://github.com/cespare/xxhash) algorithm
a configurable number of shards that makes it possible to achieve writes without
any lock contention. The [xxhash](https://github.com/cespare/xxhash) algorithm
is used for efficient key distribution. Evictions are performed per shard based
on recency at O(N) time complexity using [quickselect](https://en.wikipedia.org/wiki/Quickselect).

It has all the functionality you would expect from a caching library, but what
**sets it apart** is all the functionality you get that has been designed to
**sets it apart** is all the features you get that has been designed to
make it easier to build highly _performant_ and _robust_ applications.

You can enable *background refreshes* which instructs the cache to refresh the
Expand Down
87 changes: 80 additions & 7 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,17 +303,14 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
optsTwoBatch1 := []string{"4", "5"}
optsTwoBatch2 := []string{"6", "7", "8"}

// Request the first batch of records. We'll wait for a message on the
// refreshScheduledStream to be able to tell that the goroutine that is
// waiting for additional ids has been started. We'll also sleep for a brief
// moment to be sure that it's ready to receive messages on its channels.
// We're doing this just to be sure that one is launched first, because it
// should not receive any ids from the other two batches.
// Request the first batch of records. This should wait for additional IDs.
sturdyc.GetFetchBatch(ctx, c, optsOneIDs, c.PermutatedBatchKeyFn(prefix, optsOne), fetchObserver.FetchBatch)

// Next, we're requesting ids 4-8 with the second options which should exceed the buffer size for that permutation.
fetchObserver.BatchResponse([]string{"4", "5", "6", "7", "8"})
sturdyc.GetFetchBatch(ctx, c, optsTwoBatch1, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)
sturdyc.GetFetchBatch(ctx, c, optsTwoBatch2, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)

<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 3)
fetchObserver.AssertRequestedRecords(t, []string{"4", "5", "6", "7", "8"})
Expand Down Expand Up @@ -374,7 +371,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {

// Next, we'll move the clock past the maxRefreshDelay. This should guarantee
// that the next records we request gets scheduled for a refresh.
clock.Add(maxRefreshDelay + time.Second)
clock.Add(maxRefreshDelay + time.Second*5)

// Now we are going to request 50 items at once. The batch size is set to
// 5, so this should be chunked internally into 10 separate batches.
Expand All @@ -388,3 +385,79 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
}
fetchObserver.AssertFetchCount(t, 11)
}

func TestValuesAreUpdatedCorrectly(t *testing.T) {
t.Parallel()

ctx := context.Background()
capacity := 1000
ttl := time.Hour
numShards := 100
evictionPercentage := 10
minRefreshDelay := time.Minute * 5
maxRefreshDelay := time.Minute * 10
refreshRetryInterval := time.Millisecond * 10
batchSize := 10
batchBufferTimeout := time.Minute
clock := sturdyc.NewTestClock(time.Now())

// The client will be configured as follows:
// - Records will be assigned a TTL of one hour.
// - If a record is re-requested within a random interval of 5 to
// 10 minutes, the client will queue a refresh for that record.
// - The queued refresh will be executed under two conditions:
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

type Foo struct {
Value string
}

records := []string{"1", "2", "3"}
res, _ := sturdyc.GetFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
values := make(map[string]Foo, len(ids))
for _, id := range ids {
values[id] = Foo{Value: "foo-" + id}
}
return values, nil
})

if res["1"].Value != "foo-1" {
t.Errorf("expected 'foo-1', got '%s'", res["1"].Value)
}

clock.Add(time.Minute * 45)
sturdyc.GetFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
values := make(map[string]Foo, len(ids))
for _, id := range ids {
values[id] = Foo{Value: "foo2-" + id}
}
return values, nil
})

time.Sleep(50 * time.Millisecond)
clock.Add(batchBufferTimeout + time.Second*10)
time.Sleep(50 * time.Millisecond)
clock.Add(time.Minute * 45)
time.Sleep(50 * time.Millisecond)
clock.Add(time.Minute * 5)
time.Sleep(50 * time.Millisecond)

resTwo, _ := sturdyc.GetFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
values := make(map[string]Foo, len(ids))
for _, id := range ids {
values[id] = Foo{Value: "foo3-" + id}
}
return values, nil
})

if resTwo["1"].Value != "foo2-1" {
t.Errorf("expected 'foo2-1', got '%s'", resTwo["1"].Value)
}
}
3 changes: 2 additions & 1 deletion cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func (c *Client[T]) Set(key string, value T) bool {
// StoreMissingRecord writes a single value to the cache. Returns true if it triggered an eviction.
func (c *Client[T]) StoreMissingRecord(key string) bool {
shard := c.getShard(key)
return shard.set(key, *new(T), true)
var zero T
return shard.set(key, zero, true)
}

// SetMany writes a map of key value pairs to the cache.
Expand Down
1 change: 1 addition & 0 deletions inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func makeBatchCall[T, V any](ctx context.Context, c *Client[T], opts makeBatchCa
for id, record := range response {
v, ok := any(record).(T)
if !ok {
c.log.Error("sturdyc: invalid type for ID:" + id)
continue
}
c.Set(opts.keyFn(id), v)
Expand Down
68 changes: 33 additions & 35 deletions keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"
)

const idPrefix = "STURDYC_ID"

func handleSlice(v reflect.Value) string {
// If the value is a pointer to a slice, get the actual slice
if v.Kind() == reflect.Ptr {
Expand All @@ -29,16 +27,16 @@ func handleSlice(v reflect.Value) string {
}

func extractPermutation(cacheKey string) string {
idIndex := strings.LastIndex(cacheKey, idPrefix+"-")
idIndex := strings.LastIndex(cacheKey, "ID-")

// "STURDYC_ID-" not found, return the original cache key.
// "ID-" not found, return the original cache key.
if idIndex == -1 {
return cacheKey
}

// Find the last "-" before "STURDYC_ID-" to ensure we include "STURDYC_ID-" in the result
// Find the last "-" before "ID-" to ensure we include "ID-" in the result
lastDashIndex := strings.LastIndex(cacheKey[:idIndex], "-")
// "-" not found before "STURDYC_ID-", return original string
// "-" not found before "ID-", return original string
if lastDashIndex == -1 {
return cacheKey
}
Expand Down Expand Up @@ -78,21 +76,33 @@ func (c *Client[T]) handleTime(v reflect.Value) string {
return "empty-time"
}

func (c *Client[T]) handleStruct(permutationStruct interface{}) string {
str := ""
// Permutated key takes a prefix, and a struct where the fields are
// concatenated with in order to make a unique cache key. Passing
// anything but a struct for "permutationStruct" will result in a panic.
//
// The cache will only use the EXPORTED fields of the struct to construct the key.
// The permutation struct should be FLAT, with no nested structs. The fields can
// be any of the basic types, as well as slices and time.Time values.
func (c *Client[T]) PermutatedKey(prefix string, permutationStruct interface{}) string {
var sb strings.Builder
sb.WriteString(prefix)
sb.WriteString("-")

// Get the value of the interface
v := reflect.ValueOf(permutationStruct)

if v.Kind() == reflect.Ptr {
v = v.Elem()
}

if v.Kind() != reflect.Struct {
panic("permutationStruct must be a struct")
panic("val must be a struct")
}

for i := 0; i < v.NumField(); i++ {
field := v.Field(i)

// Skip unexported fields
// Check if the field is exported, and if so skip it.
if !field.CanInterface() {
message := fmt.Sprintf(
"sturdyc: permutationStruct contains unexported field: %s which won't be part of the cache key",
Expand All @@ -103,58 +113,46 @@ func (c *Client[T]) handleStruct(permutationStruct interface{}) string {
}

if i > 0 {
str += "-"
sb.WriteString("-")
}

if field.Kind() == reflect.Ptr {
if field.IsNil() {
str += "nil"
sb.WriteString("nil")
continue
}
// If it's not nil we'll dereference the pointer to handle its value.
field = field.Elem()
}

//nolint:exhaustive // We don't need special logic for every kind.
//nolint:exhaustive // We only need special logic for slices and time.Time values.
switch field.Kind() {
case reflect.Slice:
if field.IsNil() {
str += "nil"
sb.WriteString("nil")
} else {
str += handleSlice(field)
sliceString := handleSlice(field)
sb.WriteString(sliceString)
}
case reflect.Struct:
// Only handle time.Time structs.
if field.Type() == reflect.TypeOf(time.Time{}) {
str += c.handleTime(field)
sb.WriteString(c.handleTime(field))
continue
}
continue
// All of these types makes for bad keys.
case reflect.Map, reflect.Func, reflect.Chan, reflect.Interface, reflect.UnsafePointer:
continue
sb.WriteString(fmt.Sprintf("%v", field.Interface()))
default:
str += fmt.Sprintf("%v", field.Interface())
sb.WriteString(fmt.Sprintf("%v", field.Interface()))
}
}
return str
}

// Permutated key takes a prefix, and a struct where the fields are
// concatenated with in order to make a unique cache key. Passing
// anything but a struct for "permutationStruct" will result in a panic.
//
// The cache will only use the EXPORTED fields of the struct to construct the key.
// The permutation struct should be FLAT, with no nested structs. The fields can
// be any of the basic types, as well as slices and time.Time values.
func (c *Client[T]) PermutatedKey(prefix string, permutationStruct interface{}) string {
return prefix + "-" + c.handleStruct(permutationStruct)
return sb.String()
}

// BatchKeyFn provides a function for that can be used in conjunction with "GetFetchBatch".
// It takes in a prefix, and returns a function that will append an ID suffix for each item.
func (c *Client[T]) BatchKeyFn(prefix string) KeyFn {
return func(id string) string {
return fmt.Sprintf("%s-%s-%s", prefix, idPrefix, id)
return fmt.Sprintf("%s-ID-%s", prefix, id)
}
}

Expand All @@ -169,6 +167,6 @@ func (c *Client[T]) BatchKeyFn(prefix string) KeyFn {
func (c *Client[T]) PermutatedBatchKeyFn(prefix string, permutationStruct interface{}) KeyFn {
return func(id string) string {
key := c.PermutatedKey(prefix, permutationStruct)
return fmt.Sprintf("%s-%s-%s", key, idPrefix, id)
return fmt.Sprintf("%s-ID-%s", key, id)
}
}
25 changes: 1 addition & 24 deletions keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestPermutatedBatchKeyFn(t *testing.T) {
IncludeUpcoming: true,
limit: 2,
})
want := "cache-key-true-STURDYC_ID-1"
want := "cache-key-true-ID-1"
got := cacheKeyFunc("1")

if got != want {
Expand Down Expand Up @@ -209,26 +209,3 @@ func TestTimePointers(t *testing.T) {
t.Errorf("got: %s wanted: %s", got, want)
}
}

func TestFunctionAndMapsAreIgnored(t *testing.T) {
t.Parallel()

c := sturdyc.New[any](100, 1, time.Hour, 5)
type queryParams struct {
BoolValues []bool
Fn func() bool
Map map[string]string
}
params := queryParams{
BoolValues: []bool{true, false},
Fn: func() bool { return true },
Map: map[string]string{"key1": "value1"},
}

want := "cache-key-true,false--"
got := c.PermutatedKey("cache-key", params)

if got != want {
t.Errorf("got: %s wanted: %s", got, want)
}
}
26 changes: 17 additions & 9 deletions safe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ func wrap[T, V any](fetchFn FetchFn[V]) FetchFn[T] {
return func(ctx context.Context) (T, error) {
res, err := fetchFn(ctx)
if err != nil {
return *new(T), err
var zero T
return zero, err
}
if val, ok := any(res).(T); ok {
return val, nil
val, ok := any(res).(T)
if !ok {
var zero T
return zero, ErrInvalidType
}
return *new(T), ErrInvalidType
return val, nil
}
}

func unwrap[V, T any](val T, err error) (V, error) {
if err != nil {
return *new(V), err
var zero V
return zero, err
}

v, ok := any(val).(V)
Expand All @@ -53,9 +57,11 @@ func wrapBatch[T, V any](fetchFn BatchFetchFn[V]) BatchFetchFn[T] {

resT := make(map[string]T, len(resV))
for id, v := range resV {
if val, ok := any(v).(T); ok {
resT[id] = val
val, ok := any(v).(T)
if !ok {
return resT, ErrInvalidType
}
resT[id] = val
}

return resT, nil
Expand All @@ -65,9 +71,11 @@ func wrapBatch[T, V any](fetchFn BatchFetchFn[V]) BatchFetchFn[T] {
func unwrapBatch[V, T any](values map[string]T, err error) (map[string]V, error) {
vals := make(map[string]V, len(values))
for id, v := range values {
if val, ok := any(v).(V); ok {
vals[id] = val
val, ok := any(v).(V)
if !ok {
return vals, ErrInvalidType
}
vals[id] = val
}
return vals, err
}

0 comments on commit d68ade3

Please sign in to comment.