Skip to content

Commit

Permalink
move caching_bucket_config to pkg cache
Browse files Browse the repository at this point in the history
Signed-off-by: akanshat <[email protected]>
  • Loading branch information
akanshat committed Dec 22, 2021
1 parent 088b9d3 commit b15440e
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 302 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.multi-stage
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions.
ARG BASE_DOCKER_SHA="14d68ca3d69fceaa6224250c83d81d935c053fb13594c811038c461194599973"
FROM golang:1.16-alpine3.12 as builder
FROM golang:1.17-alpine3.15 as builder

WORKDIR $GOPATH/src/github.com/thanos-io/thanos
# Change in the docker context invalidates the cache so to leverage docker
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ replace (
// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86.
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab => github.com/akanshat/cortex v1.10.1-0.20211222182735-328fbeedd424
github.com/efficientgo/tools/core => github.com/efficientgo/tools/core v0.0.0-20210731122119-5d4a0645ce9a
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/akanshat/cortex v1.10.1-0.20211222182735-328fbeedd424 h1:XxsZ4+as6/m2fO+YpQq0BYQEoC7fDP8Nje66RWG9YCw=
github.com/akanshat/cortex v1.10.1-0.20211222182735-328fbeedd424/go.mod h1:rOgO27ZndSaiFCRrWXYRIUHAJjeGSjk7s+fDPJU6gHg=
github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 h1:aDITxVUQ/3KBhpVWX57Vo9ntGTxoRw1F0T6/x/tRzNU=
github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497/go.mod h1:b6br6/pDFSfMkBgC96TbpOji05q5pa+v5rIlS0Y6XtI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -448,8 +450,6 @@ github.com/cortexproject/cortex v1.6.1-0.20210215155036-dfededd9f331/go.mod h1:8
github.com/cortexproject/cortex v1.7.1-0.20210224085859-66d6fb5b0d42/go.mod h1:u2dxcHInYbe45wxhLoWVdlFJyDhXewsMcxtnbq/QbH4=
github.com/cortexproject/cortex v1.7.1-0.20210316085356-3fedc1108a49/go.mod h1:/DBOW8TzYBTE/U+O7Whs7i7E2eeeZl1iRVDtIqxn5kg=
github.com/cortexproject/cortex v1.8.1-0.20210422151339-cf1c444e0905/go.mod h1:xxm4/CLvTmDxwE7yXwtClR4dIvkG4S09o5DygPOgc1U=
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab h1:THN4VQQqsZn5gNwcmQJO1GarnfZkSWfp5824ifoD9fQ=
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab/go.mod h1:njSBkQ1wUNx9X4knV/j65Pi4ItlJXX4QwXRKoMflJd8=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
Expand Down
205 changes: 205 additions & 0 deletions pkg/cache/caching_bucket_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"time"

"github.com/thanos-io/thanos/pkg/objstore"
)

// IterCodec is a codec for encoding and decoding results of Iter call.
type IterCodec interface {
	// Encode serializes the list of object names produced by an Iter call.
	Encode(files []string) ([]byte, error)
	// Decode turns previously cached bytes back into the list of object names.
	Decode(cachedData []byte) ([]string, error)
}

// CachingBucketConfig contains low-level configuration for individual bucket operations.
// This is not exposed to the user, but it is expected that code sets up individual
// operations based on user-provided configuration.
type CachingBucketConfig struct {
	get        map[string]*GetConfig        // Per-config-name settings for "Get" operations.
	iter       map[string]*IterConfig       // Per-config-name settings for "Iter" operations.
	exists     map[string]*ExistsConfig     // Per-config-name settings for "Exists" operations.
	getRange   map[string]*GetRangeConfig   // Per-config-name settings for "GetRange" operations.
	attributes map[string]*AttributesConfig // Per-config-name settings for "Attributes" operations.
}

// NewCachingBucketConfig returns an empty CachingBucketConfig with every
// per-operation map initialized, ready for the Cache* registration methods.
func NewCachingBucketConfig() *CachingBucketConfig {
	cfg := &CachingBucketConfig{}
	cfg.get = map[string]*GetConfig{}
	cfg.iter = map[string]*IterConfig{}
	cfg.exists = map[string]*ExistsConfig{}
	cfg.getRange = map[string]*GetRangeConfig{}
	cfg.attributes = map[string]*AttributesConfig{}
	return cfg
}

// OperationConfig is the generic config shared by all single-operation configs.
type OperationConfig struct {
	Matcher func(name string) bool // Matcher decides whether a given object name is subject to this config.
	Cache   Cache                  // Cache used to store results for matching names.
}

// Operation-specific configs.

// IterConfig configures caching of "Iter" results for matching directories.
type IterConfig struct {
	OperationConfig
	TTL   time.Duration // Lifetime of a cached directory listing.
	Codec IterCodec     // Codec used to encode/decode the cached listing.
}

// ExistsConfig configures caching of "Exists" results. Positive and negative
// results use separate TTLs.
type ExistsConfig struct {
	OperationConfig
	ExistsTTL      time.Duration // Lifetime of a cached "object exists" result.
	DoesntExistTTL time.Duration // Lifetime of a cached "object does not exist" result.
}

// GetConfig configures caching of "Get" results. It embeds ExistsConfig
// because a Get also reveals whether the object exists.
type GetConfig struct {
	ExistsConfig
	ContentTTL       time.Duration // Lifetime of the cached object content.
	MaxCacheableSize int           // Size cap for cached content — presumably bytes; enforcement lives in the caching bucket, not here.
}

// GetRangeConfig configures caching of "GetRange" results; ranges are split
// into subranges of SubrangeSize which are cached individually (see CacheGetRange).
type GetRangeConfig struct {
	OperationConfig
	SubrangeSize   int64         // Alignment/size of individually cached subranges.
	MaxSubRequests int           // Max sub-requests per GetRange; values <= 0 mean no limit (see CacheGetRange).
	AttributesTTL  time.Duration // Lifetime of cached object attributes (object size is needed to compute subranges).
	SubrangeTTL    time.Duration // Lifetime of each cached subrange.
}

// AttributesConfig configures caching of "Attributes" results for matching files.
type AttributesConfig struct {
	OperationConfig
	TTL time.Duration // Lifetime of cached attributes.
}

// newOperationConfig builds the generic part of an operation config.
// The matcher decides which object names the operation's caching applies to
// and must not be nil; a nil matcher is a programming error, so we panic.
func newOperationConfig(cache Cache, matcher func(string) bool) OperationConfig {
	if matcher == nil {
		// A bare panic("matcher") told the reader nothing; name the invariant.
		panic("newOperationConfig: matcher must not be nil")
	}

	return OperationConfig{
		Matcher: matcher,
		Cache:   cache,
	}
}

// CacheIter configures caching of "Iter" operation for matching directories.
func (cfg *CachingBucketConfig) CacheIter(configName string, cache Cache, matcher func(string) bool, ttl time.Duration, codec IterCodec) {
	ic := &IterConfig{
		OperationConfig: newOperationConfig(cache, matcher),
		Codec:           codec,
		TTL:             ttl,
	}
	cfg.iter[configName] = ic
}

// CacheGet configures caching of "Get" operation for matching files. Content of the object is cached, as well as whether object exists or not.
func (cfg *CachingBucketConfig) CacheGet(configName string, cache Cache, matcher func(string) bool, maxCacheableSize int, contentTTL, existsTTL, doesntExistTTL time.Duration) {
	// Existence caching shares the same cache/matcher as the content caching.
	existence := ExistsConfig{
		OperationConfig: newOperationConfig(cache, matcher),
		ExistsTTL:       existsTTL,
		DoesntExistTTL:  doesntExistTTL,
	}
	cfg.get[configName] = &GetConfig{
		ExistsConfig:     existence,
		ContentTTL:       contentTTL,
		MaxCacheableSize: maxCacheableSize,
	}
}

// CacheExists configures caching of "Exists" operation for matching files. Negative values are cached as well.
func (cfg *CachingBucketConfig) CacheExists(configName string, cache Cache, matcher func(string) bool, existsTTL, doesntExistTTL time.Duration) {
	ec := &ExistsConfig{OperationConfig: newOperationConfig(cache, matcher)}
	ec.ExistsTTL = existsTTL
	ec.DoesntExistTTL = doesntExistTTL
	cfg.exists[configName] = ec
}

// CacheGetRange configures caching of "GetRange" operation. Subranges (aligned on subrange size) are cached individually.
// Since caching operation needs to know the object size to compute correct subranges, object size is cached as well.
// Single "GetRange" requests can result in multiple smaller GetRange sub-requests issued on the underlying bucket.
// MaxSubRequests specifies how many such subrequests may be issued. Values <= 0 mean there is no limit (requests
// for adjacent missing subranges are still merged).
func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache Cache, matcher func(string) bool, subrangeSize int64, attributesTTL, subrangeTTL time.Duration, maxSubRequests int) {
	rc := &GetRangeConfig{
		OperationConfig: newOperationConfig(cache, matcher),
		MaxSubRequests:  maxSubRequests,
		SubrangeSize:    subrangeSize,
		SubrangeTTL:     subrangeTTL,
		AttributesTTL:   attributesTTL,
	}
	cfg.getRange[configName] = rc
}

// CacheAttributes configures caching of "Attributes" operation for matching files.
func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache Cache, matcher func(name string) bool, ttl time.Duration) {
	ac := &AttributesConfig{OperationConfig: newOperationConfig(cache, matcher)}
	ac.TTL = ttl
	cfg.attributes[configName] = ac
}

// AllConfigNames returns the registered config names grouped by operation;
// keys are the objstore.Op* constants. Slice order follows Go's map iteration
// and is therefore not deterministic.
func (cfg *CachingBucketConfig) AllConfigNames() map[string][]string {
	out := make(map[string][]string, 5)
	for name := range cfg.attributes {
		out[objstore.OpAttributes] = append(out[objstore.OpAttributes], name)
	}
	for name := range cfg.exists {
		out[objstore.OpExists] = append(out[objstore.OpExists], name)
	}
	for name := range cfg.get {
		out[objstore.OpGet] = append(out[objstore.OpGet], name)
	}
	for name := range cfg.getRange {
		out[objstore.OpGetRange] = append(out[objstore.OpGetRange], name)
	}
	for name := range cfg.iter {
		out[objstore.OpIter] = append(out[objstore.OpIter], name)
	}
	return out
}

// FindIterConfig returns a registered Iter config whose matcher accepts dir,
// together with its config name, or ("", nil) when none matches. If several
// configs match, which one is returned is unspecified (map iteration order).
func (cfg *CachingBucketConfig) FindIterConfig(dir string) (string, *IterConfig) {
	// Loop variable renamed from `cfg`, which shadowed the receiver.
	for name, ic := range cfg.iter {
		if ic.Matcher(dir) {
			return name, ic
		}
	}
	return "", nil
}

// FindExistConfig returns a registered Exists config whose matcher accepts
// name, together with its config name, or ("", nil) when none matches. If
// several configs match, which one is returned is unspecified.
func (cfg *CachingBucketConfig) FindExistConfig(name string) (string, *ExistsConfig) {
	// Loop variable renamed from `cfg`, which shadowed the receiver.
	for cfgName, ec := range cfg.exists {
		if ec.Matcher(name) {
			return cfgName, ec
		}
	}
	return "", nil
}

// FindGetConfig returns a registered Get config whose matcher accepts name,
// together with its config name, or ("", nil) when none matches. If several
// configs match, which one is returned is unspecified.
func (cfg *CachingBucketConfig) FindGetConfig(name string) (string, *GetConfig) {
	// Loop variable renamed from `cfg`, which shadowed the receiver.
	for cfgName, gc := range cfg.get {
		if gc.Matcher(name) {
			return cfgName, gc
		}
	}
	return "", nil
}

// FindGetRangeConfig returns a registered GetRange config whose matcher
// accepts name, together with its config name, or ("", nil) when none
// matches. If several configs match, which one is returned is unspecified.
func (cfg *CachingBucketConfig) FindGetRangeConfig(name string) (string, *GetRangeConfig) {
	// Loop variable renamed from `cfg`, which shadowed the receiver.
	for cfgName, rc := range cfg.getRange {
		if rc.Matcher(name) {
			return cfgName, rc
		}
	}
	return "", nil
}

// FindAttributesConfig returns a registered Attributes config whose matcher
// accepts name, together with its config name, or ("", nil) when none
// matches. If several configs match, which one is returned is unspecified.
func (cfg *CachingBucketConfig) FindAttributesConfig(name string) (string, *AttributesConfig) {
	// Loop variable renamed from `cfg`, which shadowed the receiver.
	for cfgName, ac := range cfg.attributes {
		if ac.Matcher(name) {
			return cfgName, ac
		}
	}
	return "", nil
}
64 changes: 34 additions & 30 deletions pkg/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,18 @@ func parseGroupcacheConfig(conf []byte) (GroupcacheConfig, error) {
}

// NewGroupcache creates a new Groupcache instance.
func NewGroupcache(logger log.Logger, reg prometheus.Registerer, conf []byte, basepath string, r *route.Router, bucket objstore.Bucket,
isTSDBChunkFile, isMetaFile, isBlocksRootDir func(path string) bool,
MetaFileExistsTTL, MetafileDoesntExistTTL, MetafileContentTTL, ChunkObjectAttrsTTL, ChunkSubrangeTTL, BlocksIterTTL time.Duration) (*Groupcache, error) {
func NewGroupcache(logger log.Logger, reg prometheus.Registerer, conf []byte, basepath string, r *route.Router, bucket objstore.Bucket, cfg *CachingBucketConfig) (*Groupcache, error) {
config, err := parseGroupcacheConfig(conf)
if err != nil {
return nil, err
}

return NewGroupcacheWithConfig(logger, reg, config, basepath, r, bucket, isTSDBChunkFile, isMetaFile, isBlocksRootDir, MetaFileExistsTTL, MetafileDoesntExistTTL, MetafileContentTTL, ChunkObjectAttrsTTL, ChunkSubrangeTTL, BlocksIterTTL)
return NewGroupcacheWithConfig(logger, reg, config, basepath, r, bucket, cfg)
}

// NewGroupcacheWithConfig creates a new Groupcache instance with the given config.
func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf GroupcacheConfig, basepath string, r *route.Router, bucket objstore.Bucket,
isTSDBChunkFile, isMetaFile, isBlocksRootDir func(path string) bool,
MetaFileExistsTTL, MetafileDoesntExistTTL, MetafileContentTTL, ChunkObjectAttrsTTL, ChunkSubrangeTTL, BlocksIterTTL time.Duration) (*Groupcache, error) {
cfg *CachingBucketConfig) (*Groupcache, error) {
httpProto := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{
BasePath: basepath,
})
Expand Down Expand Up @@ -135,6 +132,12 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf

switch parsedData.Verb {
case cachekey.AttributesVerb:
_, attrCfg := cfg.FindAttributesConfig(parsedData.Name)
if attrCfg == nil {
// TODO: Debug this. Why? Attributes get called for Chunks.
panic("caching bucket layer must not call on unconfigured paths")
}

attrs, err := bucket.Attributes(ctx, parsedData.Name)
if err != nil {
return err
Expand All @@ -145,11 +148,13 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isTSDBChunkFile(parsedData.Name) {
return dest.UnmarshalBinary(finalAttrs, time.Now().Add(ChunkObjectAttrsTTL))
}
panic("caching bucket layer must not call on unconfigured paths")
return dest.UnmarshalBinary(finalAttrs, time.Now().Add(attrCfg.TTL))
case cachekey.IterVerb:
_, iterCfg := cfg.FindIterConfig(parsedData.Name)
if iterCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}

var list []string
if err := bucket.Iter(ctx, parsedData.Name, func(s string) error {
list = append(list, s)
Expand All @@ -163,11 +168,12 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isBlocksRootDir(parsedData.Name) {
return dest.UnmarshalBinary(encodedList, time.Now().Add(BlocksIterTTL))
}
panic("caching bucket layer must not call on unconfigured paths")
return dest.UnmarshalBinary(encodedList, time.Now().Add(iterCfg.TTL))
case cachekey.ContentVerb:
_, contentCfg := cfg.FindGetConfig(parsedData.Name)
if contentCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}
rc, err := bucket.Get(ctx, parsedData.Name)
if err != nil {
return err
Expand All @@ -179,27 +185,28 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isMetaFile(parsedData.Name) {
return dest.UnmarshalBinary(b, time.Now().Add(MetafileContentTTL))
}
panic("caching bucket layer must not call on unconfigured paths")

return dest.UnmarshalBinary(b, time.Now().Add(contentCfg.ContentTTL))
case cachekey.ExistsVerb:
_, existsCfg := cfg.FindExistConfig(parsedData.Name)
if existsCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}
exists, err := bucket.Exists(ctx, parsedData.Name)
if err != nil {
return err
}

if isMetaFile(parsedData.Name) {
if exists {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(MetaFileExistsTTL))
} else {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(MetafileDoesntExistTTL))
}
if exists {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(existsCfg.ExistsTTL))
} else {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(existsCfg.DoesntExistTTL))
}
panic("caching bucket layer must not call on unconfigured paths")

case cachekey.SubrangeVerb:
_, subrangeCfg := cfg.FindGetRangeConfig(parsedData.Name)
if subrangeCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}
rc, err := bucket.GetRange(ctx, parsedData.Name, parsedData.Start, parsedData.End-parsedData.Start)
if err != nil {
return err
Expand All @@ -211,10 +218,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isTSDBChunkFile(parsedData.Name) {
return dest.UnmarshalBinary(b, time.Now().Add(ChunkSubrangeTTL))
}
panic("caching bucket layer must not call on unconfigured paths")
return dest.UnmarshalBinary(b, time.Now().Add(subrangeCfg.SubrangeTTL))

}

Expand Down
Loading

0 comments on commit b15440e

Please sign in to comment.