From 1e11adc49f079141a186a96c24d913392aa776d3 Mon Sep 17 00:00:00 2001 From: nolancon Date: Thu, 20 Jun 2024 16:01:58 +0000 Subject: [PATCH] add subresource versioning WIP --- examples/sample/bucket.yaml | 4 +- internal/backendstore/backend.go | 2 + .../backendstorefakes/fake_s3client.go | 166 ++++++++++++++ internal/controller/bucket/bucket_backends.go | 59 ++++- internal/controller/bucket/consts.go | 4 + internal/controller/bucket/helpers.go | 6 + internal/controller/bucket/subresources.go | 1 + internal/controller/bucket/update.go | 11 +- .../bucket/versioningconfiguration.go | 214 ++++++++++++++++++ internal/rgw/versioningconfiguration.go | 48 ++++ .../rgw/versioningconfiguration_helpers.go | 32 +++ 11 files changed, 543 insertions(+), 4 deletions(-) create mode 100644 internal/controller/bucket/versioningconfiguration.go create mode 100644 internal/rgw/versioningconfiguration.go create mode 100644 internal/rgw/versioningconfiguration_helpers.go diff --git a/examples/sample/bucket.yaml b/examples/sample/bucket.yaml index 082fa502..e78f52e0 100644 --- a/examples/sample/bucket.yaml +++ b/examples/sample/bucket.yaml @@ -3,4 +3,6 @@ kind: Bucket metadata: name: test-bucket spec: - forProvider: {} + forProvider: + versioningConfiguration: + status: Enabled diff --git a/internal/backendstore/backend.go b/internal/backendstore/backend.go index e152210b..069c76c8 100644 --- a/internal/backendstore/backend.go +++ b/internal/backendstore/backend.go @@ -44,6 +44,8 @@ type S3Client interface { PutBucketPolicy(context.Context, *s3.PutBucketPolicyInput, ...func(*s3.Options)) (*s3.PutBucketPolicyOutput, error) GetBucketPolicy(context.Context, *s3.GetBucketPolicyInput, ...func(*s3.Options)) (*s3.GetBucketPolicyOutput, error) DeleteBucketPolicy(context.Context, *s3.DeleteBucketPolicyInput, ...func(*s3.Options)) (*s3.DeleteBucketPolicyOutput, error) + PutBucketVersioning(context.Context, *s3.PutBucketVersioningInput, ...func(*s3.Options)) (*s3.PutBucketVersioningOutput, error) + GetBucketVersioning(context.Context, *s3.GetBucketVersioningInput, ...func(*s3.Options)) (*s3.GetBucketVersioningOutput, error) } //counterfeiter:generate . STSClient diff --git a/internal/backendstore/backendstorefakes/fake_s3client.go b/internal/backendstore/backendstorefakes/fake_s3client.go index 90c41468..844e9c0c 100644 --- a/internal/backendstore/backendstorefakes/fake_s3client.go +++ b/internal/backendstore/backendstorefakes/fake_s3client.go @@ -130,6 +130,21 @@ type FakeS3Client struct { result1 *s3.GetBucketPolicyOutput result2 error } + GetBucketVersioningStub func(context.Context, *s3.GetBucketVersioningInput, ...func(*s3.Options)) (*s3.GetBucketVersioningOutput, error) + getBucketVersioningMutex sync.RWMutex + getBucketVersioningArgsForCall []struct { + arg1 context.Context + arg2 *s3.GetBucketVersioningInput + arg3 []func(*s3.Options) + } + getBucketVersioningReturns struct { + result1 *s3.GetBucketVersioningOutput + result2 error + } + getBucketVersioningReturnsOnCall map[int]struct { + result1 *s3.GetBucketVersioningOutput + result2 error + } GetObjectStub func(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) getObjectMutex sync.RWMutex getObjectArgsForCall []struct { @@ -235,6 +250,21 @@ type FakeS3Client struct { result1 *s3.PutBucketPolicyOutput result2 error } + PutBucketVersioningStub func(context.Context, *s3.PutBucketVersioningInput, ...func(*s3.Options)) (*s3.PutBucketVersioningOutput, error) + putBucketVersioningMutex sync.RWMutex + putBucketVersioningArgsForCall []struct { + arg1 context.Context + arg2 *s3.PutBucketVersioningInput + arg3 []func(*s3.Options) + } + putBucketVersioningReturns struct { + result1 *s3.PutBucketVersioningOutput + result2 error + } + putBucketVersioningReturnsOnCall map[int]struct { + result1 *s3.PutBucketVersioningOutput + result2 error + } PutObjectStub func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) putObjectMutex sync.RWMutex putObjectArgsForCall []struct { @@ -782,6 +812,72 @@ func (fake *FakeS3Client) GetBucketPolicyReturnsOnCall(i int, result1 *s3.GetBuc }{result1, result2} } +func (fake *FakeS3Client) GetBucketVersioning(arg1 context.Context, arg2 *s3.GetBucketVersioningInput, arg3 ...func(*s3.Options)) (*s3.GetBucketVersioningOutput, error) { + fake.getBucketVersioningMutex.Lock() + ret, specificReturn := fake.getBucketVersioningReturnsOnCall[len(fake.getBucketVersioningArgsForCall)] + fake.getBucketVersioningArgsForCall = append(fake.getBucketVersioningArgsForCall, struct { + arg1 context.Context + arg2 *s3.GetBucketVersioningInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.GetBucketVersioningStub + fakeReturns := fake.getBucketVersioningReturns + fake.recordInvocation("GetBucketVersioning", []interface{}{arg1, arg2, arg3}) + fake.getBucketVersioningMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) GetBucketVersioningCallCount() int { + fake.getBucketVersioningMutex.RLock() + defer fake.getBucketVersioningMutex.RUnlock() + return len(fake.getBucketVersioningArgsForCall) +} + +func (fake *FakeS3Client) GetBucketVersioningCalls(stub func(context.Context, *s3.GetBucketVersioningInput, ...func(*s3.Options)) (*s3.GetBucketVersioningOutput, error)) { + fake.getBucketVersioningMutex.Lock() + defer fake.getBucketVersioningMutex.Unlock() + fake.GetBucketVersioningStub = stub +} + +func (fake *FakeS3Client) GetBucketVersioningArgsForCall(i int) (context.Context, *s3.GetBucketVersioningInput, []func(*s3.Options)) { + fake.getBucketVersioningMutex.RLock() + defer fake.getBucketVersioningMutex.RUnlock() + argsForCall := fake.getBucketVersioningArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) GetBucketVersioningReturns(result1 *s3.GetBucketVersioningOutput, result2 error) { + fake.getBucketVersioningMutex.Lock() + defer fake.getBucketVersioningMutex.Unlock() + fake.GetBucketVersioningStub = nil + fake.getBucketVersioningReturns = struct { + result1 *s3.GetBucketVersioningOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) GetBucketVersioningReturnsOnCall(i int, result1 *s3.GetBucketVersioningOutput, result2 error) { + fake.getBucketVersioningMutex.Lock() + defer fake.getBucketVersioningMutex.Unlock() + fake.GetBucketVersioningStub = nil + if fake.getBucketVersioningReturnsOnCall == nil { + fake.getBucketVersioningReturnsOnCall = make(map[int]struct { + result1 *s3.GetBucketVersioningOutput + result2 error + }) + } + fake.getBucketVersioningReturnsOnCall[i] = struct { + result1 *s3.GetBucketVersioningOutput + result2 error + }{result1, result2} +} + func (fake *FakeS3Client) GetObject(arg1 context.Context, arg2 *s3.GetObjectInput, arg3 ...func(*s3.Options)) (*s3.GetObjectOutput, error) { fake.getObjectMutex.Lock() ret, specificReturn := fake.getObjectReturnsOnCall[len(fake.getObjectArgsForCall)] @@ -1244,6 +1340,72 @@ func (fake *FakeS3Client) PutBucketPolicyReturnsOnCall(i int, result1 *s3.PutBuc }{result1, result2} } +func (fake *FakeS3Client) PutBucketVersioning(arg1 context.Context, arg2 *s3.PutBucketVersioningInput, arg3 ...func(*s3.Options)) (*s3.PutBucketVersioningOutput, error) { + fake.putBucketVersioningMutex.Lock() + ret, specificReturn := fake.putBucketVersioningReturnsOnCall[len(fake.putBucketVersioningArgsForCall)] + fake.putBucketVersioningArgsForCall = append(fake.putBucketVersioningArgsForCall, struct { + arg1 context.Context + arg2 *s3.PutBucketVersioningInput + arg3 []func(*s3.Options) + }{arg1, arg2, arg3}) + stub := fake.PutBucketVersioningStub + fakeReturns := fake.putBucketVersioningReturns + fake.recordInvocation("PutBucketVersioning", []interface{}{arg1, arg2, arg3}) + fake.putBucketVersioningMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeS3Client) PutBucketVersioningCallCount() int { + fake.putBucketVersioningMutex.RLock() + defer fake.putBucketVersioningMutex.RUnlock() + return len(fake.putBucketVersioningArgsForCall) +} + +func (fake *FakeS3Client) PutBucketVersioningCalls(stub func(context.Context, *s3.PutBucketVersioningInput, ...func(*s3.Options)) (*s3.PutBucketVersioningOutput, error)) { + fake.putBucketVersioningMutex.Lock() + defer fake.putBucketVersioningMutex.Unlock() + fake.PutBucketVersioningStub = stub +} + +func (fake *FakeS3Client) PutBucketVersioningArgsForCall(i int) (context.Context, *s3.PutBucketVersioningInput, []func(*s3.Options)) { + fake.putBucketVersioningMutex.RLock() + defer fake.putBucketVersioningMutex.RUnlock() + argsForCall := fake.putBucketVersioningArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeS3Client) PutBucketVersioningReturns(result1 *s3.PutBucketVersioningOutput, result2 error) { + fake.putBucketVersioningMutex.Lock() + defer fake.putBucketVersioningMutex.Unlock() + fake.PutBucketVersioningStub = nil + fake.putBucketVersioningReturns = struct { + result1 *s3.PutBucketVersioningOutput + result2 error + }{result1, result2} +} + +func (fake *FakeS3Client) PutBucketVersioningReturnsOnCall(i int, result1 *s3.PutBucketVersioningOutput, result2 error) { + fake.putBucketVersioningMutex.Lock() + defer fake.putBucketVersioningMutex.Unlock() + fake.PutBucketVersioningStub = nil + if fake.putBucketVersioningReturnsOnCall == nil { + fake.putBucketVersioningReturnsOnCall = make(map[int]struct { + result1 *s3.PutBucketVersioningOutput + result2 error + }) + } + fake.putBucketVersioningReturnsOnCall[i] = struct { + result1 *s3.PutBucketVersioningOutput + result2 error + }{result1, result2} +} + func (fake *FakeS3Client) PutObject(arg1 context.Context, arg2 *s3.PutObjectInput, arg3 ...func(*s3.Options)) (*s3.PutObjectOutput, error) { fake.putObjectMutex.Lock() ret, specificReturn := fake.putObjectReturnsOnCall[len(fake.putObjectArgsForCall)] @@ -1329,6 +1491,8 @@ func (fake *FakeS3Client) Invocations() map[string][][]interface{} { defer fake.getBucketLifecycleConfigurationMutex.RUnlock() fake.getBucketPolicyMutex.RLock() defer fake.getBucketPolicyMutex.RUnlock() + fake.getBucketVersioningMutex.RLock() + defer fake.getBucketVersioningMutex.RUnlock() fake.getObjectMutex.RLock() defer fake.getObjectMutex.RUnlock() fake.headBucketMutex.RLock() @@ -1343,6 +1507,8 @@ func (fake *FakeS3Client) Invocations() map[string][][]interface{} { defer fake.putBucketLifecycleConfigurationMutex.RUnlock() fake.putBucketPolicyMutex.RLock() defer fake.putBucketPolicyMutex.RUnlock() + fake.putBucketVersioningMutex.RLock() + defer fake.putBucketVersioningMutex.RUnlock() fake.putObjectMutex.RLock() defer fake.putObjectMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/internal/controller/bucket/bucket_backends.go b/internal/controller/bucket/bucket_backends.go index 5b03d3bf..08dd1f09 100644 --- a/internal/controller/bucket/bucket_backends.go +++ b/internal/controller/bucket/bucket_backends.go @@ -81,6 +81,36 @@ func (b *bucketBackends) getLifecycleConfigCondition(bucketName, backendName str return b.backends[bucketName][backendName].LifecycleConfigurationCondition } +func (b *bucketBackends) setVersioningConfigCondition(bucketName, backendName string, c *xpv1.Condition) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.backends[bucketName] == nil { + b.backends[bucketName] = make(v1alpha1.Backends) + } + + if b.backends[bucketName][backendName] == nil { + b.backends[bucketName][backendName] = &v1alpha1.BackendInfo{} + } + + b.backends[bucketName][backendName].VersioningConfigurationCondition = c +} + +func (b *bucketBackends) getVersioningConfigCondition(bucketName, backendName string) *xpv1.Condition { + b.mu.RLock() + defer b.mu.RUnlock() + + if _, ok := b.backends[bucketName]; !ok { + return nil + } + + if _, ok := b.backends[bucketName][backendName]; !ok { + return nil + } + + return b.backends[bucketName][backendName].VersioningConfigurationCondition +} + func (b *bucketBackends) deleteBackend(bucketName, backendName string) { b.mu.Lock() defer b.mu.Unlock() @@ -143,7 +173,7 @@ func (b *bucketBackends) countBucketsAvailableOnBackends(bucket *v1alpha1.Bucket return i } -// isLifecycleConfigAvailableOnBackends checks the backends listed in Spec.Providers against +// isLifecycleConfigAvailableOnBackends checks the backends listed in providerNames against // bucketBackends to ensure lifecycle configurations are considered Available on all desired backends. func (b *bucketBackends) isLifecycleConfigAvailableOnBackends(bucket *v1alpha1.Bucket, providerNames []string, c map[string]backendstore.S3Client) bool { for _, backendName := range providerNames { @@ -168,7 +198,7 @@ func (b *bucketBackends) isLifecycleConfigAvailableOnBackends(bucket *v1alpha1.B return true } -// isLifecycleConfigRemovedFromBackends checks the backends listed in Spec.Providers against +// isLifecycleConfigRemovedFromBackends checks the backends listed in providerNames against // bucketBackends to ensure lifecycle configurations are removed from all desired backends. func (b *bucketBackends) isLifecycleConfigRemovedFromBackends(bucket *v1alpha1.Bucket, providerNames []string, c map[string]backendstore.S3Client) bool { for _, backendName := range providerNames { @@ -187,3 +217,28 @@ func (b *bucketBackends) isLifecycleConfigRemovedFromBackends(bucket *v1alpha1.B return true } + +// isVersioningConfigAvailableOnBackends checks the backends listed in providerNames against +// bucketBackends to ensure versioning configurations are considered Available on all desired backends. +func (b *bucketBackends) isVersioningConfigAvailableOnBackends(bucket *v1alpha1.Bucket, providerNames []string, c map[string]backendstore.S3Client) bool { + for _, backendName := range providerNames { + if _, ok := c[backendName]; !ok { + // This backend does not exist in the list of available backends. + // The backend may be offline, so it is skipped. + continue + } + + vCondition := b.getVersioningConfigCondition(bucket.Name, backendName) + if vCondition == nil { + // The versioningconfig has not been created on this backend. + return false + } + + if !vCondition.Equal(xpv1.Available()) { + // The versioningconfig is not Available on this backend. + return false + } + } + + return true +} diff --git a/internal/controller/bucket/consts.go b/internal/controller/bucket/consts.go index 8ab0bfef..3c7f1df7 100644 --- a/internal/controller/bucket/consts.go +++ b/internal/controller/bucket/consts.go @@ -20,6 +20,10 @@ const ( errObserveLifecycleConfig = "failed to observe bucket lifecycle configuration" errHandleLifecycleConfig = "failed to handle bucket lifecycle configuration" + // Versioning configuration error messages. + errObserveVersioningConfig = "failed to observe bucket versioning configuration" + errHandleVersioningConfig = "failed to handle bucket versioning configuration" + // ACL error messages. errObserveAcl = "failed to observe bucket acl" errHandleAcl = "failed to handle bucket acl" diff --git a/internal/controller/bucket/helpers.go b/internal/controller/bucket/helpers.go index 5121d979..ccab1375 100644 --- a/internal/controller/bucket/helpers.go +++ b/internal/controller/bucket/helpers.go @@ -52,6 +52,12 @@ func isPauseRequired(bucket *v1alpha1.Bucket, providerNames []string, minReplica return false } + // If versioning config is specified in the spec, we should only pause once the + // versioning config is available on all backends. + if bucket.Spec.ForProvider.VersioningConfiguration != nil && !bb.isVersioningConfigAvailableOnBackends(bucket, providerNames, c) { + return false + } + return (bucket.Spec.AutoPause || autopauseEnabled) && // Only return true if this label value is "". // This is to allow the user to delete a paused bucket with autopause enabled. diff --git a/internal/controller/bucket/subresources.go b/internal/controller/bucket/subresources.go index 987950b2..0aa67d95 100644 --- a/internal/controller/bucket/subresources.go +++ b/internal/controller/bucket/subresources.go @@ -38,6 +38,7 @@ func NewSubresourceClients(b *backendstore.BackendStore, h *s3clienthandler.Hand NewLifecycleConfigurationClient(b, h, l.WithValues("lifecycle-configuration-client", managed.ControllerName(v1alpha1.BucketGroupKind))), NewACLClient(b, h, l.WithValues("acl-client", managed.ControllerName(v1alpha1.BucketGroupKind))), NewPolicyClient(b, h, l.WithValues("policy-client", managed.ControllerName(v1alpha1.BucketGroupKind))), + NewVersioningConfigurationClient(b, h, l.WithValues("versioning-configuration-client", managed.ControllerName(v1alpha1.BucketGroupKind))), } } diff --git a/internal/controller/bucket/update.go b/internal/controller/bucket/update.go index 9e7016f5..69d71d0e 100644 --- a/internal/controller/bucket/update.go +++ b/internal/controller/bucket/update.go @@ -83,8 +83,18 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext // Whether buckets are updated successfully or not on backends, we need to update the // Bucket CR Status in all cases to represent the conditions of each individual bucket. + cls := c.backendStore.GetBackendS3Clients(allBackendsToUpdateOn) if err := c.updateBucketCR(ctx, bucket, func(bucketDeepCopy, bucketLatest *v1alpha1.Bucket) UpdateRequired { + // If a VersioningConfiguration is specified in the Spec, but is not available on all + // backends, set the VersioningConfiguration in the Status to nil as we cannot consider + // this operation to have been a success. This value was preliminarily set by the + // VersioningConfiguration subresource client. + if bucketDeepCopy.Spec.ForProvider.VersioningConfiguration != nil && + !bucketBackends.isVersioningConfigAvailableOnBackends(bucketDeepCopy, allBackendsToUpdateOn, cls) { + bucketLatest.Status.AtProvider.VersioningConfiguration = nil + } + setBucketStatus(bucketLatest, bucketBackends, allBackendsToUpdateOn, c.minReplicas) return NeedsStatusUpdate @@ -107,7 +117,6 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext // criteria is met before pausing a Bucket CR. Otherwise we check to see if there are // backends that the bucket was not updated on and if so, we set the updateAllErr // which will be returned at the end of this function, triggering a requeue. - cls := c.backendStore.GetBackendS3Clients(allBackendsToUpdateOn) if isPauseRequired(bucketLatest, allBackendsToUpdateOn, c.minReplicas, cls, bucketBackends, c.autoPauseBucket) { c.log.Info("Auto pausing bucket", consts.KeyBucketName, bucket.Name) bucketLatest.Labels[meta.AnnotationKeyReconciliationPaused] = True diff --git a/internal/controller/bucket/versioningconfiguration.go b/internal/controller/bucket/versioningconfiguration.go new file mode 100644 index 00000000..e6a870c3 --- /dev/null +++ b/internal/controller/bucket/versioningconfiguration.go @@ -0,0 +1,214 @@ +package bucket + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/errors" + "github.com/crossplane/crossplane-runtime/pkg/logging" + + "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" + apisv1alpha1 "github.com/linode/provider-ceph/apis/v1alpha1" + "github.com/linode/provider-ceph/internal/backendstore" + "github.com/linode/provider-ceph/internal/consts" + "github.com/linode/provider-ceph/internal/controller/s3clienthandler" + "github.com/linode/provider-ceph/internal/otel/traces" + "github.com/linode/provider-ceph/internal/rgw" + + "github.com/google/go-cmp/cmp" + + "go.opentelemetry.io/otel" +) + +// VersioningConfigurationClient is the client for API methods and reconciling the VersioningConfiguration +type VersioningConfigurationClient struct { + backendStore *backendstore.BackendStore + s3ClientHandler *s3clienthandler.Handler + log logging.Logger +} + +func NewVersioningConfigurationClient(b *backendstore.BackendStore, h *s3clienthandler.Handler, l logging.Logger) *VersioningConfigurationClient { + return &VersioningConfigurationClient{backendStore: b, s3ClientHandler: h, log: l} +} + +//nolint:dupl // VersioningConfiguration and Policy are different feature. +func (l *VersioningConfigurationClient) Observe(ctx context.Context, bucket *v1alpha1.Bucket, backendNames []string) (ResourceStatus, error) { + ctx, span := otel.Tracer("").Start(ctx, "bucket.VersioningConfigurationClient.Observe") + defer span.End() + + if bucket.Spec.ForProvider.VersioningConfiguration == nil { + // Versioning cannot be disabled after it has been enabled. It can only be suspended. + // Therefore we do not need to perform a cleanup in the event that the user has specified + // nil (which would normally equate to disabling a subresource). + l.log.Info("No VersioningConfiguration specifed. If Versioning has previously been enabled, it cannot be disabled") + + return Updated, nil + } + + observationChan := make(chan ResourceStatus) + errChan := make(chan error) + + for _, backendName := range backendNames { + beName := backendName + go func() { + observation, err := l.observeBackend(ctx, bucket, beName) + if err != nil { + errChan <- err + + return + } + observationChan <- observation + }() + } + + for i := 0; i < len(backendNames); i++ { + select { + case <-ctx.Done(): + l.log.Info("Context timeout during bucket versioning configuration observation", consts.KeyBucketName, bucket.Name) + err := errors.Wrap(ctx.Err(), errObserveVersioningConfig) + traces.SetAndRecordError(span, err) + + return NeedsUpdate, err + case observation := <-observationChan: + if observation != Updated { + return observation, nil + } + case err := <-errChan: + err = errors.Wrap(err, errObserveVersioningConfig) + traces.SetAndRecordError(span, err) + + return NeedsUpdate, err + } + } + + return Updated, nil +} + +//nolint:gocyclo,cyclop // Function requires multiple checks. +func (l *VersioningConfigurationClient) observeBackend(ctx context.Context, bucket *v1alpha1.Bucket, backendName string) (ResourceStatus, error) { + l.log.Info("Observing subresource versioning configuration on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) + + if l.backendStore.GetBackendHealthStatus(backendName) == apisv1alpha1.HealthStatusUnhealthy { + // If a backend is marked as unhealthy, we can ignore it for now by returning Updated. + // The backend may be down for some time and we do not want to block Create/Update/Delete + // calls on other backends. By returning NeedsUpdate here, we would never pass the Observe + // phase until the backend becomes Healthy or Disabled. + return Updated, nil + } + + s3Client, err := l.s3ClientHandler.GetS3Client(ctx, bucket, backendName) + if err != nil { + return NeedsUpdate, err + } + response, err := rgw.GetBucketVersioning(ctx, s3Client, aws.String(bucket.Name)) + if err != nil { + return NeedsUpdate, err + } + + external := &s3types.VersioningConfiguration{} + if response != nil { + external.Status = types.BucketVersioningStatus(response.Status) + external.MFADelete = types.MFADelete(response.MFADelete) + } + + if !cmp.Equal(external, rgw.GenerateVersioningConfiguration(bucket.Spec.ForProvider.VersioningConfiguration)) { + l.log.Info("Versioning configuration requires update on backend", consts.KeyBucketName, bucket.Name, consts.KeyBackendName, backendName) + + return NeedsUpdate, nil + } + + return Updated, nil +} + +func (l *VersioningConfigurationClient) Handle(ctx context.Context, b *v1alpha1.Bucket, backendName string, bb *bucketBackends) error { + ctx, span := otel.Tracer("").Start(ctx, "bucket.VersioningConfigurationClient.Handle") + defer span.End() + + if b.Spec.ForProvider.VersioningConfiguration == nil { + // Versioning cannot be disabled after it has been enabled. It can only be suspended. + // Therefore we do not need to perform a cleanup in the event that the user has specified + // nil (which would normally equate to disabling a subresource). + + return nil + } + + observation, err := l.observeBackend(ctx, b, backendName) + if err != nil { + err = errors.Wrap(err, errHandleVersioningConfig) + traces.SetAndRecordError(span, err) + + return err + } + + switch observation { + case Updated: + return nil + case NeedsDeletion: + // Versioning Configurations cannot be deleted, only suspended. + return nil + case NeedsUpdate: + if err := l.createOrUpdate(ctx, b, backendName); err != nil { + err = errors.Wrap(err, errHandleVersioningConfig) + unavailable := xpv1.Unavailable().WithMessage(err.Error()) + bb.setVersioningConfigCondition(b.Name, backendName, &unavailable) + + traces.SetAndRecordError(span, err) + + return err + } + available := xpv1.Available() + bb.setVersioningConfigCondition(b.Name, backendName, &available) + + // Preliminarily set the status of the VersioningConfiguration with + // the actual state retrieved from the S3 API. If we fail to perform + // the update on all backends, this Status will be removed in the + // parent Update loop when updating the overall Bucket Status. + if b.Status.AtProvider.VersioningConfiguration == nil { + vConfig, err := l.get(ctx, b, backendName) + if err != nil { + traces.SetAndRecordError(span, err) + + return err + } + b.Status.AtProvider.VersioningConfiguration = vConfig + } + } + + return nil +} + +func (l *VersioningConfigurationClient) createOrUpdate(ctx context.Context, b *v1alpha1.Bucket, backendName string) error { + l.log.Info("Updating versioniong configuration", consts.KeyBucketName, b.Name, consts.KeyBackendName, backendName) + s3Client, err := l.s3ClientHandler.GetS3Client(ctx, b, backendName) + if err != nil { + return err + } + + _, err = rgw.PutBucketVersioning(ctx, s3Client, b) + if err != nil { + return err + } + + return nil +} + +func (l *VersioningConfigurationClient) get(ctx context.Context, b *v1alpha1.Bucket, backendName string) (*v1alpha1.VersioningConfiguration, error) { + l.log.Info("Updating versioniong configuration", consts.KeyBucketName, b.Name, consts.KeyBackendName, backendName) + s3Client, err := l.s3ClientHandler.GetS3Client(ctx, b, backendName) + if err != nil { + return nil, err + } + + resp, err := rgw.GetBucketVersioning(ctx, s3Client, aws.String(b.Name)) + if err != nil { + return nil, err + } + return &v1alpha1.VersioningConfiguration{ + MFADelete: aws.String(string(resp.MFADelete)), + Status: aws.String(string(resp.Status)), + }, nil +} diff --git a/internal/rgw/versioningconfiguration.go b/internal/rgw/versioningconfiguration.go new file mode 100644 index 00000000..b80cf98d --- /dev/null +++ b/internal/rgw/versioningconfiguration.go @@ -0,0 +1,48 @@ +package rgw + +import ( + "context" + + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/crossplane/crossplane-runtime/pkg/errors" + "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" + "github.com/linode/provider-ceph/internal/backendstore" + "github.com/linode/provider-ceph/internal/otel/traces" + "go.opentelemetry.io/otel" +) + +const ( + errGetBucketVersioning = "failed to get bucket versioning" + errPutBucketVersioning = "failed to put bucket versioning" +) + +func PutBucketVersioning(ctx context.Context, s3Backend backendstore.S3Client, b *v1alpha1.Bucket) (*awss3.PutBucketVersioningOutput, error) { + ctx, span := otel.Tracer("").Start(ctx, "PutBucketVersioning") + defer span.End() + + resp, err := s3Backend.PutBucketVersioning(ctx, GeneratePutBucketVersioningInput(b.Name, b.Spec.ForProvider.VersioningConfiguration)) + if err != nil { + err := errors.Wrap(err, errPutBucketVersioning) + traces.SetAndRecordError(span, err) + + return resp, err + } + + return resp, nil +} + +func GetBucketVersioning(ctx context.Context, s3Backend backendstore.S3Client, bucketName *string) (*awss3.GetBucketVersioningOutput, error) { + ctx, span := otel.Tracer("").Start(ctx, "GetBucketVersioning") + defer span.End() + + resp, err := s3Backend.GetBucketVersioning(ctx, &awss3.GetBucketVersioningInput{Bucket: bucketName}) + if resource.IgnoreAny(err, IsBucketNotFound) != nil { + err = errors.Wrap(err, errGetBucketVersioning) + traces.SetAndRecordError(span, err) + + return resp, err + } + + return resp, nil +} diff --git a/internal/rgw/versioningconfiguration_helpers.go b/internal/rgw/versioningconfiguration_helpers.go new file mode 100644 index 00000000..d4a471d6 --- /dev/null +++ b/internal/rgw/versioningconfiguration_helpers.go @@ -0,0 +1,32 @@ +package rgw + +import ( + "github.com/aws/aws-sdk-go-v2/aws" + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/linode/provider-ceph/apis/provider-ceph/v1alpha1" +) + +// GeneratePutBucketVersioningInput creates the PutBucketVersioningInput for the AWS SDK +func GeneratePutBucketVersioningInput(name string, config *v1alpha1.VersioningConfiguration) *awss3.PutBucketVersioningInput { + return &awss3.PutBucketVersioningInput{ + Bucket: aws.String(name), + VersioningConfiguration: GenerateVersioningConfiguration(config), + } +} + +func GenerateVersioningConfiguration(inputConfig *v1alpha1.VersioningConfiguration) *types.VersioningConfiguration { + if inputConfig == nil { + return nil + } + + outputConfig := &types.VersioningConfiguration{} + if inputConfig.MFADelete != nil { + outputConfig.MFADelete = types.MFADelete(*inputConfig.MFADelete) + } + if inputConfig.Status != nil { + outputConfig.Status = types.BucketVersioningStatus(*inputConfig.Status) + } + + return outputConfig +}