Skip to content

Commit

Permalink
feat: Configure proxy container for graceful termination. (#425)
Browse files Browse the repository at this point in the history
Add the following configuration to the workload pods so that the proxy can gracefully exit
when the main container is done.

Configure the proxy container to exit 0 when Kubernetes sends it a SIGTERM. We want
the proxy to exit with code 0, indicating a clean termination. Without this change the proxy container would
exit with exit code 143 (128 + 15, meaning killed by SIGTERM), which would cause Kubernetes to report the
pod as "exited in an error state."

Configure a workload lifecycle handler so that kubernetes calls GET /quitquitquit before terminating the proxy 
container. This should give the proxy container the chance to exit gracefully before kubernetes sends a SIGTERM
to the proxy process.

Always enable the /quitquitquit proxy api.

Always set the CSQL_QUIT_URLS environment variable to a space-separated list of proxy quitquitquit URLs
on workload pods. This way, when the main workload container is ready to exit, it can tell the proxy
containers to shut down. For example, when a job or cronjob's main process is done, that container can iterate over the URLs:

```
echo Starting job

# execute the job process
run_job

# Tell proxy containers to shut down gracefully
for url in $CSQL_QUIT_URLS ; do 
   wget --post-data '' $url
done
```
Fixes #361
  • Loading branch information
hessjcg authored Sep 19, 2023
1 parent dc06ceb commit 0e0bb40
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 15 deletions.
17 changes: 17 additions & 0 deletions internal/testhelpers/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ func BuildJob(name types.NamespacedName, appLabel string) *batchv1.Job {
},
}
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
podCmd := fmt.Sprintf("echo Container 1 is Running \n"+
"sleep %d \n"+
"for url in $CSQL_QUIT_URLS ; do \n"+
" wget --post-data '' $url \n"+
"done", 30)
job.Spec.Template.Spec.Containers[0].Command = []string{"sh", "-c", podCmd}

return job
}

Expand Down Expand Up @@ -322,6 +329,12 @@ func BuildCronJob(name types.NamespacedName, appLabel string) *batchv1.CronJob {
},
}
job.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
podCmd := fmt.Sprintf("echo Container 1 is Running \n"+
"sleep %d \n"+
"for url in $CSQL_QUIT_URLS ; do \n"+
" wget --post-data '' $url \n"+
"done", 30)
job.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"sh", "-c", podCmd}
return job

}
Expand Down Expand Up @@ -694,6 +707,10 @@ func (cc *TestCaseClient) ConfigureResources(proxy *cloudsqlapi.AuthProxyWorkloa
corev1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalExponent),
},
},
AdminServer: &cloudsqlapi.AdminServerSpec{
Port: 9092,
EnableAPIs: []string{"QuitQuitQuit"},
},
}
}

Expand Down
60 changes: 52 additions & 8 deletions internal/workload/podspec_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ const (
// and kubernetes health checks.
DefaultHealthCheckPort int32 = 9801

// DefaultAdminPort is the used by the proxy to expose prometheus
// and kubernetes health checks.
DefaultAdminPort int32 = 9802
// DefaultAdminPort is the used by the proxy to expose the quitquitquit
// and debug api endpoints
DefaultAdminPort int32 = 9091
)

var l = logf.Log.WithName("internal.workload")
Expand Down Expand Up @@ -308,6 +308,7 @@ type workloadMods struct {
EnvVars []*managedEnvVar `json:"envVars"`
VolumeMounts []*managedVolume `json:"volumeMounts"`
Ports []*managedPort `json:"ports"`
AdminPorts []int32 `json:"adminPorts"`
}

func (s *updateState) addWorkloadPort(p int32) {
Expand Down Expand Up @@ -394,6 +395,24 @@ func (s *updateState) useInstancePort(p *cloudsqlapi.AuthProxyWorkload, is *clou
return port
}

// addAdminPort records one proxy container's admin server port on the
// workload modification state. The accumulated ports are later turned into
// the CSQL_QUIT_URLS environment variable (see addQuitEnvVar).
func (s *updateState) addAdminPort(p int32) {
	s.mods.AdminPorts = append(s.mods.AdminPorts, p)
}

// addQuitEnvVar sets the CSQL_QUIT_URLS environment variable on the workload
// containers to a space-separated list of "/quitquitquit" URLs, one per
// recorded proxy admin port, so the main container can ask each proxy
// sidecar to shut down gracefully.
func (s *updateState) addQuitEnvVar() {
	urls := make([]string, 0, len(s.mods.AdminPorts))
	for _, port := range s.mods.AdminPorts {
		urls = append(urls, fmt.Sprintf("http://localhost:%d/quitquitquit", port))
	}

	s.addEnvVar(nil, managedEnvVar{
		OperatorManagedValue: corev1.EnvVar{
			Name:  "CSQL_QUIT_URLS",
			Value: strings.Join(urls, " "),
		}})
}

func (s *updateState) addPort(p int32, instance proxyInstanceID) {
var mp *managedPort

Expand Down Expand Up @@ -528,6 +547,8 @@ func (s *updateState) update(wl *PodWorkload, matches []*cloudsqlapi.AuthProxyWo
k, v := s.updater.PodAnnotation(inst)
ann[k] = v
}
// Add the envvar containing the proxy quit urls to the workloads
s.addQuitEnvVar()

podSpec.Containers = containers

Expand Down Expand Up @@ -778,8 +799,9 @@ func (s *updateState) updateContainerEnv(c *corev1.Container) {
}

// addHealthCheck adds the health check declaration to this workload.
func (s *updateState) addHealthCheck(p *cloudsqlapi.AuthProxyWorkload, c *corev1.Container) {
func (s *updateState) addHealthCheck(p *cloudsqlapi.AuthProxyWorkload, c *corev1.Container) int32 {
var portPtr *int32
var adminPortPtr *int32

cs := p.Spec.AuthProxyContainer

Expand Down Expand Up @@ -815,6 +837,32 @@ func (s *updateState) addHealthCheck(p *cloudsqlapi.AuthProxyWorkload, c *corev1
s.addProxyContainerEnvVar(p, "CSQL_PROXY_HTTP_PORT", fmt.Sprintf("%d", port))
s.addProxyContainerEnvVar(p, "CSQL_PROXY_HTTP_ADDRESS", "0.0.0.0")
s.addProxyContainerEnvVar(p, "CSQL_PROXY_HEALTH_CHECK", "true")
// For graceful exits as a sidecar, the proxy should exit with exit code 0
// when it receives a SIGTERM.
s.addProxyContainerEnvVar(p, "CSQL_PROXY_EXIT_ZERO_ON_SIGTERM", "true")

// Also the operator will enable the /quitquitquit endpoint for graceful exit.
// If the AdminServer.Port is set, use it, otherwise use the default
// admin port.
if cs != nil && cs.AdminServer != nil && cs.AdminServer.Port != 0 {
adminPortPtr = &cs.AdminServer.Port
}
adminPort := s.usePort(adminPortPtr, DefaultAdminPort, p)
s.addAdminPort(adminPort)
s.addProxyContainerEnvVar(p, "CSQL_PROXY_QUITQUITQUIT", "true")
s.addProxyContainerEnvVar(p, "CSQL_PROXY_ADMIN_PORT", fmt.Sprintf("%d", adminPort))

// Configure the pre-stop hook for /quitquitquit
c.Lifecycle = &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.IntOrString{IntVal: adminPort},
Path: "/quitquitquit",
Host: "localhost",
},
},
}
return adminPort
}

func (s *updateState) addAdminServer(p *cloudsqlapi.AuthProxyWorkload) {
Expand All @@ -824,14 +872,10 @@ func (s *updateState) addAdminServer(p *cloudsqlapi.AuthProxyWorkload) {
}

cs := p.Spec.AuthProxyContainer.AdminServer
s.addProxyPort(cs.Port, p)
s.addProxyContainerEnvVar(p, "CSQL_PROXY_ADMIN_PORT", fmt.Sprintf("%d", cs.Port))
for _, name := range cs.EnableAPIs {
switch name {
case "Debug":
s.addProxyContainerEnvVar(p, "CSQL_PROXY_DEBUG", "true")
case "QuitQuitQuit":
s.addProxyContainerEnvVar(p, "CSQL_PROXY_QUITQUITQUIT", "true")
}
}

Expand Down
106 changes: 99 additions & 7 deletions internal/workload/podspec_updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -535,11 +536,14 @@ func TestProxyCLIArgs(t *testing.T) {
}},
},
wantWorkloadEnv: map[string]string{
"CSQL_PROXY_STRUCTURED_LOGS": "true",
"CSQL_PROXY_HEALTH_CHECK": "true",
"CSQL_PROXY_HTTP_PORT": fmt.Sprintf("%d", workload.DefaultHealthCheckPort),
"CSQL_PROXY_HTTP_ADDRESS": "0.0.0.0",
"CSQL_PROXY_USER_AGENT": "cloud-sql-proxy-operator/dev",
"CSQL_PROXY_STRUCTURED_LOGS": "true",
"CSQL_PROXY_HEALTH_CHECK": "true",
"CSQL_PROXY_QUITQUITQUIT": "true",
"CSQL_PROXY_EXIT_ZERO_ON_SIGTERM": "true",
"CSQL_PROXY_HTTP_PORT": fmt.Sprintf("%d", workload.DefaultHealthCheckPort),
"CSQL_PROXY_HTTP_ADDRESS": "0.0.0.0",
"CSQL_PROXY_USER_AGENT": "cloud-sql-proxy-operator/dev",
"CSQL_PROXY_ADMIN_PORT": fmt.Sprintf("%d", workload.DefaultAdminPort),
},
},
{
Expand Down Expand Up @@ -691,7 +695,7 @@ func TestProxyCLIArgs(t *testing.T) {
},
},
{
desc: "No admin port enabled when AdminServerSpec is nil",
desc: "Default admin port enabled when AdminServerSpec is nil",
proxySpec: cloudsqlapi.AuthProxyWorkloadSpec{
AuthProxyContainer: &cloudsqlapi.AuthProxyContainerSpec{},
Instances: []cloudsqlapi.InstanceSpec{{
Expand All @@ -704,8 +708,9 @@ func TestProxyCLIArgs(t *testing.T) {
},
wantWorkloadEnv: map[string]string{
"CSQL_PROXY_HEALTH_CHECK": "true",
"CSQL_PROXY_ADMIN_PORT": fmt.Sprintf("%d", workload.DefaultAdminPort),
},
dontWantEnvSet: []string{"CSQL_PROXY_DEBUG", "CSQL_PROXY_ADMIN_PORT"},
dontWantEnvSet: []string{"CSQL_PROXY_DEBUG"},
},
{
desc: "port conflict with other instance causes error",
Expand Down Expand Up @@ -911,6 +916,93 @@ func TestPodTemplateAnnotations(t *testing.T) {

}

// TestQuitURLEnvVar checks that the workload container receives a
// CSQL_QUIT_URLS env var listing one /quitquitquit URL per proxy container.
func TestQuitURLEnvVar(t *testing.T) {

	u := workload.NewUpdater("cloud-sql-proxy-operator/dev", workload.DefaultProxyImage)

	// Build a pod workload with a single exposed container port.
	wl := podWorkload()
	wl.Pod.Spec.Containers[0].Ports =
		[]corev1.ContainerPort{{Name: "http", ContainerPort: 8080}}

	// Three AuthProxyWorkloads that all match the pod.
	csqls := []*cloudsqlapi.AuthProxyWorkload{
		simpleAuthProxy("instance1", "project:server:db"),
		simpleAuthProxy("instance2", "project:server2:db2"),
		simpleAuthProxy("instance3", "project:server3:db3")}
	for i := range csqls {
		csqls[i].ObjectMeta.Generation = int64(i + 1)
	}

	// Each proxy is expected to get a consecutive admin port starting at
	// the default, so the quit URLs follow the same sequence.
	urls := make([]string, len(csqls))
	for i := range urls {
		urls[i] = fmt.Sprintf(
			"http://localhost:%d/quitquitquit", workload.DefaultAdminPort+int32(i))
	}
	wantQuitURLSEnv := strings.Join(urls, " ")

	// Apply the proxy configuration to the pod.
	if err := configureProxies(u, wl, csqls); err != nil {
		t.Fatal(err)
	}

	// The env var must be present on the main container and hold all URLs.
	ev, err := findEnvVar(wl, "busybox", "CSQL_QUIT_URLS")
	if err != nil {
		t.Fatal("can't find env var", err)
	}
	if ev.Value != wantQuitURLSEnv {
		t.Fatal("got", ev.Value, "want", wantQuitURLSEnv)
	}
}

// TestPreStopHook checks that the proxy container gets a preStop lifecycle
// hook that issues an HTTP GET to /quitquitquit on the default admin port.
func TestPreStopHook(t *testing.T) {

	u := workload.NewUpdater("cloud-sql-proxy-operator/dev", workload.DefaultProxyImage)

	// Build a pod workload with a single exposed container port.
	wl := podWorkload()
	wl.Pod.Spec.Containers[0].Ports =
		[]corev1.ContainerPort{{Name: "http", ContainerPort: 8080}}

	// One AuthProxyWorkload that matches the pod.
	csqls := []*cloudsqlapi.AuthProxyWorkload{
		simpleAuthProxy("instance1", "project:server:db")}
	csqls[0].ObjectMeta.Generation = 1

	// Apply the proxy configuration to the pod.
	if err := configureProxies(u, wl, csqls); err != nil {
		t.Fatal(err)
	}

	// Locate the injected proxy container and inspect its lifecycle hook.
	c, err := findContainer(wl, workload.ContainerName(csqls[0]))
	if err != nil {
		t.Fatal("can't find proxy container", err)
	}
	if c.Lifecycle.PreStop == nil || c.Lifecycle.PreStop.HTTPGet == nil {
		t.Fatal("got nil, want lifecycle.preStop.HTTPGet")
	}

	hook := c.Lifecycle.PreStop.HTTPGet
	if hook.Port.IntVal != workload.DefaultAdminPort {
		t.Error("got", hook.Port, "want", workload.DefaultAdminPort)
	}
	if hook.Path != "/quitquitquit" {
		t.Error("got", hook.Path, "want", "/quitquitquit")
	}
	if hook.Host != "localhost" {
		t.Error("got", hook.Host, "want", "localhost")
	}
}

func TestPodAnnotation(t *testing.T) {
now := metav1.Now()
server := &cloudsqlapi.AuthProxyWorkload{ObjectMeta: metav1.ObjectMeta{Name: "instance1", Generation: 1}}
Expand Down

0 comments on commit 0e0bb40

Please sign in to comment.