From f786640fcecd9721d6741f64d537c5fa85a2e2be Mon Sep 17 00:00:00 2001 From: Shunsuke Tokunaga Date: Tue, 21 May 2024 15:04:50 +0100 Subject: [PATCH] Add a feature to introduce BucketPolicy (#233) * Add bucketPolicy in type * mod * add new APIs in s3 backend * add policies in RGW * add policy * policy * lint * tidy * remove unnecessary prefix 'Bucket' * remove meaningless comments * add error messages * add PolicyClient to subresourceClient * fix code block in DEVELOPMENT --- apis/provider-ceph/v1alpha1/bucket_types.go | 6 + docs/DEVELOPMENT.md | 2 +- internal/backendstore/backend.go | 3 + .../backendstorefakes/fake_s3client.go | 249 ++++++++++++++ internal/controller/bucket/consts.go | 4 + .../bucket/lifecycleconfiguration.go | 2 +- internal/controller/bucket/policy.go | 202 ++++++++++++ internal/controller/bucket/policy_test.go | 308 ++++++++++++++++++ internal/controller/bucket/subresources.go | 1 + internal/rgw/policy.go | 60 ++++ ...vider-ceph.ceph.crossplane.io_buckets.yaml | 6 + 11 files changed, 841 insertions(+), 2 deletions(-) create mode 100644 internal/controller/bucket/policy.go create mode 100644 internal/controller/bucket/policy_test.go create mode 100644 internal/rgw/policy.go diff --git a/apis/provider-ceph/v1alpha1/bucket_types.go b/apis/provider-ceph/v1alpha1/bucket_types.go index 7bf84e8e..0f7f760c 100644 --- a/apis/provider-ceph/v1alpha1/bucket_types.go +++ b/apis/provider-ceph/v1alpha1/bucket_types.go @@ -85,6 +85,12 @@ type BucketParameters struct { // AssumeRoleTags may be used to add custom values to an AssumeRole request. // +optional AssumeRoleTags []Tag `json:"assumeRoleTags,omitempty"` + + // Policy is a JSON string of BucketPolicy. + // If it is set, Provider-Ceph calls PutBucketPolicy API after creating the bucket. + // Before adding it, you should validate the JSON string. + // +optional + Policy string `json:"policy,omitempty"` } // BackendInfo contains relevant information about an S3 backend for diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index 2a6daa33..a8976c78 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -20,7 +20,7 @@ Spin up the test environment, but without Localstack and use your own external C ``` AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= CEPH_ADDRESS= make dev-ceph -` +``` In either case, after you've made some changes, kill (Ctrl+C) the existing `provider-ceph` and re-run it: diff --git a/internal/backendstore/backend.go b/internal/backendstore/backend.go index b871aa7c..e152210b 100644 --- a/internal/backendstore/backend.go +++ b/internal/backendstore/backend.go @@ -41,6 +41,9 @@ type S3Client interface { DeleteBucketLifecycle(context.Context, *s3.DeleteBucketLifecycleInput, ...func(*s3.Options)) (*s3.DeleteBucketLifecycleOutput, error) GetBucketAcl(context.Context, *s3.GetBucketAclInput, ...func(*s3.Options)) (*s3.GetBucketAclOutput, error) PutBucketAcl(context.Context, *s3.PutBucketAclInput, ...func(*s3.Options)) (*s3.PutBucketAclOutput, error) + PutBucketPolicy(context.Context, *s3.PutBucketPolicyInput, ...func(*s3.Options)) (*s3.PutBucketPolicyOutput, error) + GetBucketPolicy(context.Context, *s3.GetBucketPolicyInput, ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) + DeleteBucketPolicy(context.Context, *s3.DeleteBucketPolicyInput, ...func(*s3.Options)) (*s3.DeleteBucketPolicyOutput, error) } //counterfeiter:generate . STSClient diff --git a/internal/backendstore/backendstorefakes/fake_s3client.go b/internal/backendstore/backendstorefakes/fake_s3client.go index 24ba2922..90c41468 100644 --- a/internal/backendstore/backendstorefakes/fake_s3client.go +++ b/internal/backendstore/backendstorefakes/fake_s3client.go @@ -55,6 +55,21 @@ type FakeS3Client struct { result1 *s3.DeleteBucketLifecycleOutput result2 error } + DeleteBucketPolicyStub func(context.Context, *s3.DeleteBucketPolicyInput, ...func(*s3.Options)) (*s3.DeleteBucketPolicyOutput, error) + deleteBucketPolicyMutex sync.RWMutex + deleteBucketPolicyArgsForCall []struct { + arg1 context.Context + arg2 *s3.DeleteBucketPolicyInput + arg3 []func(*s3.Options) + } + deleteBucketPolicyReturns struct { + result1 *s3.DeleteBucketPolicyOutput + result2 error + } + deleteBucketPolicyReturnsOnCall map[int]struct { + result1 *s3.DeleteBucketPolicyOutput + result2 error + } DeleteObjectStub func(context.Context, *s3.DeleteObjectInput, ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) deleteObjectMutex sync.RWMutex deleteObjectArgsForCall []struct { @@ -100,6 +115,21 @@ type FakeS3Client struct { result1 *s3.GetBucketLifecycleConfigurationOutput result2 error } + GetBucketPolicyStub func(context.Context, *s3.GetBucketPolicyInput, ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) + getBucketPolicyMutex sync.RWMutex + getBucketPolicyArgsForCall []struct { + arg1 context.Context + arg2 *s3.GetBucketPolicyInput + arg3 []func(*s3.Options) + } + getBucketPolicyReturns struct { + result1 *s3.GetBucketPolicyOutput + result2 error + } + getBucketPolicyReturnsOnCall map[int]struct { + result1 *s3.GetBucketPolicyOutput + result2 error + } GetObjectStub func(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) getObjectMutex sync.RWMutex getObjectArgsForCall []struct { @@ -190,6 +220,21 @@ type FakeS3Client struct { result1 *s3.PutBucketLifecycleConfigurationOutput result2 error } + PutBucketPolicyStub func(context.Context, *s3.PutBucketPolicyInput, ...func(*s3.Options)) (*s3.PutBucketPolicyOutput, error) + putBucketPolicyMutex sync.RWMutex + putBucketPolicyArgsForCall []struct { + arg1 context.Context + arg2 *s3.PutBucketPolicyInput + arg3 []func(*s3.Options) + } + putBucketPolicyReturns struct { + result1 *s3.PutBucketPolicyOutput + result2 error + } + putBucketPolicyReturnsOnCall map[int]struct { + result1 *s3.PutBucketPolicyOutput + result2 error + } PutObjectStub func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) putObjectMutex sync.RWMutex putObjectArgsForCall []struct { @@ -407,6 +452,72 @@ func (fake *FakeS3Client) DeleteBucketLifecycleReturnsOnCall(i int, result1 *s3. }{result1, result2} } +func (fake *FakeS3Client) DeleteBucketPolicy(arg1 context.Context, arg2 *s3.DeleteBucketPolicyInput, arg3 ...func(*s3.Options)) (*s3.DeleteBucketPolicyOutput, error) { + fake.deleteBucketPolicyMutex.Lock() + ret, specificReturn := fake.deleteBucketPolicyReturnsOnCall[len(fake.deleteBucketPolicyArgsForCall)] + fake.deleteBucketPolicyArgsForCall = append(fake.deleteBucketPolicyArgsForCall, struct { + arg1 context.Context + arg2 *s3.DeleteBucketPolicyInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.DeleteBucketPolicyStub + fakeReturns := fake.deleteBucketPolicyReturns + fake.recordInvocation("DeleteBucketPolicy", []interface{}{arg1, arg2, arg3}) + fake.deleteBucketPolicyMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) DeleteBucketPolicyCallCount() int { + fake.deleteBucketPolicyMutex.RLock() + defer fake.deleteBucketPolicyMutex.RUnlock() + return len(fake.deleteBucketPolicyArgsForCall) +} + +func (fake *FakeS3Client) DeleteBucketPolicyCalls(stub func(context.Context, *s3.DeleteBucketPolicyInput, ...func(*s3.Options)) (*s3.DeleteBucketPolicyOutput, error)) { + fake.deleteBucketPolicyMutex.Lock() + defer fake.deleteBucketPolicyMutex.Unlock() + fake.DeleteBucketPolicyStub = stub +} + +func (fake *FakeS3Client) DeleteBucketPolicyArgsForCall(i int) (context.Context, *s3.DeleteBucketPolicyInput, []func(*s3.Options)) { + fake.deleteBucketPolicyMutex.RLock() + defer fake.deleteBucketPolicyMutex.RUnlock() + argsForCall := fake.deleteBucketPolicyArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) DeleteBucketPolicyReturns(result1 *s3.DeleteBucketPolicyOutput, result2 error) { + fake.deleteBucketPolicyMutex.Lock() + defer fake.deleteBucketPolicyMutex.Unlock() + fake.DeleteBucketPolicyStub = nil + fake.deleteBucketPolicyReturns = struct { + result1 *s3.DeleteBucketPolicyOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) DeleteBucketPolicyReturnsOnCall(i int, result1 *s3.DeleteBucketPolicyOutput, result2 error) { + fake.deleteBucketPolicyMutex.Lock() + defer fake.deleteBucketPolicyMutex.Unlock() + fake.DeleteBucketPolicyStub = nil + if fake.deleteBucketPolicyReturnsOnCall == nil { + fake.deleteBucketPolicyReturnsOnCall = make(map[int]struct { + result1 *s3.DeleteBucketPolicyOutput + result2 error + }) + } + fake.deleteBucketPolicyReturnsOnCall[i] = struct { + result1 *s3.DeleteBucketPolicyOutput + result2 error + }{result1, result2} +} + func (fake *FakeS3Client) DeleteObject(arg1 context.Context, arg2 *s3.DeleteObjectInput, arg3 ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { fake.deleteObjectMutex.Lock() ret, specificReturn := fake.deleteObjectReturnsOnCall[len(fake.deleteObjectArgsForCall)] @@ -605,6 +716,72 @@ func (fake *FakeS3Client) GetBucketLifecycleConfigurationReturnsOnCall(i int, re }{result1, result2} } +func (fake *FakeS3Client) GetBucketPolicy(arg1 context.Context, arg2 *s3.GetBucketPolicyInput, arg3 ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + fake.getBucketPolicyMutex.Lock() + ret, specificReturn := fake.getBucketPolicyReturnsOnCall[len(fake.getBucketPolicyArgsForCall)] + fake.getBucketPolicyArgsForCall = append(fake.getBucketPolicyArgsForCall, struct { + arg1 context.Context + arg2 *s3.GetBucketPolicyInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.GetBucketPolicyStub + fakeReturns := fake.getBucketPolicyReturns + fake.recordInvocation("GetBucketPolicy", []interface{}{arg1, arg2, arg3}) + fake.getBucketPolicyMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) GetBucketPolicyCallCount() int { + fake.getBucketPolicyMutex.RLock() + defer fake.getBucketPolicyMutex.RUnlock() + return len(fake.getBucketPolicyArgsForCall) +} + +func (fake *FakeS3Client) GetBucketPolicyCalls(stub func(context.Context, *s3.GetBucketPolicyInput, ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error)) { + fake.getBucketPolicyMutex.Lock() + defer fake.getBucketPolicyMutex.Unlock() + fake.GetBucketPolicyStub = stub +} + +func (fake *FakeS3Client) GetBucketPolicyArgsForCall(i int) (context.Context, *s3.GetBucketPolicyInput, []func(*s3.Options)) { + fake.getBucketPolicyMutex.RLock() + defer fake.getBucketPolicyMutex.RUnlock() + argsForCall := fake.getBucketPolicyArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) GetBucketPolicyReturns(result1 *s3.GetBucketPolicyOutput, result2 error) { + fake.getBucketPolicyMutex.Lock() + defer fake.getBucketPolicyMutex.Unlock() + fake.GetBucketPolicyStub = nil + fake.getBucketPolicyReturns = struct { + result1 *s3.GetBucketPolicyOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) GetBucketPolicyReturnsOnCall(i int, result1 *s3.GetBucketPolicyOutput, result2 error) { + fake.getBucketPolicyMutex.Lock() + defer fake.getBucketPolicyMutex.Unlock() + fake.GetBucketPolicyStub = nil + if fake.getBucketPolicyReturnsOnCall == nil { + fake.getBucketPolicyReturnsOnCall = make(map[int]struct { + result1 *s3.GetBucketPolicyOutput + result2 error + }) + } + fake.getBucketPolicyReturnsOnCall[i] = struct { + result1 *s3.GetBucketPolicyOutput + result2 error + }{result1, result2} +} + func (fake *FakeS3Client) GetObject(arg1 context.Context, arg2 *s3.GetObjectInput, arg3 ...func(*s3.Options)) (*s3.GetObjectOutput, error) { fake.getObjectMutex.Lock() ret, specificReturn := fake.getObjectReturnsOnCall[len(fake.getObjectArgsForCall)] @@ -1001,6 +1178,72 @@ func (fake *FakeS3Client) PutBucketLifecycleConfigurationReturnsOnCall(i int, re }{result1, result2} } +func (fake *FakeS3Client) PutBucketPolicy(arg1 context.Context, arg2 *s3.PutBucketPolicyInput, arg3 ...func(*s3.Options)) (*s3.PutBucketPolicyOutput, error) { + fake.putBucketPolicyMutex.Lock() + ret, specificReturn := fake.putBucketPolicyReturnsOnCall[len(fake.putBucketPolicyArgsForCall)] + fake.putBucketPolicyArgsForCall = append(fake.putBucketPolicyArgsForCall, struct { + arg1 context.Context + arg2 *s3.PutBucketPolicyInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.PutBucketPolicyStub + fakeReturns := fake.putBucketPolicyReturns + fake.recordInvocation("PutBucketPolicy", []interface{}{arg1, arg2, arg3}) + fake.putBucketPolicyMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) PutBucketPolicyCallCount() int { + fake.putBucketPolicyMutex.RLock() + defer fake.putBucketPolicyMutex.RUnlock() + return len(fake.putBucketPolicyArgsForCall) +} + +func (fake *FakeS3Client) PutBucketPolicyCalls(stub func(context.Context, *s3.PutBucketPolicyInput, ...func(*s3.Options)) (*s3.PutBucketPolicyOutput, error)) { + fake.putBucketPolicyMutex.Lock() + defer fake.putBucketPolicyMutex.Unlock() + fake.PutBucketPolicyStub = stub +} + +func (fake *FakeS3Client) PutBucketPolicyArgsForCall(i int) (context.Context, *s3.PutBucketPolicyInput, []func(*s3.Options)) { + fake.putBucketPolicyMutex.RLock() + defer fake.putBucketPolicyMutex.RUnlock() + argsForCall := fake.putBucketPolicyArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) PutBucketPolicyReturns(result1 *s3.PutBucketPolicyOutput, result2 error) { + fake.putBucketPolicyMutex.Lock() + defer fake.putBucketPolicyMutex.Unlock() + fake.PutBucketPolicyStub = nil + fake.putBucketPolicyReturns = struct { + result1 *s3.PutBucketPolicyOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) PutBucketPolicyReturnsOnCall(i int, result1 *s3.PutBucketPolicyOutput, result2 error) { + fake.putBucketPolicyMutex.Lock() + defer fake.putBucketPolicyMutex.Unlock() + fake.PutBucketPolicyStub = nil + if fake.putBucketPolicyReturnsOnCall == nil { + fake.putBucketPolicyReturnsOnCall = make(map[int]struct { + result1 *s3.PutBucketPolicyOutput + result2 error + }) + } + fake.putBucketPolicyReturnsOnCall[i] = struct { + result1 *s3.PutBucketPolicyOutput + result2 error + }{result1, result2} +} + func (fake *FakeS3Client) PutObject(arg1 context.Context, arg2 *s3.PutObjectInput, arg3 ...func(*s3.Options)) (*s3.PutObjectOutput, error) { fake.putObjectMutex.Lock() ret, specificReturn := fake.putObjectReturnsOnCall[len(fake.putObjectArgsForCall)] @@ -1076,12 +1319,16 @@ func (fake *FakeS3Client) Invocations() map[string][][]interface{} { defer fake.deleteBucketMutex.RUnlock() fake.deleteBucketLifecycleMutex.RLock() defer fake.deleteBucketLifecycleMutex.RUnlock() + fake.deleteBucketPolicyMutex.RLock() + defer fake.deleteBucketPolicyMutex.RUnlock() fake.deleteObjectMutex.RLock() defer fake.deleteObjectMutex.RUnlock() fake.getBucketAclMutex.RLock() defer fake.getBucketAclMutex.RUnlock() fake.getBucketLifecycleConfigurationMutex.RLock() defer fake.getBucketLifecycleConfigurationMutex.RUnlock() + fake.getBucketPolicyMutex.RLock() + defer fake.getBucketPolicyMutex.RUnlock() fake.getObjectMutex.RLock() defer fake.getObjectMutex.RUnlock() fake.headBucketMutex.RLock() @@ -1094,6 +1341,8 @@ func (fake *FakeS3Client) Invocations() map[string][][]interface{} { defer fake.putBucketAclMutex.RUnlock() fake.putBucketLifecycleConfigurationMutex.RLock() defer fake.putBucketLifecycleConfigurationMutex.RUnlock() + fake.putBucketPolicyMutex.RLock() + defer fake.putBucketPolicyMutex.RUnlock() fake.putObjectMutex.RLock() defer fake.putObjectMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/internal/controller/bucket/consts.go b/internal/controller/bucket/consts.go index 478872ca..8ab0bfef 100644 --- a/internal/controller/bucket/consts.go +++ b/internal/controller/bucket/consts.go @@ -24,5 +24,9 @@ const ( errObserveAcl = "failed to observe bucket acl" errHandleAcl = "failed to handle bucket acl" + // Policy error messages. + errObservePolicy = "failed to observe bucket policy" + errHandlePolicy = "failed to handle bucket policy" + True = "true" ) diff --git a/internal/controller/bucket/lifecycleconfiguration.go b/internal/controller/bucket/lifecycleconfiguration.go index ba86c869..88ae3d8e 100644 --- a/internal/controller/bucket/lifecycleconfiguration.go +++ b/internal/controller/bucket/lifecycleconfiguration.go @@ -32,11 +32,11 @@ type LifecycleConfigurationClient struct { log logging.Logger } -// NewLifecycleConfigurationClient creates the client for Accelerate Configuration func NewLifecycleConfigurationClient(b *backendstore.BackendStore, h *s3clienthandler.Handler, l logging.Logger) *LifecycleConfigurationClient { return &LifecycleConfigurationClient{backendStore: b, s3ClientHandler: h, log: l} } +//nolint:dupl // LifecycleConfiguration and Policy are different feature. func (l *LifecycleConfigurationClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backendNames []string) (ResourceStatus, error) { ctx, span := otel.Tracer("").Start(ctx, "bucket.LifecycleConfigurationClient.Observe") defer span.End() diff --git a/internal/controller/bucket/policy.go b/internal/controller/bucket/policy.go new file mode 100644 index 00000000..ff4a14e7 --- /dev/null +++ b/internal/controller/bucket/policy.go @@ -0,0 +1,202 @@ +package bucket + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/smithy-go" + + "github.com/crossplane/crossplane-runtime/pkg/errors" + "github.com/crossplane/crossplane-runtime/pkg/logging" + + "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" + apisv1alpha1 "github.com/linode/provider-ceph/apis/v1alpha1" + "github.com/linode/provider-ceph/internal/backendstore" + "github.com/linode/provider-ceph/internal/consts" + "github.com/linode/provider-ceph/internal/controller/s3clienthandler" + "github.com/linode/provider-ceph/internal/otel/traces" + "github.com/linode/provider-ceph/internal/rgw" + "go.opentelemetry.io/otel" +) + +// PolicyClient is the client for API methods and reconciling a BucketPolicy +type PolicyClient struct { + backendStore *backendstore.BackendStore + s3ClientHandler *s3clienthandler.Handler + log logging.Logger +} + +func NewPolicyClient(b *backendstore.BackendStore, h *s3clienthandler.Handler, l logging.Logger) *PolicyClient { + return &PolicyClient{backendStore: b, s3ClientHandler: h, log: l} +} + +//nolint:dupl // LifecycleConfiguration and Policy are different feature. +func (p *PolicyClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backendNames []string) (ResourceStatus, error) { + ctx, span := otel.Tracer("").Start(ctx, "bucket.PolicyClient.Observe") + defer span.End() + + observationChan := make(chan ResourceStatus) + errChan := make(chan error) + + for _, backendName := range backendNames { + beName := backendName + go func() { + observation, err := p.observeBackend(ctx, bucket, beName) + if err != nil { + errChan <- err + + return + } + observationChan <- observation + }() + } + + for i := 0; i < len(backendNames); i++ { + select { + case <-ctx.Done(): + p.log.Info("Context timeout during bucket policy observation", consts.KeyBucketName, bucket.Name) + err := errors.Wrap(ctx.Err(), errObservePolicy) + traces.SetAndRecordError(span, err) + + return NeedsUpdate, err + case observation := <-observationChan: + if observation != Updated { + return observation, nil + } + case err := <-errChan: + err = errors.Wrap(err, errObservePolicy) + traces.SetAndRecordError(span, err) + + return NeedsUpdate, err + } + } + + return Updated, nil +} + +func (p *PolicyClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) { + p.log.Info("Observing subresource policy on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) + + if p.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy { + // If a backend is marked as unhealthy, we can ignore it for now by returning Updated. + // The backend may be down for some time and we do not want to block Create/Update/Delete + // calls on other backends. By returning NeedsUpdate here, we would never pass the Observe + // phase until the backend becomes Healthy or Disabled. + return Updated, nil + } + + s3Client, err := p.s3ClientHandler.GetS3Client(ctx, bucket, backendName) + if err != nil { + return NeedsUpdate, err + } + + // external keeps the bucket policy in backend. + var external string + + response, err := rgw.GetBucketPolicy(ctx, s3Client, aws.String(bucket.Name)) + // If error is not NoSuchBucketPolicy error, return with the error. + if err != nil && !isNoSuchBucketPolicy(err) { + return NeedsUpdate, err + } + + if response != nil && response.Policy != nil { + external = *response.Policy + } + + if bucket.Spec.ForProvider.Policy == "" { + // No policy config is specified. + // In that case, it should not exist on any backend. + if external == "" { + p.log.Info("No bucket policy found on backend - no action required", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) + + return Updated, nil + } else { + p.log.Info("Bucket policy found on backend - requires deletion", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) + + return NeedsDeletion, nil + } + } + + local := bucket.Spec.ForProvider.Policy + if local != external { + p.log.Info("Bucket policy requires update on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) + + return NeedsUpdate, nil + } + + return Updated, nil +} + +func (p *PolicyClient) Handle(ctx context.Context, b *v1alpha1.Bucket, backendName string, bb *bucketBackends) error { + ctx, span := otel.Tracer("").Start(ctx, "bucket.PolicyClient.Handle") + defer span.End() + + observation, err := p.observeBackend(ctx, b, backendName) + if err != nil { + err = errors.Wrap(err, errHandlePolicy) + traces.SetAndRecordError(span, err) + + return err + } + + switch observation { + case Updated: + return nil + case NeedsDeletion: + if err := p.delete(ctx, b, backendName); err != nil { + err = errors.Wrap(err, errHandlePolicy) + + traces.SetAndRecordError(span, err) + + return err + } + case NeedsUpdate: + if err := p.createOrUpdate(ctx, b, backendName); err != nil { + err = errors.Wrap(err, errHandlePolicy) + + traces.SetAndRecordError(span, err) + + return err + } + } + + return nil +} + +func (p *PolicyClient) createOrUpdate(ctx context.Context, b *v1alpha1.Bucket, backendName string) error { + p.log.Info("Updating bucket policy", consts.KeyBucketName, b.Name, consts.KeyBackendName, backendName) + s3Client, err := p.s3ClientHandler.GetS3Client(ctx, b, backendName) + if err != nil { + return err + } + + _, err = rgw.PutBucketPolicy(ctx, s3Client, b) + if err != nil { + return err + } + + return nil +} + +func (p *PolicyClient) delete(ctx context.Context, b *v1alpha1.Bucket, backendName string) error { + p.log.Info("Deleting bucket policy", consts.KeyBucketName, b.Name, consts.KeyBackendName, backendName) + s3Client, err := p.s3ClientHandler.GetS3Client(ctx, b, backendName) + if err != nil { + return err + } + + if err := rgw.DeleteBucketPolicy(ctx, s3Client, aws.String(b.Name)); err != nil { + return err + } + + return nil +} + +func isNoSuchBucketPolicy(err error) bool { + var ae smithy.APIError + if !errors.As(err, &ae) { + return false + } + + return ae != nil && ae.ErrorCode() == "NoSuchBucketPolicy" +} diff --git a/internal/controller/bucket/policy_test.go b/internal/controller/bucket/policy_test.go new file mode 100644 index 00000000..558162bc --- /dev/null +++ b/internal/controller/bucket/policy_test.go @@ -0,0 +1,308 @@ +package bucket + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go" + "github.com/crossplane/crossplane-runtime/pkg/errors" + "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" + apisv1alpha1 "github.com/linode/provider-ceph/apis/v1alpha1" + "github.com/linode/provider-ceph/internal/backendstore" + "github.com/linode/provider-ceph/internal/backendstore/backendstorefakes" + "github.com/linode/provider-ceph/internal/controller/s3clienthandler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + errS3 = errors.New("some error") + samplePolicy = `{"Policy": "{\"Version\": \"2012-10-17\", \"Statement\": [{\"Action\": [\"*\"], \"Principal\": \"*\", \"Sid\": \"\", \"Effect\": \"Allow\", \"Resource\": \"arn:aws:s3:::shunsuke-rgw-acl-test/*\"},]}"}` +) + +//nolint:maintidx // for testing +func TestPolicyObserveBackend(t *testing.T) { + t.Parallel() + + type fields struct { + backendStore *backendstore.BackendStore + } + + type args struct { + bucket *v1alpha1.Bucket + backendName string + } + + type want struct { + status ResourceStatus + err error + } + + cases := map[string]struct { + reason string + fields fields + args args + want want + }{ + "Attempt to observe policy on unhealthy backend": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{} + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusUnhealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: Updated, + }, + }, + + "s3 error": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return nil, errS3 + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: NeedsUpdate, + err: errS3, + }, + }, + "ok - policy is updated": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return &s3.GetBucketPolicyOutput{Policy: aws.String(samplePolicy)}, nil + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + Spec: v1alpha1.BucketSpec{ + ForProvider: v1alpha1.BucketParameters{ + Policy: samplePolicy, + }, + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: Updated, + }, + }, + "ok - no bucket policy in backend, but bucket CR has": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return nil, &smithy.GenericAPIError{Code: "NoSuchBucketPolicy", Message: "no policy"} + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + Spec: v1alpha1.BucketSpec{ + ForProvider: v1alpha1.BucketParameters{ + Policy: samplePolicy, + }, + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: NeedsUpdate, + }, + }, + "ok - no bucket policy both in backend and bucket cr": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return nil, &smithy.GenericAPIError{Code: "NoSuchBucketPolicy", Message: "no policy"} + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: Updated, + }, + }, + "ok - policy is returned, but empty": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + emptyPolicy := "" + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return &s3.GetBucketPolicyOutput{Policy: aws.String(emptyPolicy)}, nil + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + Spec: v1alpha1.BucketSpec{ + ForProvider: v1alpha1.BucketParameters{ + Policy: samplePolicy, + }, + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: NeedsUpdate, + }, + }, + "ok - backend has policy, but bucket cr doesn't": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return &s3.GetBucketPolicyOutput{Policy: aws.String(samplePolicy)}, nil + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: NeedsDeletion, + }, + }, + "ok - backend has different policy": { + fields: fields{ + backendStore: func() *backendstore.BackendStore { + differentPolicy := "{}" + fake := backendstorefakes.FakeS3Client{ + GetBucketPolicyStub: func(ctx context.Context, in *s3.GetBucketPolicyInput, f ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) { + return &s3.GetBucketPolicyOutput{Policy: aws.String(differentPolicy)}, nil + }, + } + + bs := backendstore.NewBackendStore() + bs.AddOrUpdateBackend("s3-backend-1", &fake, nil, true, apisv1alpha1.HealthStatusHealthy) + + return bs + }(), + }, + args: args{ + bucket: &v1alpha1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bucket", + }, + Spec: v1alpha1.BucketSpec{ + ForProvider: v1alpha1.BucketParameters{ + Policy: samplePolicy, + }, + }, + }, + backendName: "s3-backend-1", + }, + want: want{ + status: NeedsUpdate, + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + + c := NewPolicyClient( + tc.fields.backendStore, + s3clienthandler.NewHandler( + s3clienthandler.WithAssumeRoleArn(nil), + s3clienthandler.WithBackendStore(tc.fields.backendStore), + ), + logging.NewNopLogger(), + ) + + got, err := c.observeBackend(context.Background(), tc.args.bucket, tc.args.backendName) + require.ErrorIs(t, err, tc.want.err, "unexpected error") + assert.Equal(t, tc.want.status, got, "unexpected status") + }) + } +} diff --git a/internal/controller/bucket/subresources.go b/internal/controller/bucket/subresources.go index cc18078b..987950b2 100644 --- a/internal/controller/bucket/subresources.go +++ b/internal/controller/bucket/subresources.go @@ -37,6 +37,7 @@ func NewSubresourceClients(b *backendstore.BackendStore, h *s3clienthandler.Hand return []SubresourceClient{ NewLifecycleConfigurationClient(b, h, l.WithValues("lifecycle-configuration-client", managed.ControllerName(v1alpha1.BucketGroupKind))), NewACLClient(b, h, l.WithValues("acl-client", managed.ControllerName(v1alpha1.BucketGroupKind))), + NewPolicyClient(b, h, l.WithValues("policy-client", managed.ControllerName(v1alpha1.BucketGroupKind))), } } diff --git a/internal/rgw/policy.go b/internal/rgw/policy.go new file mode 100644 index 00000000..494629ac --- /dev/null +++ b/internal/rgw/policy.go @@ -0,0 +1,60 @@ +package rgw + +import ( + "context" + + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/crossplane/crossplane-runtime/pkg/errors" + "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" + "github.com/linode/provider-ceph/internal/backendstore" + "github.com/linode/provider-ceph/internal/otel/traces" + "go.opentelemetry.io/otel" +) + +const ( + errGetBucketPolicy = "failed to get bucket policy" + errPutBucketPolicy = "failed to put bucket policy" + errDeleteBucketPolicy = "failed to delete bucket policy" +) + +func GetBucketPolicy(ctx context.Context, s3Backend backendstore.S3Client, bucketName *string) (*awss3.GetBucketPolicyOutput, error) { + ctx, span := otel.Tracer("").Start(ctx, "GetBucketPolicy") + defer span.End() + + resp, err := s3Backend.GetBucketPolicy(ctx, &awss3.GetBucketPolicyInput{Bucket: bucketName}) + if err != nil { + traces.SetAndRecordError(span, err) + + return resp, errors.Wrap(err, errGetBucketPolicy) + } + + return resp, nil +} + +func PutBucketPolicy(ctx context.Context, s3Backend backendstore.S3Client, b *v1alpha1.Bucket) (*awss3.PutBucketPolicyOutput, error) { + ctx, span := otel.Tracer("").Start(ctx, "PutBucketPolicy") + defer span.End() + + resp, err := s3Backend.PutBucketPolicy(ctx, &awss3.PutBucketPolicyInput{Bucket: &b.Name, Policy: &b.Spec.ForProvider.Policy}) + if err != nil { + traces.SetAndRecordError(span, err) + + return resp, errors.Wrap(err, errPutBucketPolicy) + } + + return resp, nil +} + +func DeleteBucketPolicy(ctx context.Context, s3Backend backendstore.S3Client, bucketName *string) error { + ctx, span := otel.Tracer("").Start(ctx, "DeleteBucketPolicy") + defer span.End() + + _, err := s3Backend.DeleteBucketPolicy(ctx, &awss3.DeleteBucketPolicyInput{Bucket: bucketName}) + if err != nil { + traces.SetAndRecordError(span, err) + + return errors.Wrap(err, errDeleteBucketPolicy) + } + + return nil +} diff --git a/package/crds/provider-ceph.ceph.crossplane.io_buckets.yaml b/package/crds/provider-ceph.ceph.crossplane.io_buckets.yaml index 032838ed..dadcef1b 100644 --- a/package/crds/provider-ceph.ceph.crossplane.io_buckets.yaml +++ b/package/crds/provider-ceph.ceph.crossplane.io_buckets.yaml @@ -479,6 +479,12 @@ spec: don't specify an ACL or bucket owner full control ACLs, such as the bucket-owner-full-control canned ACL or an equivalent form of this ACL expressed in the XML format. type: string + policy: + description: |- + Policy is a JSON string of BucketPolicy. + If it is set, Provider-Ceph calls PutBucketPolicy API after creating the bucket. + Before adding it, you should validate the JSON string. + type: string type: object lifecycleConfigurationDisabled: description: |-