Skip to content

Commit

Permalink
Merge pull request #366 from 0xff-dev/main
Browse files Browse the repository at this point in the history
feat: use workpool to reduce operator memory usage
  • Loading branch information
bjwswang authored Oct 10, 2023
2 parents bd57b1f + c9e97a3 commit 17133d6
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 73 deletions.
1 change: 1 addition & 0 deletions api/v1alpha1/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
ValuesConfigMapLabel = Group + "/component-name"
ValuesConfigMapKey = "values.yaml"
ImagesConfigMapKey = "images"
READMEConfigMapKey = "readme"
)

// ComponentVersionDiff When the version of a component changes,
Expand Down
2 changes: 1 addition & 1 deletion config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ spec:
resources:
limits:
cpu: 5000m
memory: 2048Mi
memory: 1536Mi
requests:
cpu: 10m
memory: 64Mi
Expand Down
91 changes: 22 additions & 69 deletions controllers/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,20 @@ package controllers
import (
"context"
"fmt"
"os"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/tools/record"
"k8s.io/utils/env"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -49,8 +45,9 @@ import (
// ComponentReconciler reconciles a Component object
type ComponentReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
Scheme *runtime.Scheme
Recorder record.EventRecorder
ChartWork helm.ChartWorker
}

//+kubebuilder:rbac:groups=core.kubebb.k8s.com.cn,resources=components,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -185,37 +182,14 @@ func (r *ComponentReconciler) UpdateValuesConfigmap(ctx context.Context, logger
BearerToken: &cfg.BearerToken,
Namespace: &component.Namespace,
}
g := new(errgroup.Group)
workers, err := env.GetInt("OCI_PULL_WORKER", 5) // Increase this number will download faster, but also more likely to trigger '429 Too Many Requests' error.
if err != nil {
workers = 5
}
g.SetLimit(workers)
limit := make(chan struct{}, 5)
for _, version := range component.Status.Versions {
versionStr := version.Version // https://golang.org/doc/faq#closures_and_goroutines
httpDonwloadURLs := version.URLs
g.Go(func() error {
cm := &v1.ConfigMap{}
cm.Name = corev1alpha1.GetComponentChartValuesConfigmapName(component.Name, versionStr)
cm.Namespace = component.Namespace
err := r.Client.Get(ctx, client.ObjectKeyFromObject(cm), cm)
createCm := false
if err != nil {
if !errors.IsNotFound(err) {
return err
}
createCm = true
}

_, ok1 := cm.Data[corev1alpha1.ValuesConfigMapKey]
_, ok2 := cm.Data[corev1alpha1.ImagesConfigMapKey]
if ok1 && ok2 {
return nil
}

if cm.Data == nil {
cm.Data = make(map[string]string)
}
go func(versionStr string, httpDonwloadURLs []string) {
limit <- struct{}{}
defer func() {
<-limit
}()
cmName := corev1alpha1.GetComponentChartValuesConfigmapName(component.Name, versionStr)
var pullURL string
u := strings.TrimSuffix(repo.Spec.URL, "/")
if repo.IsOCI() {
Expand All @@ -227,7 +201,7 @@ func (r *ComponentReconciler) UpdateValuesConfigmap(ctx context.Context, logger
} else {
if len(httpDonwloadURLs) == 0 {
logger.Error(fmt.Errorf("not found %s's urls", component.Status.Name), "")
return nil
return
}
if strings.HasPrefix(httpDonwloadURLs[0], "http") {
pullURL = httpDonwloadURLs[0]
Expand All @@ -241,39 +215,18 @@ func (r *ComponentReconciler) UpdateValuesConfigmap(ctx context.Context, logger
h, err := helm.NewCoreHelmWrapper(getter, component.Namespace, logger, r.Client, nil, repo, component)
if err != nil {
logger.Error(err, "failed to NewCoreHelmWrapper")
return nil
}
_, dir, entryName, err := h.Pull(ctx, pullURL, versionStr)
if err != nil {
logger.Error(err, "failed to Pull")
return nil
}
defer os.Remove(dir)
b, err := os.ReadFile(dir + "/" + entryName + "/values.yaml")
if err != nil {
logger.Error(err, "failed to read values.yaml")
return nil
}
cm.Data[corev1alpha1.ValuesConfigMapKey] = string(b)
rel, err := h.Template(ctx, versionStr, dir+"/"+entryName)
if err != nil {
logger.Error(err, "failed to Template")
return nil
}
_, images, err := corev1alpha1.GetResourcesAndImages(ctx, logger, r.Client, rel.Manifest, component.Namespace)
if err != nil {
logger.Error(err, "failed to GetResourcesAndImages")
return nil
}
cm.Data[corev1alpha1.ImagesConfigMapKey] = strings.Join(images, ",")
if err := controllerutil.SetOwnerReference(component, cm, r.Scheme); err != nil {
return err
return
}
if createCm {
return r.Client.Create(ctx, cm)
def := helm.ChartDef{
URL: pullURL,
Version: versionStr,
ConfigMapName: cmName,
Component: component,
H: h,
Scheme: r.Scheme,
}
return r.Update(ctx, cm)
})
r.ChartWork.Push(def)
}(version.Version, version.URLs)
}
return g.Wait()
return nil
}
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ func main() {
os.Exit(1)
}
if err = (&controllers.ComponentReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("component-reconcile"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("component-reconcile"),
ChartWork: helm.NewChartWorker(helm.WithContext(ctx), helm.WithK8sClient(mgr.GetClient())),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Component")
os.Exit(1)
Expand Down
Loading

0 comments on commit 17133d6

Please sign in to comment.