Skip to content

Commit

Permalink
S3: Allow setting a constant prefix for all created keys
Browse files Browse the repository at this point in the history
Adds a new option under the aws stanza named key_prefix.
  • Loading branch information
April Schleck committed Jul 28, 2023
1 parent de77777 commit 8a71b0e
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [10096](https://github.com/grafana/loki/pull/10096) **aschleck**: S3: Allow setting a constant prefix for all created keys
* [10010](https://github.com/grafana/loki/pull/10010) **rasta-rocket**: feat(promtail): retrieve BotTags field from cloudflare
* [9995](https://github.com/grafana/loki/pull/9995) **chaudum**: Add jitter to the flush interval to prevent multiple ingesters to flush at the same time.
* [9797](https://github.com/grafana/loki/pull/9797) **chaudum**: Add new `loki_index_gateway_requests_total` counter metric to observe per-tenant RPS
Expand Down
8 changes: 8 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -4141,6 +4141,10 @@ dynamodb:
# CLI flag: -s3.buckets
[bucketnames: <string> | default = ""]

# Sets a constant prefix for all keys inserted in S3. Example: loki/
# CLI flag: -s3.key-prefix
[key_prefix: <string> | default = ""]

# S3 Endpoint to connect to.
# CLI flag: -s3.endpoint
[endpoint: <string> | default = ""]
Expand Down Expand Up @@ -4419,6 +4423,10 @@ The `s3_storage_config` block configures the connection to Amazon S3 object stor
# CLI flag: -<prefix>.storage.s3.buckets
[bucketnames: <string> | default = ""]

# Sets a constant prefix for all keys inserted in S3. Example: loki/
# CLI flag: -<prefix>.storage.s3.key-prefix
[key_prefix: <string> | default = ""]

# S3 Endpoint to connect to.
# CLI flag: -<prefix>.storage.s3.endpoint
[endpoint: <string> | default = ""]
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type S3Config struct {
S3ForcePathStyle bool

BucketNames string
KeyPrefix string `yaml:"key_prefix" doc:"description=Sets a constant prefix for all keys inserted in S3. Example: loki/"`
Endpoint string `yaml:"endpoint"`
Region string `yaml:"region"`
AccessKeyID string `yaml:"access_key_id"`
Expand Down Expand Up @@ -105,6 +106,7 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
f.BoolVar(&cfg.S3ForcePathStyle, prefix+"s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
f.StringVar(&cfg.BucketNames, prefix+"s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
f.StringVar(&cfg.KeyPrefix, prefix+"s3.key-prefix", "", "The prefix to all keys inserted in s3. Example: loki-instances/west/")

f.StringVar(&cfg.Endpoint, prefix+"s3.endpoint", "", "S3 Endpoint to connect to.")
f.StringVar(&cfg.Region, prefix+"s3.region", "", "AWS region to use.")
Expand Down Expand Up @@ -345,7 +347,7 @@ func (a *S3ObjectClient) DeleteObject(ctx context.Context, objectKey string) err
return instrument.CollectedRequest(ctx, "S3.DeleteObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
deleteObjectInput := &s3.DeleteObjectInput{
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
Key: aws.String(a.cfg.KeyPrefix + objectKey),
}

_, err := a.S3.DeleteObjectWithContext(ctx, deleteObjectInput)
Expand Down Expand Up @@ -385,7 +387,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
var requestErr error
resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
Key: aws.String(a.cfg.KeyPrefix + objectKey),
})
return requestErr
})
Expand All @@ -409,7 +411,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object
putObjectInput := &s3.PutObjectInput{
Body: object,
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
Key: aws.String(a.cfg.KeyPrefix + objectKey),
StorageClass: aws.String(a.cfg.StorageClass),
}

Expand All @@ -433,7 +435,7 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([]
err := loki_instrument.TimeRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
input := s3.ListObjectsV2Input{
Bucket: aws.String(a.bucketNames[i]),
Prefix: aws.String(prefix),
Prefix: aws.String(a.cfg.KeyPrefix + prefix),
Delimiter: aws.String(delimiter),
}

Expand Down
52 changes: 52 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,58 @@ func (f RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}

func TestRequestPrefix(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, r.URL.Path)
}))
defer ts.Close()

cfg := S3Config{
Endpoint: ts.URL,
BucketNames: "buck-o",
KeyPrefix: "some/prefix/",
S3ForcePathStyle: true,
Insecure: true,
AccessKeyID: "key",
SecretAccessKey: flagext.SecretWithValue("secret"),
}

tests := []struct {
name string
key string
expected string
}{
{
name: "Single",
key: "key",
expected: "/buck-o/some/prefix/key",
},
{
name: "Multi",
key: "some/random/key",
expected: "/buck-o/some/prefix/some/random/key",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewS3ObjectClient(cfg, hedging.Config{})
require.NoError(t, err)

readCloser, _, err := client.GetObject(context.Background(), tt.key)
require.NoError(t, err)

buffer := make([]byte, 100)
_, err = readCloser.Read(buffer)
if err != io.EOF {
require.NoError(t, err)
}

assert.Equal(t, tt.expected, strings.Trim(string(buffer), "\n\x00"))
})
}
}

func TestRequestMiddleware(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, r.Header.Get("echo-me"))
Expand Down

0 comments on commit 8a71b0e

Please sign in to comment.