Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move cache TTLs to a struct #2

Merged
merged 6 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.multi-stage
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions.
ARG BASE_DOCKER_SHA="14d68ca3d69fceaa6224250c83d81d935c053fb13594c811038c461194599973"
FROM golang:1.16-alpine3.12 as builder
FROM golang:1.17-alpine3.15 as builder

WORKDIR $GOPATH/src/github.com/thanos-io/thanos
# Change in the docker context invalidates the cache so to leverage docker
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ replace (
// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86.
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab => github.com/akanshat/cortex v1.10.1-0.20211222182735-328fbeedd424
github.com/efficientgo/tools/core => github.com/efficientgo/tools/core v0.0.0-20210731122119-5d4a0645ce9a
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/akanshat/cortex v1.10.1-0.20211222182735-328fbeedd424 h1:XxsZ4+as6/m2fO+YpQq0BYQEoC7fDP8Nje66RWG9YCw=
github.com/akanshat/cortex v1.10.1-0.20211222182735-328fbeedd424/go.mod h1:rOgO27ZndSaiFCRrWXYRIUHAJjeGSjk7s+fDPJU6gHg=
github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 h1:aDITxVUQ/3KBhpVWX57Vo9ntGTxoRw1F0T6/x/tRzNU=
github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497/go.mod h1:b6br6/pDFSfMkBgC96TbpOji05q5pa+v5rIlS0Y6XtI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -448,8 +450,6 @@ github.com/cortexproject/cortex v1.6.1-0.20210215155036-dfededd9f331/go.mod h1:8
github.com/cortexproject/cortex v1.7.1-0.20210224085859-66d6fb5b0d42/go.mod h1:u2dxcHInYbe45wxhLoWVdlFJyDhXewsMcxtnbq/QbH4=
github.com/cortexproject/cortex v1.7.1-0.20210316085356-3fedc1108a49/go.mod h1:/DBOW8TzYBTE/U+O7Whs7i7E2eeeZl1iRVDtIqxn5kg=
github.com/cortexproject/cortex v1.8.1-0.20210422151339-cf1c444e0905/go.mod h1:xxm4/CLvTmDxwE7yXwtClR4dIvkG4S09o5DygPOgc1U=
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab h1:THN4VQQqsZn5gNwcmQJO1GarnfZkSWfp5824ifoD9fQ=
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab/go.mod h1:njSBkQ1wUNx9X4knV/j65Pi4ItlJXX4QwXRKoMflJd8=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
Expand Down
234 changes: 234 additions & 0 deletions pkg/cache/caching_bucket_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"time"

"github.com/thanos-io/thanos/pkg/objstore"
)

// Codec for encoding and decoding results of Iter call.
type IterCodec interface {
Encode(files []string) ([]byte, error)
Decode(cachedData []byte) ([]string, error)
}

// CachingBucketConfig contains low-level configuration for individual bucket operations.
// This is not exposed to the user, but it is expected that code sets up individual
// operations based on user-provided configuration.
type CachingBucketConfig struct {
get map[string]*GetConfig
iter map[string]*IterConfig
exists map[string]*ExistsConfig
getRange map[string]*GetRangeConfig
attributes map[string]*AttributesConfig
}

func NewCachingBucketConfig() *CachingBucketConfig {
return &CachingBucketConfig{
get: map[string]*GetConfig{},
iter: map[string]*IterConfig{},
exists: map[string]*ExistsConfig{},
getRange: map[string]*GetRangeConfig{},
attributes: map[string]*AttributesConfig{},
}
}

// SetCacheImplementation sets the value of Cache for all configurations.
func (cfg *CachingBucketConfig) SetCacheImplementation(c Cache) {
if cfg.get != nil {
for k := range cfg.get {
cfg.get[k].Cache = c
}
}
if cfg.iter != nil {
for k := range cfg.iter {
cfg.iter[k].Cache = c
}
}
if cfg.exists != nil {
for k := range cfg.exists {
cfg.exists[k].Cache = c
}
}
if cfg.getRange != nil {
for k := range cfg.getRange {
cfg.getRange[k].Cache = c
}
}
if cfg.attributes != nil {
for k := range cfg.attributes {
cfg.attributes[k].Cache = c
}
}
}

// Generic config for single operation.
type OperationConfig struct {
Matcher func(name string) bool
Cache Cache
}

// Operation-specific configs.
type IterConfig struct {
OperationConfig
TTL time.Duration
Codec IterCodec
}

type ExistsConfig struct {
OperationConfig
ExistsTTL time.Duration
DoesntExistTTL time.Duration
}

type GetConfig struct {
ExistsConfig
ContentTTL time.Duration
MaxCacheableSize int
}

type GetRangeConfig struct {
OperationConfig
SubrangeSize int64
MaxSubRequests int
AttributesTTL time.Duration
SubrangeTTL time.Duration
}

type AttributesConfig struct {
OperationConfig
TTL time.Duration
}

func newOperationConfig(cache Cache, matcher func(string) bool) OperationConfig {
if matcher == nil {
panic("matcher")
}

return OperationConfig{
Matcher: matcher,
Cache: cache,
}
}

// CacheIter configures caching of "Iter" operation for matching directories.
func (cfg *CachingBucketConfig) CacheIter(configName string, cache Cache, matcher func(string) bool, ttl time.Duration, codec IterCodec) {
cfg.iter[configName] = &IterConfig{
OperationConfig: newOperationConfig(cache, matcher),
TTL: ttl,
Codec: codec,
}
}

// CacheGet configures caching of "Get" operation for matching files. Content of the object is cached, as well as whether object exists or not.
func (cfg *CachingBucketConfig) CacheGet(configName string, cache Cache, matcher func(string) bool, maxCacheableSize int, contentTTL, existsTTL, doesntExistTTL time.Duration) {
cfg.get[configName] = &GetConfig{
ExistsConfig: ExistsConfig{
OperationConfig: newOperationConfig(cache, matcher),
ExistsTTL: existsTTL,
DoesntExistTTL: doesntExistTTL,
},
ContentTTL: contentTTL,
MaxCacheableSize: maxCacheableSize,
}
}

// CacheExists configures caching of "Exists" operation for matching files. Negative values are cached as well.
func (cfg *CachingBucketConfig) CacheExists(configName string, cache Cache, matcher func(string) bool, existsTTL, doesntExistTTL time.Duration) {
cfg.exists[configName] = &ExistsConfig{
OperationConfig: newOperationConfig(cache, matcher),
ExistsTTL: existsTTL,
DoesntExistTTL: doesntExistTTL,
}
}

// CacheGetRange configures caching of "GetRange" operation. Subranges (aligned on subrange size) are cached individually.
// Since caching operation needs to know the object size to compute correct subranges, object size is cached as well.
// Single "GetRange" requests can result in multiple smaller GetRange sub-requests issued on the underlying bucket.
// MaxSubRequests specifies how many such subrequests may be issued. Values <= 0 mean there is no limit (requests
// for adjacent missing subranges are still merged).
func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache Cache, matcher func(string) bool, subrangeSize int64, attributesTTL, subrangeTTL time.Duration, maxSubRequests int) {
cfg.getRange[configName] = &GetRangeConfig{
OperationConfig: newOperationConfig(cache, matcher),
SubrangeSize: subrangeSize,
AttributesTTL: attributesTTL,
SubrangeTTL: subrangeTTL,
MaxSubRequests: maxSubRequests,
}
}

// CacheAttributes configures caching of "Attributes" operation for matching files.
func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache Cache, matcher func(name string) bool, ttl time.Duration) {
cfg.attributes[configName] = &AttributesConfig{
OperationConfig: newOperationConfig(cache, matcher),
TTL: ttl,
}
}

func (cfg *CachingBucketConfig) AllConfigNames() map[string][]string {
result := map[string][]string{}
for n := range cfg.get {
result[objstore.OpGet] = append(result[objstore.OpGet], n)
}
for n := range cfg.iter {
result[objstore.OpIter] = append(result[objstore.OpIter], n)
}
for n := range cfg.exists {
result[objstore.OpExists] = append(result[objstore.OpExists], n)
}
for n := range cfg.getRange {
result[objstore.OpGetRange] = append(result[objstore.OpGetRange], n)
}
for n := range cfg.attributes {
result[objstore.OpAttributes] = append(result[objstore.OpAttributes], n)
}
return result
}

func (cfg *CachingBucketConfig) FindIterConfig(dir string) (string, *IterConfig) {
for n, cfg := range cfg.iter {
if cfg.Matcher(dir) {
return n, cfg
}
}
return "", nil
}

func (cfg *CachingBucketConfig) FindExistConfig(name string) (string, *ExistsConfig) {
for n, cfg := range cfg.exists {
if cfg.Matcher(name) {
return n, cfg
}
}
return "", nil
}

func (cfg *CachingBucketConfig) FindGetConfig(name string) (string, *GetConfig) {
for n, cfg := range cfg.get {
if cfg.Matcher(name) {
return n, cfg
}
}
return "", nil
}

func (cfg *CachingBucketConfig) FindGetRangeConfig(name string) (string, *GetRangeConfig) {
for n, cfg := range cfg.getRange {
if cfg.Matcher(name) {
return n, cfg
}
}
return "", nil
}

func (cfg *CachingBucketConfig) FindAttributesConfig(name string) (string, *AttributesConfig) {
for n, cfg := range cfg.attributes {
if cfg.Matcher(name) {
return n, cfg
}
}
return "", nil
}
63 changes: 33 additions & 30 deletions pkg/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,18 @@ func parseGroupcacheConfig(conf []byte) (GroupcacheConfig, error) {
}

// NewGroupcache creates a new Groupcache instance.
func NewGroupcache(logger log.Logger, reg prometheus.Registerer, conf []byte, basepath string, r *route.Router, bucket objstore.Bucket,
isTSDBChunkFile, isMetaFile, isBlocksRootDir func(path string) bool,
MetaFileExistsTTL, MetafileDoesntExistTTL, MetafileContentTTL, ChunkObjectAttrsTTL, ChunkSubrangeTTL, BlocksIterTTL time.Duration) (*Groupcache, error) {
func NewGroupcache(logger log.Logger, reg prometheus.Registerer, conf []byte, basepath string, r *route.Router, bucket objstore.Bucket, cfg *CachingBucketConfig) (*Groupcache, error) {
config, err := parseGroupcacheConfig(conf)
if err != nil {
return nil, err
}

return NewGroupcacheWithConfig(logger, reg, config, basepath, r, bucket, isTSDBChunkFile, isMetaFile, isBlocksRootDir, MetaFileExistsTTL, MetafileDoesntExistTTL, MetafileContentTTL, ChunkObjectAttrsTTL, ChunkSubrangeTTL, BlocksIterTTL)
return NewGroupcacheWithConfig(logger, reg, config, basepath, r, bucket, cfg)
}

// NewGroupcacheWithConfig creates a new Groupcache instance with the given config.
func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf GroupcacheConfig, basepath string, r *route.Router, bucket objstore.Bucket,
isTSDBChunkFile, isMetaFile, isBlocksRootDir func(path string) bool,
MetaFileExistsTTL, MetafileDoesntExistTTL, MetafileContentTTL, ChunkObjectAttrsTTL, ChunkSubrangeTTL, BlocksIterTTL time.Duration) (*Groupcache, error) {
cfg *CachingBucketConfig) (*Groupcache, error) {
httpProto := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{
BasePath: basepath,
})
Expand Down Expand Up @@ -135,6 +132,11 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf

switch parsedData.Verb {
case cachekey.AttributesVerb:
_, attrCfg := cfg.FindAttributesConfig(parsedData.Name)
if attrCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}

attrs, err := bucket.Attributes(ctx, parsedData.Name)
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
Expand All @@ -145,11 +147,13 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isTSDBChunkFile(parsedData.Name) {
return dest.UnmarshalBinary(finalAttrs, time.Now().Add(ChunkObjectAttrsTTL))
}
panic("caching bucket layer must not call on unconfigured paths")
return dest.UnmarshalBinary(finalAttrs, time.Now().Add(attrCfg.TTL))
case cachekey.IterVerb:
_, iterCfg := cfg.FindIterConfig(parsedData.Name)
if iterCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}

var list []string
if err := bucket.Iter(ctx, parsedData.Name, func(s string) error {
list = append(list, s)
Expand All @@ -163,11 +167,12 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isBlocksRootDir(parsedData.Name) {
return dest.UnmarshalBinary(encodedList, time.Now().Add(BlocksIterTTL))
}
panic("caching bucket layer must not call on unconfigured paths")
return dest.UnmarshalBinary(encodedList, time.Now().Add(iterCfg.TTL))
case cachekey.ContentVerb:
_, contentCfg := cfg.FindGetConfig(parsedData.Name)
if contentCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}
rc, err := bucket.Get(ctx, parsedData.Name)
if err != nil {
return err
Expand All @@ -179,27 +184,28 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isMetaFile(parsedData.Name) {
return dest.UnmarshalBinary(b, time.Now().Add(MetafileContentTTL))
}
panic("caching bucket layer must not call on unconfigured paths")

return dest.UnmarshalBinary(b, time.Now().Add(contentCfg.ContentTTL))
case cachekey.ExistsVerb:
_, existsCfg := cfg.FindExistConfig(parsedData.Name)
if existsCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}
exists, err := bucket.Exists(ctx, parsedData.Name)
if err != nil {
return err
}

if isMetaFile(parsedData.Name) {
if exists {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(MetaFileExistsTTL))
} else {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(MetafileDoesntExistTTL))
}
if exists {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(existsCfg.ExistsTTL))
} else {
return dest.UnmarshalBinary([]byte(strconv.FormatBool(exists)), time.Now().Add(existsCfg.DoesntExistTTL))
}
panic("caching bucket layer must not call on unconfigured paths")

case cachekey.SubrangeVerb:
_, subrangeCfg := cfg.FindGetRangeConfig(parsedData.Name)
if subrangeCfg == nil {
panic("caching bucket layer must not call on unconfigured paths")
}
rc, err := bucket.GetRange(ctx, parsedData.Name, parsedData.Start, parsedData.End-parsedData.Start)
if err != nil {
return err
Expand All @@ -211,10 +217,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
return err
}

if isTSDBChunkFile(parsedData.Name) {
return dest.UnmarshalBinary(b, time.Now().Add(ChunkSubrangeTTL))
}
panic("caching bucket layer must not call on unconfigured paths")
return dest.UnmarshalBinary(b, time.Now().Add(subrangeCfg.SubrangeTTL))

}

Expand Down
Loading