Skip to content

Commit

Permalink
Merge pull request #2264 from FabianKramm/fix-host-event
Browse files Browse the repository at this point in the history
refactor: update object cache correctly
  • Loading branch information
FabianKramm authored Nov 5, 2024
2 parents 4d966a7 + a6fb204 commit e41811d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 29 deletions.
14 changes: 0 additions & 14 deletions pkg/patcher/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func CreateVirtualObject(ctx *synccontext.SyncContext, pObj, vObj client.Object,
namespaceName = pObj.GetNamespace() + "/" + pObj.GetName()
}

if ctx.ObjectCache != nil {
ctx.ObjectCache.Host().Put(pObj.DeepCopyObject().(client.Object))
}
err = ApplyObject(ctx, nil, vObj, synccontext.SyncHostToVirtual, hasStatus)
if err != nil {
ctx.Log.Infof("error syncing %s %s to virtual cluster: %v", gvk.Kind, namespaceName, err)
Expand All @@ -57,9 +54,6 @@ func CreateHostObject(ctx *synccontext.SyncContext, vObj, pObj client.Object, ev
namespaceName = vObj.GetNamespace() + "/" + vObj.GetName()
}

if ctx.ObjectCache != nil {
ctx.ObjectCache.Virtual().Put(vObj.DeepCopyObject().(client.Object))
}
err = ApplyObject(ctx, nil, pObj, synccontext.SyncVirtualToHost, hasStatus)
if err != nil {
ctx.Log.Infof("error syncing %s %s to host cluster: %v", gvk.Kind, namespaceName, err)
Expand Down Expand Up @@ -176,14 +170,6 @@ func ApplyObject(ctx *synccontext.SyncContext, beforeObject, afterObject client.

func ApplyObjectPatch(ctx *synccontext.SyncContext, objPatch patch.Patch, obj client.Object, direction synccontext.SyncDirection, hasStatus bool) error {
if objPatch.IsEmpty() {
if ctx.ObjectCache != nil {
if direction == synccontext.SyncHostToVirtual {
ctx.ObjectCache.Virtual().Put(obj)
} else if direction == synccontext.SyncVirtualToHost {
ctx.ObjectCache.Host().Put(obj)
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/filters/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func callAdmissionWebhooks(req *http.Request, info *request.RequestInfo, paramet
err := validatingAdmission.Validate(req.Context(), admission.NewAttributesRecord(opts, nil, kind, info.Namespace, info.Name, corev1.SchemeGroupVersion.WithResource(info.Resource), info.Subresource, admission.Connect, nil, false, userInfo), NewFakeObjectInterfaces(uncachedVirtualClient.Scheme(), uncachedVirtualClient.RESTMapper()))
if err != nil {
klog.Infof("Admission validate failed for %s: %v", info.Path, err)
return err
return kerrors.NewForbidden(corev1.SchemeGroupVersion.WithResource(info.Resource+"/"+info.Subresource).GroupResource(), info.Name, err)
}

klog.V(1).Info("Allowed pod request", "subresource", info.Subresource, "path", info.Path)
Expand Down
61 changes: 47 additions & 14 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,15 @@ func (r *SyncController) newSyncContext(ctx context.Context, logName string) *sy

func (r *SyncController) Reconcile(ctx context.Context, vReq reconcile.Request) (res ctrl.Result, retErr error) {
// extract request
r.hostNameRequestLookupLock.Lock()
pReq, ok := r.hostNameRequestLookup[vReq]
pReq, ok := r.getHostRequest(vReq)
if ok {
delete(r.hostNameRequestLookup, vReq)

// put this into the cache again if we requeue
defer func() {
if res.Requeue || res.RequeueAfter > 0 || retErr != nil {
r.hostNameRequestLookupLock.Lock()
_, ok := r.hostNameRequestLookup[vReq]
if !ok {
r.hostNameRequestLookup[vReq] = pReq
}
r.hostNameRequestLookupLock.Unlock()
r.setHostRequest(vReq, pReq)
}
}()
}
r.hostNameRequestLookupLock.Unlock()

// create sync context
syncContext := r.newSyncContext(ctx, vReq.Name)
Expand All @@ -158,6 +149,11 @@ func (r *SyncController) Reconcile(ctx context.Context, vReq reconcile.Request)
if err != nil {
return ctrl.Result{}, err
}
defer func() {
if !res.Requeue && res.RequeueAfter == 0 && retErr == nil {
r.updateObjectCache(vObjOld, vObj, pObjOld, pObj)
}
}()

// check if the resource version is correct
if pObjOld != nil && pObj != nil && newerResourceVersion(pObjOld, pObj) {
Expand Down Expand Up @@ -417,9 +413,7 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
pReq := reconcile.Request{NamespacedName: client.ObjectKeyFromObject(obj)}
vReq := reconcile.Request{NamespacedName: r.syncer.HostToVirtual(syncContext, pReq.NamespacedName, obj)}
if vReq.Name != "" {
r.hostNameRequestLookupLock.Lock()
r.hostNameRequestLookup[vReq] = pReq
r.hostNameRequestLookupLock.Unlock()
r.setHostRequest(vReq, pReq)
q.Add(vReq)
}
}
Expand Down Expand Up @@ -460,6 +454,45 @@ func (r *SyncController) Register(ctx *synccontext.RegisterContext) error {
return err
}

func (r *SyncController) updateObjectCache(vObjOld, vObj, pObjOld, pObj client.Object) {
if r.objectCache == nil {
return
}

if vObjOld != nil && vObj != nil && newerResourceVersion(vObj, vObjOld) {
newVObjOld, ok := r.objectCache.Virtual().Get(client.ObjectKeyFromObject(vObj))
if ok && newVObjOld.GetResourceVersion() == vObjOld.GetResourceVersion() {
r.objectCache.Virtual().Put(vObj)
}
}

if pObjOld != nil && pObj != nil && newerResourceVersion(pObj, pObjOld) {
newPObjOld, ok := r.objectCache.Host().Get(client.ObjectKeyFromObject(pObj))
if ok && newPObjOld.GetResourceVersion() == pObjOld.GetResourceVersion() {
r.objectCache.Host().Put(pObj)
}
}
}

func (r *SyncController) setHostRequest(vReq, pReq reconcile.Request) {
r.hostNameRequestLookupLock.Lock()
defer r.hostNameRequestLookupLock.Unlock()

r.hostNameRequestLookup[vReq] = pReq
}

func (r *SyncController) getHostRequest(vReq reconcile.Request) (reconcile.Request, bool) {
r.hostNameRequestLookupLock.Lock()
defer r.hostNameRequestLookupLock.Unlock()

pReq, ok := r.hostNameRequestLookup[vReq]
if ok {
delete(r.hostNameRequestLookup, vReq)
}

return pReq, ok
}

func newerResourceVersion(oldObject, newObject client.Object) bool {
oldResourceVersion, _ := strconv.Atoi(oldObject.GetResourceVersion())
newResourceVersion, _ := strconv.Atoi(newObject.GetResourceVersion())
Expand Down

0 comments on commit e41811d

Please sign in to comment.