Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Template support for resources replication #694

Merged
merged 4 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 102 additions & 106 deletions controllers/resources/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/hashicorp/go-multierror"
"github.com/valyala/fasttemplate"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -130,148 +131,143 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant

syncErr := new(multierror.Error)

for nsIndex, item := range spec.NamespacedItems {
keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace}
// A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned
// Namespace: this must be blocked by checking it this is the case.
if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) {
log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Global", keysAndValues...)
codecFactory := serializer.NewCodecFactory(r.client.Scheme())

continue
}
// Namespaced Items are relying on selecting resources, rather than specifying a specific name:
// creating it to get used by the client List action.
itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector)
if err != nil {
log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...)
for _, ns := range namespaces.Items {
for nsIndex, item := range spec.NamespacedItems {
keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace}
// A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned
// Namespace: this must be blocked by checking it this is the case.
if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) {
log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Tenant", keysAndValues...)

continue
}
continue
}
// Namespaced Items are relying on selecting resources, rather than specifying a specific name:
// creating it to get used by the client List action.
itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector)
if selectorErr != nil {
log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...)

objs := unstructured.UnstructuredList{}
objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind)))
syncErr = multierror.Append(syncErr, selectorErr)

if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil {
log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...)
continue
}

syncErr = multierror.Append(syncErr, clientErr)
objs := unstructured.UnstructuredList{}
objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind)))

continue
}
if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil {
log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...)

multiErr := new(multierror.Group)
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
obj := o
syncErr = multierror.Append(syncErr, clientErr)

multiErr.Go(func() error {
nsItems, nsErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if nsErr != nil {
log.Error(err, "unable to sync namespacedItems", keysAndValues...)
continue
}

return nsErr
}
multiErr := new(multierror.Group)
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
obj := o
obj.SetNamespace(ns.Name)

processed.Insert(nsItems...)
multiErr.Go(func() error {
kv := keysAndValues
kv = append(kv, []interface{}{"resource", fmt.Sprintf("%s/%s", obj.GetNamespace(), obj.GetNamespace())})

return nil
})
}
if opErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); opErr != nil {
log.Error(opErr, "unable to sync namespacedItems", kv...)

if objsErr := multiErr.Wait(); objsErr != nil {
syncErr = multierror.Append(syncErr, objsErr)
}
}
return opErr
}

codecFactory := serializer.NewCodecFactory(r.client.Scheme())

for rawIndex, item := range spec.RawItems {
obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex}
log.Info("resource has been replicated", kv...)

if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil {
log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...)
replicatedItem := &capsulev1beta2.ObjectReferenceStatus{}
replicatedItem.Name = obj.GetName()
replicatedItem.Kind = obj.GetKind()
replicatedItem.Namespace = ns.Name
replicatedItem.APIVersion = obj.GetAPIVersion()

syncErr = multierror.Append(syncErr, decodeErr)
processed.Insert(replicatedItem.String())

continue
}
return nil
})
}

syncedRaw, rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if rawErr != nil {
log.Info("unable to sync rawItem", keysAndValues...)
// In case of error processing an item in one of any selected Namespaces, storing it to report it lately
// to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation.
syncErr = multierror.Append(syncErr, rawErr)
} else {
processed.Insert(syncedRaw...)
if objsErr := multiErr.Wait(); objsErr != nil {
syncErr = multierror.Append(syncErr, objsErr)
}
}
}

return processed.List(), syncErr.ErrorOrNil()
}
for rawIndex, item := range spec.RawItems {
template := string(item.Raw)

// createOrUpdate replicates the provided unstructured object to all the provided Namespaces:
// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated,
// along adding the additional metadata, if required.
func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string, namespaces corev1.NamespaceList) ([]string, error) {
log := ctrllog.FromContext(ctx)
t := fasttemplate.New(template, "{{ ", " }}")

errGroup := new(multierror.Group)
tmplString := t.ExecuteString(map[string]interface{}{
"tenant.name": tnt.Name,
"namespace": ns.Name,
})

var items []string
obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex}

for _, item := range namespaces.Items {
ns := item.GetName()
if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode([]byte(tmplString), nil, &obj); decodeErr != nil {
log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...)

errGroup.Go(func() (err error) {
actual, desired := obj.DeepCopy(), obj.DeepCopy()
// Using a deferred function to properly log the results, and adding the item to the processed set.
defer func() {
keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())}
syncErr = multierror.Append(syncErr, decodeErr)

if err != nil {
log.Error(err, "unable to replicate resource", keysAndValues...)
continue
}

return
}
obj.SetNamespace(ns.Name)

if rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); rawErr != nil {
log.Info("unable to sync rawItem", keysAndValues...)
// In case of error processing an item in one of any selected Namespaces, storing it to report it lately
// to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation.
syncErr = multierror.Append(syncErr, rawErr)
} else {
log.Info("resource has been replicated", keysAndValues...)

replicatedItem := &capsulev1beta2.ObjectReferenceStatus{
Name: obj.GetName(),
}
replicatedItem := &capsulev1beta2.ObjectReferenceStatus{}
replicatedItem.Name = obj.GetName()
replicatedItem.Kind = obj.GetKind()
replicatedItem.Namespace = ns
replicatedItem.Namespace = ns.Name
replicatedItem.APIVersion = obj.GetAPIVersion()

items = append(items, replicatedItem.String())
}()
processed.Insert(replicatedItem.String())
}
}
}

actual.SetNamespace(ns)
return processed.List(), syncErr.ErrorOrNil()
}

_, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error {
UID := actual.GetUID()
rv := actual.GetResourceVersion()
// createOrUpdate replicates the provided unstructured object to all the provided Namespaces:
// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated,
// along adding the additional metadata, if required.
func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string) (err error) {
actual, desired := &unstructured.Unstructured{}, obj.DeepCopy()

actual.SetUnstructuredContent(desired.Object)
actual.SetNamespace(ns)
actual.SetLabels(labels)
actual.SetAnnotations(annotations)
actual.SetResourceVersion(rv)
actual.SetUID(UID)
actual.SetAPIVersion(desired.GetAPIVersion())
actual.SetKind(desired.GetKind())
actual.SetNamespace(desired.GetNamespace())
actual.SetName(desired.GetName())

return nil
})
_, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error {
UID := actual.GetUID()
rv := actual.GetResourceVersion()

return
})
}
// Wait returns *multierror.Error that implements stdlib error:
// the nil check must be performed down here rather than at the caller level to avoid wrong casting.
if err := errGroup.Wait(); err != nil {
return items, err
}
actual.SetUnstructuredContent(desired.Object)
actual.SetLabels(labels)
actual.SetAnnotations(annotations)
actual.SetResourceVersion(rv)
actual.SetUID(UID)

return nil
})

return items, nil
return err
}
22 changes: 22 additions & 0 deletions e2e/tenantresource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ var _ = Describe("Creating a TenantResource object", func() {
Name: "raw-secret-1",
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
"{{ tenant.name }}": []byte("Cg=="),
"{{ namespace }}": []byte("Cg=="),
},
},
},
},
Expand All @@ -118,6 +122,10 @@ var _ = Describe("Creating a TenantResource object", func() {
Name: "raw-secret-2",
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
"{{ tenant.name }}": []byte("Cg=="),
"{{ namespace }}": []byte("Cg=="),
},
},
},
},
Expand All @@ -132,6 +140,10 @@ var _ = Describe("Creating a TenantResource object", func() {
Name: "raw-secret-3",
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
"{{ tenant.name }}": []byte("Cg=="),
"{{ namespace }}": []byte("Cg=="),
},
},
},
},
Expand Down Expand Up @@ -220,6 +232,16 @@ var _ = Describe("Creating a TenantResource object", func() {
return secrets.Items
}, defaultTimeoutInterval, defaultPollInterval).Should(HaveLen(4))
})

By(fmt.Sprintf("ensuring raw items are templated in %s Namespace", ns), func() {
for _, name := range []string{"raw-secret-1", "raw-secret-2", "raw-secret-3"} {
secret := corev1.Secret{}
Expect(k8sClient.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, &secret)).ToNot(HaveOccurred())

Expect(secret.Data).To(HaveKey(solar.Name))
Expect(secret.Data).To(HaveKey(ns))
}
})
}

By("using a Namespace selector", func() {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down