Skip to content

Commit

Permalink
Allow setting a constant prefix for all created keys
Browse files Browse the repository at this point in the history
  • Loading branch information
April Schleck committed Oct 30, 2023
1 parent 8628b15 commit 99440ed
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
##### Enhancements

* [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace.
* [10096](https://github.com/grafana/loki/pull/10096) **aschleck**: Storage: Allow setting a constant prefix for all created keys
* [11038](https://github.com/grafana/loki/pull/11038) **kavirajk**: Remove already deprecated `store.max-look-back-period`.
* [10906](https://github.com/grafana/loki/pull/10906) **kavirajk**: Support Loki ruler to notify WAL writes to remote storage.
* [10613](https://github.com/grafana/loki/pull/10613) **ngc4579**: Helm: allow GrafanaAgent tolerations
Expand Down
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2100,6 +2100,11 @@ congestion_control:
# CLI flag: -store.congestion-control.hedge.strategy
[strategy: <string> | default = ""]

# Sets a constant prefix for all keys inserted into object storage. Example:
# loki/
# CLI flag: -store.object-prefix
[object_prefix: <string> | default = ""]

# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is: store.index-cache-read
[index_queries_cache_config: <cache_config>]
Expand Down
8 changes: 7 additions & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie

if c.cfg.RetentionEnabled {
var (
raw client.ObjectClient
encoder client.KeyEncoder
name = fmt.Sprintf("%s_%s", period.ObjectType, period.From.String())
retentionWorkDir = filepath.Join(c.cfg.WorkingDirectory, "retention", name)
Expand All @@ -313,7 +314,12 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie
// remove markers from the store dir after copying them to period specific dirs.
legacyMarkerDirs[period.ObjectType] = struct{}{}

if _, ok := objectClient.(*local.FSObjectClient); ok {
if casted, ok := objectClient.(client.PrefixedObjectClient); ok {
raw = casted.GetDownstream()
} else {
raw = objectClient
}
if _, ok := raw.(*local.FSObjectClient); ok {
encoder = client.FSEncoder
}
chunkClient := client.NewClient(objectClient, encoder, schemaConfig)
Expand Down
69 changes: 69 additions & 0 deletions pkg/storage/chunk/client/prefixed_object_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package client

import (
"context"
"io"
"strings"
)

type PrefixedObjectClient struct {
downstreamClient ObjectClient
prefix string
}

func NewPrefixedObjectClient(downstreamClient ObjectClient, prefix string) ObjectClient {
return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix}
}

func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object)
}

func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) {
objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter)
if err != nil {
return nil, nil, err
}

for i := range objects {
objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix)
}

for i := range commonPrefixes {
commonPrefixes[i] = StorageCommonPrefix(strings.TrimPrefix(string(commonPrefixes[i]), p.prefix))
}

return objects, commonPrefixes, nil
}

func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) IsObjectNotFoundErr(err error) bool {
return p.downstreamClient.IsObjectNotFoundErr(err)
}

func (p PrefixedObjectClient) IsRetryableErr(err error) bool {
return p.downstreamClient.IsRetryableErr(err)
}

func (p PrefixedObjectClient) Stop() {
p.downstreamClient.Stop()
}

func (p PrefixedObjectClient) GetDownstream() ObjectClient {
return p.downstreamClient
}

func (p PrefixedObjectClient) GetPrefix() string {
return p.prefix
}
19 changes: 18 additions & 1 deletion pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ type Config struct {
COSConfig ibmcloud.COSConfig `yaml:"cos"`
IndexCacheValidity time.Duration `yaml:"index_cache_validity"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
ObjectPrefix string `yaml:"object_prefix" doc:"description=Sets a constant prefix for all keys inserted into object storage. Example: loki/"`

IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config"`
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
Expand Down Expand Up @@ -362,6 +363,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "", f)
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
f.StringVar(&cfg.ObjectPrefix, "store.object-prefix", "", "The prefix to all keys inserted in object storage. Example: loki-instances/west/")
f.BoolVar(&cfg.DisableBroadIndexQueries, "store.disable-broad-index-queries", false, "Disable broad index queries which results in reduced cache usage and faster query performance at the expense of somewhat higher QPS on the index store.")
f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.")
cfg.BoltDBShipperConfig.RegisterFlags(f)
Expand Down Expand Up @@ -634,8 +636,23 @@ func (c *ClientMetrics) Unregister() {
c.AzureMetrics.Unregister()
}

// NewObjectClient makes a new StorageClient of the desired types.
// NewObjectClient makes a new StorageClient with the prefix in the front.
func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
actual, err := internalNewObjectClient(name, cfg, clientMetrics)
if err != nil {
return nil, err
}

if cfg.ObjectPrefix == "" {
return actual, nil
} else {
prefix := strings.Trim(cfg.ObjectPrefix, "/") + "/"
return client.NewPrefixedObjectClient(actual, prefix), nil
}
}

// internalNewObjectClient makes the underlying StorageClient of the desired types.
func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) {
var (
namedStore string
storeType = name
Expand Down
53 changes: 53 additions & 0 deletions pkg/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/cassandra"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
Expand Down Expand Up @@ -227,6 +228,58 @@ func TestNamedStores_populateStoreType(t *testing.T) {
})
}

func TestNewObjectClient_prefixing(t *testing.T) {
t.Run("no prefix", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

_, ok := objectClient.(client.PrefixedObjectClient)
assert.False(t, ok)
})

t.Run("prefix with trailing /", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "my/prefix/"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
assert.True(t, ok)
assert.Equal(t, "my/prefix/", prefixed.GetPrefix())
})

t.Run("prefix without trailing /", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "my/prefix"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
assert.True(t, ok)
assert.Equal(t, "my/prefix/", prefixed.GetPrefix())
})

t.Run("prefix with starting and trailing /", func(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)
cfg.ObjectPrefix = "/my/prefix/"

objectClient, err := NewObjectClient("inmemory", cfg, cm)
require.NoError(t, err)

prefixed, ok := objectClient.(client.PrefixedObjectClient)
assert.True(t, ok)
assert.Equal(t, "my/prefix/", prefixed.GetPrefix())
})
}

// DefaultSchemaConfig creates a simple schema config for testing
func DefaultSchemaConfig(store, schema string, from model.Time) config.SchemaConfig {
s := config.SchemaConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func (m *mockObjectClient) List(ctx context.Context, prefix, delimiter string) (
}

func TestCachedObjectClient_List(t *testing.T) {
objectKeys := func(items []client.StorageObject) []string {
keys := make([]string, 0, len(items))
for _, item := range items {
keys = append(keys, item.Key)
}
return keys
}

t.Run("refresh table name cache if requested table is not in cache", func(t *testing.T) {
ctx := context.Background()

Expand All @@ -81,14 +89,6 @@ func TestCachedObjectClient_List(t *testing.T) {
// replace mock object client with one that returns more tables
cachedObjectClient.ObjectClient = newMockObjectClient(t, newObjectsInStorage)

objectKeys := func(items []client.StorageObject) []string {
keys := make([]string, 0, len(items))
for _, item := range items {
keys = append(keys, item.Key)
}
return keys
}

// list contents of a table that is in table name cache
objects, _, err = cachedObjectClient.listTable(ctx, "table1")
require.Nil(t, err)
Expand All @@ -110,6 +110,24 @@ func TestCachedObjectClient_List(t *testing.T) {
objectsFromListCall, _, _ = cachedObjectClient.List(ctx, "table3/", "/", false)
require.Equal(t, objectsFromListCall, objects)
})

t.Run("supports prefixed clients", func(t *testing.T) {
ctx := context.Background()

prefix := "my/amazing/prefix/"
objectsInStorage := []string{
prefix + "table1/db.gz",
prefix + "table2/db.gz",
prefix + "table2/db2.gz",
}
objectClient := newMockObjectClient(t, objectsInStorage)
prefixedClient := client.NewPrefixedObjectClient(objectClient, prefix)
cachedObjectClient := newCachedObjectClient(prefixedClient)

objects, _, err := cachedObjectClient.List(ctx, "table2/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"table2/db.gz", "table2/db2.gz"}, objectKeys(objects))
})
}

func TestCachedObjectClient(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type IndexFile struct {
}

func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix string) Client {
objectClient := newCachedObjectClient(newPrefixedObjectClient(origObjectClient, storagePrefix))
objectClient := newCachedObjectClient(client.NewPrefixedObjectClient(origObjectClient, storagePrefix))
return &indexStorageClient{objectClient: objectClient}
}

Expand Down

This file was deleted.

0 comments on commit 99440ed

Please sign in to comment.