Commit

kube
stuartwdouglas committed Nov 28, 2024
1 parent b35f857 commit ae351a3
Showing 2 changed files with 87 additions and 16 deletions.
2 changes: 2 additions & 0 deletions backend/provisioner/runner_scaling_provisioner.go
@@ -62,7 +62,9 @@ func provisionRunner(scaling scaling.RunnerScaling, client ftlv1connect.Controll
 		RunnerUri:     endpointURI,
 		DeploymentKey: deployment,
 	}
+	logger.Infof("previous deployment: %v", previous)
 	if previous != nil && previous.GetRunner().GetOutput().GetDeploymentKey() != deployment {
+		logger.Infof("terminating previous deployment: %s", previous.GetRunner().GetOutput().GetDeploymentKey())
 		err := scaling.TerminateDeployment(ctx, module, previous.GetRunner().GetOutput().GetDeploymentKey())
 		if err != nil {
 			logger.Errorf(err, "failed to terminate previous deployment")
101 changes: 85 additions & 16 deletions backend/provisioner/scaling/k8sscaling/k8s_scaling.go
@@ -30,11 +30,13 @@ import (
 )
 
 const controllerDeploymentName = "ftl-controller"
-const deploymentLabel = "ftl-deployment"
 const configMapName = "ftl-controller-deployment-config"
 const deploymentTemplate = "deploymentTemplate"
 const serviceTemplate = "serviceTemplate"
 const serviceAccountTemplate = "serviceAccountTemplate"
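+// moduleLabel and deploymentLabel are applied to every provisioned resource so it can be looked up by module or by deployment key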
+const moduleLabel = "ftl.dev/module"
+const deploymentLabel = "ftl.dev/deployment"
+const deployTimeout = time.Minute * 5
 
 var _ scaling.RunnerScaling = &k8sScaling{}
 
@@ -76,8 +78,25 @@ func (r *k8sScaling) StartDeployment(ctx context.Context, module string, deploym
 	if deploymentExists {
 		logger.Debugf("Updating deployment %s", deploymentKey)
 		return r.handleExistingDeployment(ctx, deployment)
+
 	}
-	return r.handleNewDeployment(ctx, deploymentKey, sch)
+	err = r.handleNewDeployment(ctx, module, deploymentKey, sch)
+	if err != nil {
+		return err
+	}
+	err = r.waitForDeploymentReady(ctx, deploymentKey, deployTimeout)
+	if err != nil {
+		return err
+	}
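+	// Once the new deployment is ready, remove deployments left over from older versions of this module in the background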
+	delCtx := log.ContextWithLogger(context.Background(), logger)
+	go func() {
+		time.Sleep(time.Second * 10)
+		err := r.deleteOldDeployments(delCtx, module, deploymentKey)
+		if err != nil {
+			logger.Errorf(err, "Failed to delete old deployments")
+		}
+	}()
+	return nil
 }
 
 func (r *k8sScaling) TerminateDeployment(ctx context.Context, module string, deploymentKey string) error {
@@ -261,7 +280,7 @@ func (r *k8sScaling) thisContainerImage(ctx context.Context) (string, error) {

 }
 
-func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *schema.Module) error {
+func (r *k8sScaling) handleNewDeployment(ctx context.Context, module string, name string, sch *schema.Module) error {
 	logger := log.FromContext(ctx)
 
 	cm, err := r.client.CoreV1().ConfigMaps(r.namespace).Get(ctx, configMapName, v1.GetOptions{})
@@ -288,10 +307,9 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
return fmt.Errorf("failed to decode service from configMap %s: %w", configMapName, err)
}
service.Name = name
service.Labels = addLabel(service.Labels, "app", name)
service.Labels = addLabel(service.Labels, deploymentLabel, name)
service.OwnerReferences = []v1.OwnerReference{{APIVersion: "apps/v1", Kind: "deployment", Name: controllerDeploymentName, UID: thisDeployment.UID}}
service.Spec.Selector = map[string]string{"app": name}
addLabels(&service.ObjectMeta, module, name)
service, err = servicesClient.Create(ctx, service, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create service %s: %w", name, err)
@@ -315,8 +333,8 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
return fmt.Errorf("failed to decode service account from configMap %s: %w", configMapName, err)
}
serviceAccount.Name = name
serviceAccount.Labels = addLabel(serviceAccount.Labels, "app", name)
serviceAccount.OwnerReferences = []v1.OwnerReference{{APIVersion: "v1", Kind: "service", Name: name, UID: service.UID}}
addLabels(&serviceAccount.ObjectMeta, module, name)
_, err = serviceAccountClient.Create(ctx, serviceAccount, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create service account%s: %w", name, err)
@@ -328,7 +346,7 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *

 	// Sync the istio policy if applicable
 	if sec, ok := r.istioSecurity.Get(); ok {
-		err = r.syncIstioPolicy(ctx, sec, name, service, thisDeployment)
+		err = r.syncIstioPolicy(ctx, sec, module, name, service, thisDeployment)
 		if err != nil {
 			return err
 		}
@@ -385,7 +403,6 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
 		deployment.Spec.Template.ObjectMeta.Labels = map[string]string{}
 	}
 
-	deployment.Spec.Template.ObjectMeta.Labels = addLabel(deployment.Spec.Template.ObjectMeta.Labels, "app", name)
 	deployment.Spec.Template.Spec.ServiceAccountName = name
 	changes, err := r.syncDeployment(ctx, thisImage, deployment, 1)
 
@@ -396,9 +413,9 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *

 		change(deployment)
 	}
-	deployment.Labels = addLabel(deployment.Labels, deploymentLabel, name)
-	deployment.Labels = addLabel(deployment.Labels, "app", name)
 
+	addLabels(&deployment.ObjectMeta, module, name)
+	addLabels(&deployment.Spec.Template.ObjectMeta, module, name)
 	deployment, err = deploymentClient.Create(ctx, deployment, v1.CreateOptions{})
 	if err != nil {
 		return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err)
@@ -408,12 +425,13 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
 	return nil
 }
 
-func addLabel(labels map[string]string, key string, value string) map[string]string {
-	if labels == nil {
-		labels = map[string]string{}
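+// addLabels tags the resource with the app, deployment and module labels so it can be selected later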
+func addLabels(obj *v1.ObjectMeta, module string, deployment string) {
+	if obj.Labels == nil {
+		obj.Labels = map[string]string{}
 	}
-	labels[key] = value
-	return labels
+	obj.Labels["app"] = deployment
+	obj.Labels[deploymentLabel] = deployment
+	obj.Labels[moduleLabel] = module
 }
 
 func decodeBytesToObject(bytes []byte, deployment runtime.Object) error {
@@ -521,7 +539,7 @@ func (r *k8sScaling) updateEnvVar(deployment *kubeapps.Deployment, envVerName st
 	return changes
 }
 
-func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Clientset, name string, service *kubecore.Service, controllerDeployment *kubeapps.Deployment) error {
+func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Clientset, module string, name string, service *kubecore.Service, controllerDeployment *kubeapps.Deployment) error {
 	logger := log.FromContext(ctx)
 	logger.Debugf("Creating new istio policy for %s", name)
 	var update func(policy *istiosec.AuthorizationPolicy) error
@@ -551,6 +569,7 @@ func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Client
 			return nil
 		}
 	}
+	addLabels(&policy.ObjectMeta, module, name)
 	policy.OwnerReferences = []v1.OwnerReference{{APIVersion: "v1", Kind: "service", Name: name, UID: service.UID}}
 	// At present we only allow ingress from the controller
 	policy.Spec.Selector = &v1beta1.WorkloadSelector{MatchLabels: map[string]string{"app": name}}
@@ -570,6 +589,56 @@ func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Client
 	return update(policy)
 }
 
+func (r *k8sScaling) waitForDeploymentReady(ctx context.Context, key string, timeout time.Duration) error {
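+	// Watch deployments matching this deployment's label and poll until every replica is ready, a replica failure is reported, or the timeout expires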
+	logger := log.FromContext(ctx)
+	deploymentClient := r.client.AppsV1().Deployments(r.namespace)
+	watch, err := deploymentClient.Watch(ctx, v1.ListOptions{LabelSelector: deploymentLabel + "=" + key})
+	if err != nil {
+		return fmt.Errorf("failed to watch deployment %s: %w", key, err)
+	}
+	defer watch.Stop()
+	end := time.After(timeout)
+	for {
+		deployment, err := deploymentClient.Get(ctx, key, v1.GetOptions{})
+		if err != nil {
+			return fmt.Errorf("failed to get deployment %s: %w", key, err)
+		}
+		if deployment.Status.ReadyReplicas == *deployment.Spec.Replicas {
+			logger.Debugf("Deployment %s is ready", key)
+			return nil
+		}
+		for _, condition := range deployment.Status.Conditions {
+			if condition.Type == kubeapps.DeploymentReplicaFailure && condition.Status == kubecore.ConditionTrue {
+				return fmt.Errorf("deployment %s is in error state: %s", key, condition.Message)
+			}
+		}
+		select {
+		case <-end:
+			return fmt.Errorf("deployment %s did not become ready in time", key)
+		case <-watch.ResultChan():
+		}
+	}
+}
+
+func (r *k8sScaling) deleteOldDeployments(ctx context.Context, module string, deployment string) error {
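+	// List every deployment labelled with this module and delete all except the deployment that was just created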
+	logger := log.FromContext(ctx)
+	deploymentClient := r.client.AppsV1().Deployments(r.namespace)
+	deployments, err := deploymentClient.List(ctx, v1.ListOptions{LabelSelector: moduleLabel + "=" + module})
+	if err != nil {
+		return fmt.Errorf("failed to list deployments: %w", err)
+	}
+	for _, deploy := range deployments.Items {
+		if deploy.Name != deployment {
+			logger.Debugf("Deleting old deployment %s", deploy.Name)
+			err = deploymentClient.Delete(ctx, deploy.Name, v1.DeleteOptions{})
+			if err != nil {
+				logger.Errorf(err, "Failed to delete deployment %s", deploy.Name)
+			}
+		}
+	}
+	return nil
+}
+
 func extractTag(image string) (string, error) {
 	idx := strings.LastIndex(image, ":")
 	if idx == -1 {
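For reference, a minimal sketch of how the new labels introduced in this commit can be queried with client-go from outside the provisioner, assuming an in-cluster config; the "ftl" namespace and "echo" module name are illustrative placeholders, not values from this commit:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Assumes the program runs inside the cluster; use clientcmd to load a kubeconfig otherwise.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// "ftl.dev/module" is the moduleLabel constant from this commit; namespace and module name are placeholders.
	deployments, err := client.AppsV1().Deployments("ftl").List(context.Background(), metav1.ListOptions{
		LabelSelector: "ftl.dev/module=echo",
	})
	if err != nil {
		panic(err)
	}
	for _, d := range deployments.Items {
		fmt.Printf("%s ready=%d/%d\n", d.Name, d.Status.ReadyReplicas, d.Status.Replicas)
	}
}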
