Skip to content

Commit

Permalink
feat: Application dependencies
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis committed Sep 13, 2023
1 parent f33005b commit 9686dbf
Show file tree
Hide file tree
Showing 26 changed files with 8,703 additions and 815 deletions.
75 changes: 75 additions & 0 deletions assets/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -5836,6 +5836,37 @@
}
}
},
"v1alpha1ApplicationDependency": {
"type": "object",
"title": "ApplicationDependency defines",
"properties": {
"blockOnEmpty": {
"type": "boolean",
"title": "BlockOnEmpty defines whether to block sync when the list of applications determined by the selector is empty"
},
"refreshDependencies": {
"type": "boolean",
"title": "RefreshDependencies defines whether all dependencies should be refreshed before starting a sync"
},
"selectors": {
"type": "array",
"title": "Selectors defines conditions for matching application's dependencies",
"items": {
"$ref": "#/definitions/v1alpha1ApplicationSelector"
}
},
"syncDelay": {
"type": "string",
"format": "int64",
"title": "SyncDelay specifies the duration in seconds to wait before starting to sync when dependencies are defined"
},
"timeout": {
"type": "string",
"format": "int64",
"title": "Timeout defines the maximum duration in seconds to wait on dependencies before the sync fails"
}
}
},
"v1alpha1ApplicationDestination": {
"type": "object",
"title": "ApplicationDestination holds information about the application's destination",
Expand Down Expand Up @@ -5903,6 +5934,22 @@
}
}
},
"v1alpha1ApplicationSelector": {
"type": "object",
"title": "ApplicationSelector specifies which applications this Application depends on",
"properties": {
"labelSelector": {
"$ref": "#/definitions/v1LabelSelector"
},
"namePattern": {
"type": "array",
"title": "NamePattern selects applications by matching their names",
"items": {
"type": "string"
}
}
}
},
"v1alpha1ApplicationSet": {
"type": "object",
"title": "ApplicationSet is a set of Application resources\n+genclient\n+genclient:noStatus\n+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object\n+kubebuilder:resource:path=applicationsets,shortName=appset;appsets\n+kubebuilder:subresource:status",
Expand Down Expand Up @@ -6458,6 +6505,9 @@
"description": "ApplicationSpec represents desired application state. Contains link to repository with application definition and additional parameters link definition revision.",
"type": "object",
"properties": {
"dependsOn": {
"$ref": "#/definitions/v1alpha1ApplicationDependency"
},
"destination": {
"$ref": "#/definitions/v1alpha1ApplicationDestination"
},
Expand Down Expand Up @@ -7402,6 +7452,10 @@
"type": "object",
"title": "OperationState contains information about state of a running operation",
"properties": {
"blockedOnEmpty": {
"type": "boolean",
"title": "BlockedOnEmpty is true when the application is waiting for any dependency to be created"
},
"finishedAt": {
"$ref": "#/definitions/v1Time"
},
Expand All @@ -7426,6 +7480,13 @@
},
"syncResult": {
"$ref": "#/definitions/v1alpha1SyncOperationResult"
},
"waitingFor": {
"type": "array",
"title": "WaitingFor specifies a list of applications that this operation is waiting for",
"items": {
"$ref": "#/definitions/v1alpha1SyncDependency"
}
}
}
},
Expand Down Expand Up @@ -8728,6 +8789,20 @@
}
}
},
"v1alpha1SyncDependency": {
"type": "object",
"properties": {
"applicationName": {
"type": "string"
},
"applicationNamespace": {
"type": "string"
},
"refreshedAt": {
"$ref": "#/definitions/v1Time"
}
}
},
"v1alpha1SyncOperation": {
"description": "SyncOperation contains details about a sync operation.",
"type": "object",
Expand Down
2 changes: 1 addition & 1 deletion cmd/argocd/commands/admin/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func reconcileApplications(
)

appStateManager := controller.NewAppStateManager(
argoDB, appClientset, repoServerClient, namespace, kubeutil.NewKubectl(), settingsMgr, stateCache, projInformer, server, cache, time.Second, argo.NewResourceTracking(), false)
argoDB, appClientset, repoServerClient, namespace, kubeutil.NewKubectl(), settingsMgr, stateCache, projInformer, appLister, server, cache, time.Second, argo.NewResourceTracking(), false)

appsList, err := appClientset.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{LabelSelector: selector})
if err != nil {
Expand Down
131 changes: 125 additions & 6 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ const (
updateOperationStateTimeout = 1 * time.Second
// orphanedIndex contains application which monitor orphaned resources by namespace
orphanedIndex = "orphaned"
// refreshAfterForDependencies defines the interval for refresh while waiting for dependency application
)

var refreshAfterForDependencies time.Duration = 2 * time.Second

type CompareWith int

const (
Expand Down Expand Up @@ -216,7 +219,7 @@ func NewApplicationController(
}
}
stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterFilter, argo.NewResourceTracking())
appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth)
appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, appLister, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth)
ctrl.appInformer = appInformer
ctrl.appLister = appLister
ctrl.projInformer = projInformer
Expand Down Expand Up @@ -1132,6 +1135,42 @@ func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condi
}
}

func (ctrl *ApplicationController) refreshDependencies(app *appv1.Application, state *appv1.OperationState) {
logCtx := log.WithField("application", app.QualifiedName())
for i, dep := range state.WaitingFor {

a, err := ctrl.appLister.Applications(dep.ApplicationNamespace).Get(dep.ApplicationName)
if err != nil {
logCtx.Errorf("Could not retrieve dependency %s: %v", dep.ApplicationName, err)
continue
}

if app.Spec.GetProject() != a.Spec.GetProject() {
logCtx.Infof("Not refreshing dependency app %s because AppProject mismatches: is %s, must be %s", a.QualifiedName(), a.Spec.GetProject(), app.Spec.GetProject())
continue
}

// Either an operation has been requested or is already in progress.
// We do not request a new one.
if a.Operation != nil || (a.Status.OperationState != nil && a.Status.OperationState.Phase == synccommon.OperationRunning) {
logCtx.Debugf("Dependency %s: Operation already in progress, not going to trigger another one", dep.QualifiedName())
continue
}

if dep.RefreshedAt.IsZero() {
logCtx.Infof("Requesting refresh for app %s on behalf of dependency resolution", dep.QualifiedName())
ctrl.requestAppRefresh(dep.QualifiedName(), CompareWithLatest.Pointer(), nil)
dep.RefreshedAt = &metav1.Time{Time: time.Now()}
} else {
logCtx.Debugf("Already requested a refresh for dependency %s", dep.QualifiedName())
}

state.WaitingFor[i] = dep
}

ctrl.setOperationState(app, state)
}

func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
logCtx := log.WithField("application", app.QualifiedName())
var state *appv1.OperationState
Expand Down Expand Up @@ -1188,6 +1227,9 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
state.Message = err.Error()
} else {
ctrl.appStateManager.SyncAppState(app, state)
if state != nil && len(state.WaitingFor) > 0 {
ctrl.refreshDependencies(app, state)
}
}

// Check whether application is allowed to use project
Expand Down Expand Up @@ -1215,6 +1257,24 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
// SyncAppState will operate in a Terminating phase, allowing the worker to perform
// cleanup (e.g. delete jobs, workflows, etc...)
}

// If the operation is in progress, and we are blocked on waiting
// for dependencies, immediately refresh the application so that
// we enter a refresh cycle.
if freshApp.Status.OperationState != nil && freshApp.Status.OperationState.Phase == synccommon.OperationRunning {
refresh := false
if freshApp.Status.OperationState.BlockedOnEmpty {
logCtx.Infof("Requesting app refresh for blocked app")
refresh = true
} else if len(freshApp.Status.OperationState.WaitingFor) > 0 {
logCtx.Infof("Requesting app refresh because waiting on dependencies")
refresh = true
}
if refresh {
ctrl.appRefreshQueue.AddAfter(fmt.Sprintf("%s/%d", freshApp.QualifiedName(), ComparisonWithNothing), refreshAfterForDependencies)
ctrl.appOperationQueue.AddAfter(freshApp.QualifiedName(), refreshAfterForDependencies)
}
}
}
} else if state.Phase == synccommon.OperationFailed || state.Phase == synccommon.OperationError {
if !terminating && (state.RetryCount < state.Operation.Retry.Limit || state.Operation.Retry.Limit < 0) {
Expand Down Expand Up @@ -1277,11 +1337,20 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
logCtx.Errorf("error marshaling json: %v", err)
return
}
if app.Status.OperationState != nil && app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
return
if app.Status.OperationState != nil {
if app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
return
}
}
if len(app.Status.OperationState.WaitingFor) > 0 && len(state.WaitingFor) == 0 {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"waitingFor": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
return
}
}
}

Expand Down Expand Up @@ -1502,6 +1571,53 @@ func currentSourceEqualsSyncedSource(app *appv1.Application) bool {
return app.Spec.Source.Equals(&app.Status.Sync.ComparedTo.Source)
}

// shouldRefreshForDependency returns whether an app should be refreshed for a
// change in the state of one of its dependencies.
func (ctrl *ApplicationController) shouldRefreshForDependency(app *appv1.Application) bool {
logCtx := log.WithField("application", app.QualifiedName())

// No need to refresh when we're not waiting for dependencies to sync
if app.Spec.DependsOn == nil || !app.IsWaiting() {
logCtx.Debugf("not waiting for dependencies, skipping refresh")
return false
}

// If we're waiting for any dependency to be created, we need refresh to
// see whether they're created by now.
if app.Status.OperationState.BlockedOnEmpty {
return true
}

needRefresh := false
numWaiting := len(app.Status.OperationState.WaitingFor)
numRemoved := 0
for _, syncDep := range app.Status.OperationState.WaitingFor {
depApp, err := ctrl.appLister.Applications(syncDep.ApplicationNamespace).Get(syncDep.ApplicationName)
if err != nil {
// The application could have been deleted meanwhile, it's not any
// of the dependencies anymore.
if apierr.IsNotFound(err) {
logCtx.Infof("Depencency application %s has been removed", syncDep.QualifiedName())
numRemoved += 1
continue
} else {
logCtx.Warnf("Error getting sync dependency %s: %v", syncDep.QualifiedName(), err)
return false
}
}

if depApp.Status.Health.Status == health.HealthStatusHealthy && depApp.Status.Sync.Status == appv1.SyncStatusCodeSynced {
logCtx.Debugf("Sync dependency %s has become healthy and synced", syncDep.QualifiedName())
needRefresh = true
}
}

// We need to refresh when either one of our dependencies changed state to
// healthy, or when all dependencies we were waiting for have been
// removed meanwhile.
return needRefresh || (numWaiting > 0 && numRemoved == numWaiting)
}

// needRefreshAppStatus answers if application status needs to be refreshed.
// Returns true if application never been compared, has changed or comparison result has expired.
// Additionally, it returns whether full refresh was requested or not.
Expand Down Expand Up @@ -1548,6 +1664,9 @@ func (ctrl *ApplicationController) needRefreshAppStatus(app *appv1.Application,
} else if requested, level := ctrl.isRefreshRequested(app.QualifiedName()); requested {
compareWith = level
reason = "controller refresh requested"
} else if ctrl.shouldRefreshForDependency(app) {
compareWith = ComparisonWithNothing
reason = "refreshing for change in dependencies status"
}
}

Expand Down
Loading

0 comments on commit 9686dbf

Please sign in to comment.