Skip to content

Commit

Permalink
feat: PodSpec changes now are applied only to pods on creation (#41 s…
Browse files Browse the repository at this point in the history
…tep 7) (#118)

The proxy configuration is now only applied when a pod is created. The old algorithm allowed updates to the PodSpec
to apply changes after the pod was created. This PR simplifies the logic that applies the proxy configuration to a
PodSpec, mainly removing the code that would update the pod spec multiple times.
  • Loading branch information
hessjcg authored Dec 6, 2022
1 parent cfffe91 commit df1f322
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 531 deletions.
265 changes: 12 additions & 253 deletions internal/workload/podspec_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package workload

import (
"encoding/json"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -115,7 +114,6 @@ func (e *ConfigErrorDetail) Error() string {
e.WorkloadKind.String(),
e.WorkloadNamespace,
e.WorkloadName)

}

// defaultContainerResources used when the AuthProxyWorkload resource is not specified.
Expand Down Expand Up @@ -188,7 +186,7 @@ func (u *Updater) filterMatchingInstances(pl *cloudsqlapi.AuthProxyWorkloadList,

// UpdateWorkloadContainers applies the proxy containers from all of the
// instances listed in matchingAuthProxyWorkloads to the workload
func (u *Updater) UpdateWorkloadContainers(wl Workload, matches []*cloudsqlapi.AuthProxyWorkload) (bool, error) {
func (u *Updater) UpdateWorkloadContainers(wl *PodWorkload, matches []*cloudsqlapi.AuthProxyWorkload) (bool, error) {
state := updateState{
updater: u,
nextDBPort: DefaultFirstPort,
Expand Down Expand Up @@ -238,9 +236,7 @@ func dbInst(namespace, name, connectionString string) dbInstance {
// with one or more DBInstances.
type updateState struct {
err ConfigError
oldMods workloadMods
mods workloadMods
removed []*dbInstance
nextDBPort int32
updater *Updater
}
Expand Down Expand Up @@ -379,26 +375,6 @@ func (s *updateState) addWorkloadEnvVar(p *cloudsqlapi.AuthProxyWorkload, is *cl
})
}

// loadOldEnvVarState loads the state connecting EnvVar changes done by the
// AuthProxyWorkload workload webhook from an annotation on that workload. This
// enables changes to be checked and reverted when a AuthProxyWorkload is removed.
func (s *updateState) loadOldEnvVarState(wl Workload) {
ann := wl.Object().GetAnnotations()
if ann == nil {
return
}

val, exists := ann[cloudsqlapi.AnnotationPrefix+"/state"]
if !exists {
return
}

err := json.Unmarshal([]byte(val), &s.oldMods)
if err != nil {
l.Info("unable to unmarshal old environment workload vars", "error", err)
}
}

func (s *updateState) initState(pl []*cloudsqlapi.AuthProxyWorkload) {
// Reset the mods.DBInstances to the list of pl being
// applied right now.
Expand All @@ -416,74 +392,12 @@ func (s *updateState) initState(pl []*cloudsqlapi.AuthProxyWorkload) {
}
}

// Set s.removed to all removed db instances
for _, o := range s.oldMods.DBInstances {
var found bool
for _, n := range s.mods.DBInstances {
if n.AuthProxyWorkload.Name == o.AuthProxyWorkload.Name &&
n.AuthProxyWorkload.Namespace == o.AuthProxyWorkload.Namespace &&
n.ConnectionString == o.ConnectionString {
found = true
break
}
}

if !found {
s.removed = append(s.removed, o)
}
}

for _, old := range s.oldMods.EnvVars {
for _, n := range s.mods.DBInstances {
if old.Instance == *n {
// old value relates instance that still exists
s.mods.EnvVars = append(s.mods.EnvVars, old)
break
}
}
}

for _, old := range s.oldMods.Ports {
for _, n := range s.mods.DBInstances {
if old.Instance == *n {
// old value relates instance that still exists
s.mods.Ports = append(s.mods.Ports, old)
break
}
}
}

for _, old := range s.oldMods.VolumeMounts {
for _, n := range s.mods.DBInstances {
if old.Instance == *n {
// old value relates instance that still exists
s.mods.VolumeMounts = append(s.mods.VolumeMounts, old)
break
}
}
}

}

// saveEnvVarState saves the most recent state from updated workloads
func (s *updateState) saveEnvVarState(wl Workload) {
ann := wl.Object().GetAnnotations()
if ann == nil {
ann = map[string]string{}
}
bytes, err := json.Marshal(s.mods)
if err != nil {
l.Info("unable to marshal old environment workload vars, %v", err)
return
}
ann[cloudsqlapi.AnnotationPrefix+"/state"] = string(bytes)
wl.Object().SetAnnotations(ann)
}

// update Reconciles the state of a workload, applying the matching DBInstances
// and removing any out-of-date configuration related to deleted DBInstances
func (s *updateState) update(wl Workload, matches []*cloudsqlapi.AuthProxyWorkload) (bool, error) {
s.loadOldEnvVarState(wl)
func (s *updateState) update(wl *PodWorkload, matches []*cloudsqlapi.AuthProxyWorkload) (bool, error) {

s.initState(matches)
podSpec := wl.PodSpec()
containers := podSpec.Containers
Expand All @@ -506,53 +420,14 @@ func (s *updateState) update(wl Workload, matches []*cloudsqlapi.AuthProxyWorklo
// add all new containers and update existing containers
for i := range matches {
inst := matches[i]
var instContainer *corev1.Container

for j := range containers {
container := &containers[j]
if container.Name == ContainerName(inst) {
instContainer = container
break
}
}
if instContainer == nil {
newContainer := corev1.Container{}
s.updateContainer(inst, &newContainer)
containers = append(containers, newContainer)
} else {
s.updateContainer(inst, instContainer)
}
newContainer := corev1.Container{}
s.updateContainer(inst, wl, &newContainer)
containers = append(containers, newContainer)
updated = true
}

// remove all csql containers that don't relate to one of the matches
var filteredContainers []corev1.Container
var removedContainerNames []string

for j := range containers {
container := &containers[j]
if !strings.HasPrefix(container.Name, ContainerPrefix) {
filteredContainers = append(filteredContainers, *container)
continue
}

var found bool
for i := range matches {
if ContainerName(matches[i]) == container.Name {
found = true
break
}
}
if found {
filteredContainers = append(filteredContainers, *container)
} else {
// we're removing a container that doesn't match an csqlWorkload
updated = true
removedContainerNames = append(removedContainerNames, container.Name)
}
}

podSpec.Containers = filteredContainers
podSpec.Containers = containers

for i := range podSpec.Containers {
s.updateContainerEnv(&podSpec.Containers[i])
Expand All @@ -566,22 +441,15 @@ func (s *updateState) update(wl Workload, matches []*cloudsqlapi.AuthProxyWorklo
return updated, &s.err
}

// if this workload does not have a mutable pod template, do nothing.
mwl, ok := wl.(WithMutablePodTemplate)
if !ok {
return false, nil
}

if updated {
mwl.SetPodSpec(podSpec)
s.saveEnvVarState(wl)
}
wl.SetPodSpec(podSpec)

return updated, nil
}

// updateContainer Creates or updates the proxy container in the workload's PodSpec
func (s *updateState) updateContainer(p *cloudsqlapi.AuthProxyWorkload, c *corev1.Container) {
func (s *updateState) updateContainer(p *cloudsqlapi.AuthProxyWorkload, wl Workload, c *corev1.Container) {
l.Info("Updating wl {{wl}}, no update needed.", "name", client.ObjectKeyFromObject(wl.Object()))

// if the c was fully overridden, just use that c.
if p.Spec.AuthProxyContainer != nil && p.Spec.AuthProxyContainer.Container != nil {
p.Spec.AuthProxyContainer.Container.DeepCopyInto(c)
Expand Down Expand Up @@ -765,29 +633,13 @@ func (s *updateState) applyTelemetrySpec(p *cloudsqlapi.AuthProxyWorkload, cliAr

// updateContainerEnv applies global container state to all containers
func (s *updateState) updateContainerEnv(c *corev1.Container) {
// filter and restore csql env vars
for i := 0; i < len(s.oldMods.EnvVars); i++ {
oldEnvVar := s.oldMods.EnvVars[i]
s.filterOldEnvVar(c, oldEnvVar)
}

for i := 0; i < len(s.mods.EnvVars); i++ {
var found bool
operatorEnv := s.mods.EnvVars[i].OperatorManagedValue
oldManagedEnv := s.oldManagedEnv(operatorEnv.Name)

for j := 0; j < len(c.Env); j++ {
if operatorEnv.Name == c.Env[j].Name {
found = true

if oldManagedEnv == nil {
l.Info("Override env {{env}} on container {{container}} from {{old}} to {{new}}",
"env", operatorEnv.Name,
"container", c.Name,
"old", c.Env[j].Value,
"new", operatorEnv.Value)
s.mods.EnvVars[i].OriginalValues[c.Name] = c.Env[j].Value
}
c.Env[j] = operatorEnv
}
}
Expand All @@ -798,64 +650,6 @@ func (s *updateState) updateContainerEnv(c *corev1.Container) {

}

func (s *updateState) filterOldEnvVar(c *corev1.Container, oldEnvVar *managedEnvVar) {
// Check if this env var belongs to a removed workload
var workloadRemoved bool
for j := 0; j < len(s.removed); j++ {
if *s.removed[j] == oldEnvVar.Instance {
workloadRemoved = true
}
}

if !workloadRemoved {
return
}

// Check if this env var was replaced with a new one of the same name
var newEnvVarWithSameName bool
for j := 0; j < len(s.mods.EnvVars) && !newEnvVarWithSameName; j++ {
mev := s.mods.EnvVars[j]
if mev.OperatorManagedValue.Name == oldEnvVar.OperatorManagedValue.Name &&
mev.Instance.AuthProxyWorkload != oldEnvVar.Instance.AuthProxyWorkload {
newEnvVarWithSameName = true
}
}
if newEnvVarWithSameName {
return
}

// Check if the container has an env var with this name
var containerEnv *corev1.EnvVar
var index int
for j := 0; j < len(c.Env) && containerEnv == nil; j++ {
if oldEnvVar.OperatorManagedValue.Name == c.Env[j].Name {
containerEnv = &c.Env[j]
index = j
}
}
if containerEnv == nil {
return
}

// Restore the original value or remove the env var
originalValue, hasOriginalValue := oldEnvVar.OriginalValues[c.Name]
if hasOriginalValue {
l.Info("Restored {{env}} to original value {{val}} on {{container}}.",
"env", oldEnvVar.OperatorManagedValue.Name,
"val", originalValue,
"container", c.Name)
// replace the original value
containerEnv.Value = originalValue
} else {
// remove the element from the array
l.Info("Removed {{env}} on {{container}}.",
"env", oldEnvVar.OperatorManagedValue.Name,
"container", c.Name)
c.Env = append(c.Env[0:index], c.Env[index+1:]...)
}

}

// applyContainerVolumes applies all the VolumeMounts to this container.
func (s *updateState) applyContainerVolumes(c *corev1.Container) {
nameAccessor := func(v corev1.VolumeMount) string {
Expand All @@ -882,35 +676,9 @@ func (s *updateState) applyVolumes(ps *corev1.PodSpec) {
// VolumeMount and Volume on containers.
func applyVolumeThings[T corev1.VolumeMount | corev1.Volume](
s *updateState,
items []T,
newVols []T,
nameAccessor func(T) string,
thingAccessor func(*managedVolume) T) []T {
// make a list of all removed volume mounts
var removedVolumeMounts []*managedVolume
for _, oldMount := range s.oldMods.VolumeMounts {
for _, inst := range s.removed {
if oldMount.Instance == *inst {
removedVolumeMounts = append(removedVolumeMounts, oldMount)
break
}
}
}

// remove mounts from the list of items related to removed instances
var newVols []T
for i := 0; i < len(items); i++ {
var removed bool
for _, removedMount := range removedVolumeMounts {
removedName := nameAccessor(thingAccessor(removedMount))
if nameAccessor(items[i]) == removedName {
removed = true
break
}
}
if !removed {
newVols = append(newVols, items[i])
}
}

// add or replace items for all new volume mounts
for i := 0; i < len(s.mods.VolumeMounts); i++ {
Expand Down Expand Up @@ -976,15 +744,6 @@ func (s *updateState) addError(errorCode, description string, p *cloudsqlapi.Aut
s.err.add(errorCode, description, p)
}

func (s *updateState) oldManagedEnv(name string) *managedEnvVar {
for i := 0; i < len(s.oldMods.EnvVars); i++ {
if s.oldMods.EnvVars[i].OperatorManagedValue.Name == name {
return s.oldMods.EnvVars[i]
}
}
return nil
}

func (s *updateState) applyAuthenticationSpec(proxy *cloudsqlapi.AuthProxyWorkload, _ *corev1.Container, args []string) []string {
if proxy.Spec.Authentication == nil {
return args
Expand Down
Loading

0 comments on commit df1f322

Please sign in to comment.