Skip to content

Commit

Permalink
kube
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 28, 2024
1 parent a72824a commit ba315f2
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 16 deletions.
2 changes: 2 additions & 0 deletions backend/provisioner/runner_scaling_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func provisionRunner(scaling scaling.RunnerScaling, client ftlv1connect.Controll
RunnerUri: endpointURI,
DeploymentKey: deployment,
}
logger.Infof("previous deployment: %v", previous)
if previous != nil && previous.GetRunner().GetOutput().GetDeploymentKey() != deployment {
logger.Infof("terminating previous deployment: %s", previous.GetRunner().GetOutput().GetDeploymentKey())
err := scaling.TerminateDeployment(ctx, module, previous.GetRunner().GetOutput().GetDeploymentKey())
if err != nil {
logger.Errorf(err, "failed to terminate previous deployment")
Expand Down
100 changes: 84 additions & 16 deletions backend/provisioner/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
)

const controllerDeploymentName = "ftl-controller"
const deploymentLabel = "ftl-deployment"
const configMapName = "ftl-controller-deployment-config"
const deploymentTemplate = "deploymentTemplate"
const serviceTemplate = "serviceTemplate"
const serviceAccountTemplate = "serviceAccountTemplate"
const moduleLabel = "ftl.dev/module"
const deploymentLabel = "ftl.dev/deployment"
const deployTimeout = time.Minute * 5

var _ scaling.RunnerScaling = &k8sScaling{}

Expand Down Expand Up @@ -76,8 +78,24 @@ func (r *k8sScaling) StartDeployment(ctx context.Context, module string, deploym
if deploymentExists {
logger.Debugf("Updating deployment %s", deploymentKey)
return r.handleExistingDeployment(ctx, deployment)

}
err = r.handleNewDeployment(ctx, module, deploymentKey, sch)
if err != nil {
return err
}
err = r.waitForDeploymentReady(ctx, deploymentKey, deployTimeout)
if err != nil {
return err
}
return r.handleNewDeployment(ctx, deploymentKey, sch)
go func() {
time.Sleep(time.Second * 10)
err = r.deleteOldDeployments(ctx, module, deploymentKey)
if err != nil {
logger.Errorf(err, "Failed to delete old deployments")
}
}()
return nil
}

func (r *k8sScaling) TerminateDeployment(ctx context.Context, module string, deploymentKey string) error {
Expand Down Expand Up @@ -261,7 +279,7 @@ func (r *k8sScaling) thisContainerImage(ctx context.Context) (string, error) {

}

func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *schema.Module) error {
func (r *k8sScaling) handleNewDeployment(ctx context.Context, module string, name string, sch *schema.Module) error {
logger := log.FromContext(ctx)

cm, err := r.client.CoreV1().ConfigMaps(r.namespace).Get(ctx, configMapName, v1.GetOptions{})
Expand All @@ -288,10 +306,9 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
return fmt.Errorf("failed to decode service from configMap %s: %w", configMapName, err)
}
service.Name = name
service.Labels = addLabel(service.Labels, "app", name)
service.Labels = addLabel(service.Labels, deploymentLabel, name)
service.OwnerReferences = []v1.OwnerReference{{APIVersion: "apps/v1", Kind: "deployment", Name: controllerDeploymentName, UID: thisDeployment.UID}}
service.Spec.Selector = map[string]string{"app": name}
addLabels(&service.ObjectMeta, module, name)
service, err = servicesClient.Create(ctx, service, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create service %s: %w", name, err)
Expand All @@ -315,8 +332,8 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
return fmt.Errorf("failed to decode service account from configMap %s: %w", configMapName, err)
}
serviceAccount.Name = name
serviceAccount.Labels = addLabel(serviceAccount.Labels, "app", name)
serviceAccount.OwnerReferences = []v1.OwnerReference{{APIVersion: "v1", Kind: "service", Name: name, UID: service.UID}}
addLabels(&serviceAccount.ObjectMeta, module, name)
_, err = serviceAccountClient.Create(ctx, serviceAccount, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create service account%s: %w", name, err)
Expand All @@ -328,7 +345,7 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *

// Sync the istio policy if applicable
if sec, ok := r.istioSecurity.Get(); ok {
err = r.syncIstioPolicy(ctx, sec, name, service, thisDeployment)
err = r.syncIstioPolicy(ctx, sec, module, name, service, thisDeployment)
if err != nil {
return err
}
Expand Down Expand Up @@ -385,7 +402,6 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
deployment.Spec.Template.ObjectMeta.Labels = map[string]string{}
}

deployment.Spec.Template.ObjectMeta.Labels = addLabel(deployment.Spec.Template.ObjectMeta.Labels, "app", name)
deployment.Spec.Template.Spec.ServiceAccountName = name
changes, err := r.syncDeployment(ctx, thisImage, deployment, 1)

Expand All @@ -396,9 +412,9 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *

change(deployment)
}
deployment.Labels = addLabel(deployment.Labels, deploymentLabel, name)
deployment.Labels = addLabel(deployment.Labels, "app", name)

addLabels(&deployment.ObjectMeta, module, name)
addLabels(&deployment.Spec.Template.ObjectMeta, module, name)
deployment, err = deploymentClient.Create(ctx, deployment, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err)
Expand All @@ -408,12 +424,13 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, name string, sch *
return nil
}

func addLabel(labels map[string]string, key string, value string) map[string]string {
if labels == nil {
labels = map[string]string{}
func addLabels(obj *v1.ObjectMeta, module string, deployment string) {
if obj.Labels == nil {
obj.Labels = map[string]string{}
}
labels[key] = value
return labels
obj.Labels["app"] = deployment
obj.Labels[deploymentLabel] = deployment
obj.Labels[moduleLabel] = module
}

func decodeBytesToObject(bytes []byte, deployment runtime.Object) error {
Expand Down Expand Up @@ -521,7 +538,7 @@ func (r *k8sScaling) updateEnvVar(deployment *kubeapps.Deployment, envVerName st
return changes
}

func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Clientset, name string, service *kubecore.Service, controllerDeployment *kubeapps.Deployment) error {
func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Clientset, module string, name string, service *kubecore.Service, controllerDeployment *kubeapps.Deployment) error {
logger := log.FromContext(ctx)
logger.Debugf("Creating new istio policy for %s", name)
var update func(policy *istiosec.AuthorizationPolicy) error
Expand Down Expand Up @@ -551,6 +568,7 @@ func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Client
return nil
}
}
addLabels(&policy.ObjectMeta, module, name)
policy.OwnerReferences = []v1.OwnerReference{{APIVersion: "v1", Kind: "service", Name: name, UID: service.UID}}
// At present we only allow ingress from the controller
policy.Spec.Selector = &v1beta1.WorkloadSelector{MatchLabels: map[string]string{"app": name}}
Expand All @@ -570,6 +588,56 @@ func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Client
return update(policy)
}

func (r *k8sScaling) waitForDeploymentReady(ctx context.Context, key string, timeout time.Duration) error {
logger := log.FromContext(ctx)
deploymentClient := r.client.AppsV1().Deployments(r.namespace)
watch, err := deploymentClient.Watch(ctx, v1.ListOptions{LabelSelector: deploymentLabel + "=" + key})
if err != nil {
return err

Check failure on line 596 in backend/provisioner/scaling/k8sscaling/k8s_scaling.go

View workflow job for this annotation

GitHub Actions / Lint

error returned from interface method should be wrapped: sig: func (k8s.io/client-go/kubernetes/typed/apps/v1.DeploymentInterface).Watch(ctx context.Context, opts k8s.io/apimachinery/pkg/apis/meta/v1.ListOptions) (k8s.io/apimachinery/pkg/watch.Interface, error) (wrapcheck)
}
watch.ResultChan()
end := time.After(timeout)
for {
deployent, err := deploymentClient.Get(ctx, key, v1.GetOptions{})
if err != nil {
return err

Check failure on line 603 in backend/provisioner/scaling/k8sscaling/k8s_scaling.go

View workflow job for this annotation

GitHub Actions / Lint

error returned from interface method should be wrapped: sig: func (k8s.io/client-go/kubernetes/typed/apps/v1.DeploymentInterface).Get(ctx context.Context, name string, opts k8s.io/apimachinery/pkg/apis/meta/v1.GetOptions) (*k8s.io/api/apps/v1.Deployment, error) (wrapcheck)
}
if deployent.Status.ReadyReplicas == *deployent.Spec.Replicas {
logger.Debugf("Deployment %s is ready", key)
return nil
}
for _, condition := range deployent.Status.Conditions {
if condition.Type == kubeapps.DeploymentReplicaFailure && condition.Status == kubecore.ConditionTrue {
return fmt.Errorf("Deployment %s is in error state: %s\n", deployent, condition.Message)

Check warning on line 611 in backend/provisioner/scaling/k8sscaling/k8s_scaling.go

View workflow job for this annotation

GitHub Actions / Lint

error-strings: error strings should not be capitalized or end with punctuation or a newline (revive)
}
}
select {
case <-end:
return fmt.Errorf("deployment %s did not become ready in time", key)
case <-watch.ResultChan():
}
}
}

func (r *k8sScaling) deleteOldDeployments(ctx context.Context, module string, deployment string) error {
logger := log.FromContext(ctx)
deploymentClient := r.client.AppsV1().Deployments(r.namespace)
deployments, err := deploymentClient.List(ctx, v1.ListOptions{LabelSelector: moduleLabel + "=" + module})
if err != nil {
return err

Check failure on line 627 in backend/provisioner/scaling/k8sscaling/k8s_scaling.go

View workflow job for this annotation

GitHub Actions / Lint

error returned from interface method should be wrapped: sig: func (k8s.io/client-go/kubernetes/typed/apps/v1.DeploymentInterface).List(ctx context.Context, opts k8s.io/apimachinery/pkg/apis/meta/v1.ListOptions) (*k8s.io/api/apps/v1.DeploymentList, error) (wrapcheck)
}
for _, deploy := range deployments.Items {
if deploy.Name != deployment {
logger.Debugf("Deleting old deployment %s", deploy.Name)
err = deploymentClient.Delete(ctx, deploy.Name, v1.DeleteOptions{})
if err != nil {
logger.Errorf(err, "Failed to delete deployment %s", deploy.Name)
}
}
}
return nil
}

func extractTag(image string) (string, error) {
idx := strings.LastIndex(image, ":")
if idx == -1 {
Expand Down

0 comments on commit ba315f2

Please sign in to comment.