Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update application CRD types, implement application manager draft #3

Merged
merged 1 commit into from
Feb 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ server-image:
docker build -t $(IMAGE_PREFIX)argocd-server:$(IMAGE_TAG) -f Dockerfile-argocd .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)argocd-server:$(IMAGE_TAG) ; fi

.PHONY: controller
controller:
go build -v -i -ldflags '${LDFLAGS}' -o ${DIST_DIR}/argocd-application-controller ./cmd/argocd-application-controller

.PHONY: lint
lint:
# CGO_ENABLED=0 required due to: # https://github.com/alecthomas/gometalinter/issues/149#issuecomment-351272924
Expand Down
94 changes: 94 additions & 0 deletions application/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package application

import (
"context"

"io/ioutil"

"fmt"

"time"

"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util/git"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Manager is responsible to retrieve application spec and compare it to actual application state.
type Manager struct {
gitClient git.Client
repoService repository.RepositoryServiceServer
statusRefreshTimeout time.Duration
}

// NeedRefreshAppStatus answers if application status needs to be refreshed. Returns true if application never been compared, has changed or comparison result has expired.
func (m *Manager) NeedRefreshAppStatus(app *v1alpha1.Application) bool {
return app.Status.ComparisonResult.Status == v1alpha1.ComparisonStatusUnknown ||
!app.Spec.Source.Equals(app.Status.ComparisonResult.ComparedTo) ||
app.Status.ComparisonResult.ComparedAt.Add(m.statusRefreshTimeout).Before(time.Now())
}

// RefreshAppStatus compares application against actual state in target cluster and returns updated status.
func (m *Manager) RefreshAppStatus(app *v1alpha1.Application) *v1alpha1.ApplicationStatus {
status, err := m.tryRefreshAppStatus(app)
if err != nil {
status = &v1alpha1.ApplicationStatus{
ComparisonResult: v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusError,
ComparisonErrorDetails: fmt.Sprintf("Failed to get application status for application '%s': %+v", app.Name, err),
ComparedTo: app.Spec.Source,
ComparedAt: metav1.Time{Time: time.Now().UTC()},
},
}
}
return status
}

func (m *Manager) tryRefreshAppStatus(app *v1alpha1.Application) (*v1alpha1.ApplicationStatus, error) {
repo, err := m.repoService.Get(context.Background(), &repository.RepoQuery{Repo: app.Spec.Source.RepoURL})
if err != nil {
return nil, err
}

appRepoPath, err := ioutil.TempDir("", app.Name)
if err != nil {
return nil, fmt.Errorf("unable to create temp repository directory for app '%s'", app.Name)
}

err = m.gitClient.CloneOrFetch(repo.Repo, repo.Username, repo.Password, appRepoPath)
if err != nil {
return nil, err
}

err = m.gitClient.Checkout(appRepoPath, app.Spec.Source.TargetRevision)
if err != nil {
return nil, err
}
comparisonResult, err := m.compareAppState(appRepoPath, app)
if err != nil {
return nil, err
}
return &v1alpha1.ApplicationStatus{
ComparisonResult: *comparisonResult,
}, nil
}

func (m *Manager) compareAppState(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
// TODO (amatyushentsev): Implement actual comparison
return &v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusEqual,
ComparedTo: app.Spec.Source,
ComparedAt: metav1.Time{Time: time.Now().UTC()},
}, nil
}

// NewAppManager creates new instance of app.Manager
func NewAppManager(gitClient git.Client, repoService repository.RepositoryServiceServer, statusRefreshTimeout time.Duration) *Manager {
return &Manager{
gitClient: gitClient,
repoService: repoService,
statusRefreshTimeout: statusRefreshTimeout,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ package main
import (
"context"
"fmt"
"github.com/argoproj/argo-cd/application/controller"
"os"
"time"

"github.com/argoproj/argo-cd/application"
"github.com/argoproj/argo-cd/cmd/argocd/commands"
"github.com/argoproj/argo-cd/controller"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
"github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util/git"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"os"
)

const (
Expand All @@ -25,13 +30,23 @@ func newCommand() *cobra.Command {
var command = cobra.Command{
Use: cliName,
Short: "application-controller is a controller to operate on applications CRD",
Run: func(c *cobra.Command, args []string) {
RunE: func(c *cobra.Command, args []string) error {
kubeConfig := commands.GetKubeConfig(kubeConfigPath, kubeConfigOverrides)

nativeGitClient, err := git.NewNativeGitClient()
if err != nil {
return err
}
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
appClient := appclientset.NewForConfigOrDie(kubeConfig)

appController := controller.NewApplicationController(kubeClient, appClient)
// TODO (amatyushentsev): Use config map to store controller configuration
namespace := "default"
appResyncPeriod := time.Minute * 10

appManager := application.NewAppManager(nativeGitClient, repository.NewServer(namespace, kubeClient, appClient), appResyncPeriod)

appController := controller.NewApplicationController(kubeClient, appClient, appManager, appResyncPeriod)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
66 changes: 51 additions & 15 deletions application/controller/controller.go → controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,44 @@ package controller
import (
"context"

appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
log "github.com/sirupsen/logrus"

"time"

"github.com/argoproj/argo-cd/application"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
appResyncPeriod = 10 * time.Minute
)

// ApplicationController is the controller for application resources.
type ApplicationController struct {
kubeclientset kubernetes.Interface
applicationclientset appclientset.Interface
appQueue workqueue.RateLimitingInterface
appManager *application.Manager

appInformer cache.SharedIndexInformer
kubeClientset kubernetes.Interface
applicationClientset appclientset.Interface
appQueue workqueue.RateLimitingInterface
appInformer cache.SharedIndexInformer
}

// NewApplicationController creates new instance of ApplicationController.
func NewApplicationController(kubeclientset kubernetes.Interface, applicationclientset appclientset.Interface) *ApplicationController {
func NewApplicationController(
kubeClientset kubernetes.Interface,
applicationClientset appclientset.Interface,
appManager *application.Manager,
appResyncPeriod time.Duration) *ApplicationController {
appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return &ApplicationController{
kubeclientset: kubeclientset,
applicationclientset: applicationclientset,
appManager: appManager,
kubeClientset: kubeClientset,
applicationClientset: applicationClientset,
appQueue: appQueue,
appInformer: newApplicationInformer(applicationclientset, appQueue),
appInformer: newApplicationInformer(applicationClientset, appQueue, appResyncPeriod),
}
}

Expand All @@ -61,10 +65,33 @@ func (ctrl *ApplicationController) Run(ctx context.Context, appWorkers int) {

func (ctrl *ApplicationController) processNextItem() bool {
appKey, shutdown := ctrl.appQueue.Get()
defer ctrl.appQueue.Done(appKey)
if shutdown {
return false
}

defer ctrl.appQueue.Done(appKey)

obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey.(string))
if err != nil {
log.Errorf("Failed to get application '%s' from informer index: %+v", appKey, err)
return true
}
if !exists {
// This happens after app was deleted, but the work queue still had an entry for it.
return true
}
app, ok := obj.(*appv1.Application)
if !ok {
log.Warnf("Key '%s' in index is not an application", appKey)
return true
}

updatedApp := app.DeepCopy()
if ctrl.appManager.NeedRefreshAppStatus(updatedApp) {
updatedApp.Status = *ctrl.appManager.RefreshAppStatus(updatedApp)
ctrl.persistApp(updatedApp)
}

return true
}

Expand All @@ -73,9 +100,18 @@ func (ctrl *ApplicationController) runWorker() {
}
}

func newApplicationInformer(appclientset appclientset.Interface, appQueue workqueue.RateLimitingInterface) cache.SharedIndexInformer {
func (ctrl *ApplicationController) persistApp(app *appv1.Application) {
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace)
_, err := appClient.Update(app)
if err != nil {
log.Warnf("Error updating application: %v", err)
}
log.Info("Application update successful")
}

func newApplicationInformer(appClientset appclientset.Interface, appQueue workqueue.RateLimitingInterface, appResyncPeriod time.Duration) cache.SharedIndexInformer {
appInformerFactory := appinformers.NewSharedInformerFactory(
appclientset,
appClientset,
appResyncPeriod,
)
informer := appInformerFactory.Argoproj().V1alpha1().Applications().Informer()
Expand Down
Loading