Skip to content

Commit

Permalink
Candidate for v1.6.0 release (#11)
Browse files Browse the repository at this point in the history
* Adds batching for MGET commands where the Cache type can be configured to split large MGET commands into smaller MGET commands and execute them together using pipelining. This has shown significant improvement in latency under certain conditions.
* Adds method to fetch the client from the Cache
* Updated release notes I missed because I was being lazy

Signed-off-by: Joseph Kratz <[email protected]>
  • Loading branch information
jkratz55 authored Sep 20, 2024
1 parent 90528ba commit 534563b
Show file tree
Hide file tree
Showing 232 changed files with 29,716 additions and 780 deletions.
16 changes: 16 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
# Release Notes

## v1.6.0

* Cache type can now be configured to split MGet with a large number of keys into multiple MGET commands executed within a pipeline which under some conditions can significantly reduce latency.
* Added method to get the underlying Redis client from the `Cache` type
* Updated dependencies

## v1.5.3

* Added MSetValues to allow fetching values only with multiple keys.

## v1.5.2

* Update dependencies

## v1.5.1

* Added Scan method on `Cache` to fetch multiple keys by a pattern
* Fixes a bug where Prometheus cache hits metric was being incremented when there was a cache miss
* Adds additional metrics to track how many keys are being sent in MGET commands

## v1.5.0

Nothing much with this release. Just updated the dependencies to the latest versions.
Expand Down
128 changes: 128 additions & 0 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Cache struct {
marshaller Marshaller
unmarshaller Unmarshaller
codec Codec
mgetBatch int // zero-value indicates no batching
hooksMixin
}

Expand Down Expand Up @@ -441,6 +442,20 @@ func (c *Cache) ExtendTTL(ctx context.Context, key string, dur time.Duration) er
return c.Expire(ctx, key, ttl+dur)
}

// Client returns the underlying Redis client the Cache is wrapping/using.
//
// This can be useful when there are operations that are not supported by Cache
// that are required, but you don't want to have to pass the Redis client around
// your application as well. This allows for the Redis client to be accessed from
// the Cache. However, it is important to understand that when using the Redis
// client directly it will not have the same behavior as the Cache. You will have
// to handle marshalling, unmarshalling, and compression yourself. You will also
// not have the same hooks behavior as the Cache and some metrics will not be
// tracked.
func (c *Cache) Client() redis.UniversalClient {
return c.redis.(redis.UniversalClient)
}

// DefaultMarshaller returns a Marshaller using msgpack to marshall
// values.
func DefaultMarshaller() Marshaller {
Expand Down Expand Up @@ -492,7 +507,17 @@ func (mr MultiResult[T]) IsEmpty() bool {

// MGet uses the provided Cache to retrieve multiple keys from Redis and returns
// a MultiResult.
//
// If a key doesn't exist in Redis it will not be included in the MultiResult
// returned. If all keys are not found the MultiResult will be empty.
func MGet[R any](ctx context.Context, c *Cache, keys ...string) (MultiResult[R], error) {

// If batching is enabled and the number of keys exceeds the batch size use
// multiple MGET commands in a pipeline.
if c.mgetBatch > 0 && len(keys) > c.mgetBatch {
return mGetBatch[R](ctx, c, keys...)
}

results, err := c.redis.MGet(ctx, keys...).Result()
if err != nil {
return nil, fmt.Errorf("redis: %w", err)
Expand Down Expand Up @@ -521,12 +546,69 @@ func MGet[R any](ctx context.Context, c *Cache, keys ...string) (MultiResult[R],
return resultMap, nil
}

// mGetBatch is a helper function that uses pipelining to fetch multiple keys
// from Redis in batches. Under certain conditions, this can be faster than
// fetching all keys in a single MGET command.
func mGetBatch[R any](ctx context.Context, c *Cache, keys ...string) (MultiResult[R], error) {

chunks := chunk(keys, c.mgetBatch)

pipe := c.redis.Pipeline()
cmds := make([]*redis.SliceCmd, 0, len(chunks))
for i := 0; i < len(chunks); i++ {
cmds = append(cmds, pipe.MGet(ctx, chunks[i]...))
}

_, err := pipe.Exec(ctx)
if err != nil {
return nil, fmt.Errorf("redis: %w", err)
}

resultMap := make(map[string]R)
for i := 0; i < len(cmds); i++ {
results, err := cmds[i].Result()
if err != nil {
return nil, fmt.Errorf("redis: %w", err)
}
for j, res := range results {
if res == nil || res == redis.Nil {
// Some or all of the requested keys may not exist. Skip iterations
// where the key wasn't found
continue
}
str, ok := res.(string)
if !ok {
return nil, fmt.Errorf("unexpected value type from Redis: expected %T but got %T", str, res)
}
data, err := c.hooksMixin.current.decompress([]byte(str))
if err != nil {
return nil, fmt.Errorf("decompress value: %w", err)
}
var val R
if err := c.hooksMixin.current.unmarshall(data, &val); err != nil {
return nil, fmt.Errorf("unmarshall value to type %T: %w", val, err)
}
key := chunks[i][j]
resultMap[key] = val
}
}

return resultMap, nil
}

// MGetValues fetches multiple keys from Redis and returns only the values. If
// the relationship between key -> value is required use MGet instead.
//
// MGetValues is useful when you only want to values and want to avoid the
// overhead of allocating a slice from a MultiResult.
func MGetValues[T any](ctx context.Context, c *Cache, keys ...string) ([]T, error) {

// If batching is enabled and the number of keys exceeds the batch size use
// multiple MGET commands in a pipeline.
if c.mgetBatch > 0 && len(keys) > c.mgetBatch {
return mGetValuesBatch[T](ctx, c, keys...)
}

results, err := c.redis.MGet(ctx, keys...).Result()
if err != nil {
return nil, fmt.Errorf("redis: %w", err)
Expand Down Expand Up @@ -557,6 +639,52 @@ func MGetValues[T any](ctx context.Context, c *Cache, keys ...string) ([]T, erro
return values, nil
}

func mGetValuesBatch[T any](ctx context.Context, c *Cache, keys ...string) ([]T, error) {
chunks := chunk(keys, c.mgetBatch)
pipe := c.redis.Pipeline()
cmds := make([]*redis.SliceCmd, 0, len(chunks))

for i := 0; i < len(chunks); i++ {
cmds = append(cmds, pipe.MGet(ctx, chunks[i]...))
}

_, err := pipe.Exec(ctx)
if err != nil {
return nil, fmt.Errorf("redis: %w", err)
}

values := make([]T, 0, len(keys))
for i := 0; i < len(cmds); i++ {
results, err := cmds[i].Result()
if err != nil {
return nil, fmt.Errorf("redis: %w", err)
}
for j := 0; j < len(results); j++ {
res := results[j]
if res == nil || res == redis.Nil {
// Some or all of the requested keys may not exist. Skip iterations
// where the key wasn't found
continue
}
str, ok := res.(string)
if !ok {
return nil, fmt.Errorf("unexpected value type from Redis: expected %T but got %T", str, res)
}
data, err := c.hooksMixin.current.decompress([]byte(str))
if err != nil {
return nil, fmt.Errorf("decompress value: %w", err)
}
var val T
if err := c.hooksMixin.current.unmarshall(data, &val); err != nil {
return nil, fmt.Errorf("unmarshall value to type %T: %w", val, err)
}
values = append(values, val)
}
}

return values, nil
}

// UpsertCallback is a callback function that is invoked by Upsert. An UpsertCallback
// is passed if a key was found, the old value (or zero-value if the key wasn't found)
// and the new value. An UpsertCallback is responsible for determining what value should
Expand Down
72 changes: 72 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,42 @@ func TestMGet(t *testing.T) {
}, map[string]name(results))
}

func TestMGetBatch(t *testing.T) {
setup()
defer tearDown()

type name struct {
First string
Middle string
Last string
}

testVal := name{
First: "Billy",
Middle: "Joel",
Last: "Bob",
}

val, _ := msgpack.Marshal(testVal)
expected := make(map[string]name)

keys := make([]string, 0, 10000)
for i := 0; i < 10000; i++ {
key := fmt.Sprintf("key%d", i)
keys = append(keys, key)
expected[key] = testVal
if err := client.Set(context.Background(), key, val, 0).Err(); err != nil {
t.Errorf("failed to setup data in Redis")
}
}

cache := New(client, BatchMultiGets(1000))
results, err := MGet[name](context.Background(), cache, keys...)
assert.NoError(t, err)
assert.Equal(t, 10000, len(results))
assert.Equal(t, MultiResult[name](expected), results)
}

func TestMGetValues(t *testing.T) {
setup()
defer tearDown()
Expand Down Expand Up @@ -473,6 +509,42 @@ func TestMGetValues(t *testing.T) {
}, results)
}

func TestMGetValuesBatch(t *testing.T) {
setup()
defer tearDown()

type name struct {
First string
Middle string
Last string
}

testVal := name{
First: "Billy",
Middle: "Joel",
Last: "Bob",
}

val, _ := msgpack.Marshal(testVal)
expected := make([]name, 0, 10000)
keys := make([]string, 0, 10000)

for i := 0; i < 10000; i++ {
key := fmt.Sprintf("key%d", i)
keys = append(keys, key)
expected = append(expected, testVal)
if err := client.Set(context.Background(), key, val, 0).Err(); err != nil {
t.Errorf("failed to setup data in Redis")
}
}

cache := New(client, BatchMultiGets(1000))
results, err := MGetValues[name](context.Background(), cache, keys...)
assert.NoError(t, err)
assert.Equal(t, 10000, len(results))
assert.Equal(t, expected, results)
}

func TestUpsertTTL(t *testing.T) {
setup()
defer tearDown()
Expand Down
23 changes: 12 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
module github.com/jkratz55/redis-cache

go 1.21
go 1.22

toolchain go1.22.0
toolchain go1.23.1

require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/andybalholm/brotli v1.1.0
github.com/pierrec/lz4/v4 v4.1.21
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_golang v1.20.4
github.com/redis/go-redis/v9 v9.6.1
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/exporters/prometheus v0.50.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/exporters/prometheus v0.52.0
go.opentelemetry.io/otel/metric v1.30.0
go.opentelemetry.io/otel/sdk/metric v1.30.0
go.uber.org/multierr v1.11.0
)

Expand All @@ -28,16 +28,17 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/common v0.59.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sys v0.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
golang.org/x/sys v0.25.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 534563b

Please sign in to comment.