-
Notifications
You must be signed in to change notification settings - Fork 383
/
Copy pathshutdown_manager.go
228 lines (192 loc) · 7.05 KB
/
shutdown_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.
package envoy
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"golang.org/x/sys/unix"
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/xds/bootstrap"
)
// logger is the package-level logger used by the shutdown manager. It is
// constructed with egv1a1.LogLevelInfo and named "shutdown-manager"; Shutdown
// may replace it with a file-backed logger when running in Kubernetes.
var logger = logging.DefaultLogger(egv1a1.LogLevelInfo).WithName("shutdown-manager")
const (
// ShutdownManagerPort is the TCP port the shutdown manager HTTP server
// listens on.
ShutdownManagerPort = 19002
// ShutdownManagerHealthCheckPath is the HTTP path served for health checks;
// ShutdownManager registers a no-op handler for it.
ShutdownManagerHealthCheckPath = "/healthz"
// ShutdownManagerReadyPath is the HTTP path whose handler blocks until
// shutdown readiness is detected or the ready timeout is exceeded.
ShutdownManagerReadyPath = "/shutdown/ready"
// ShutdownReadyFile is the file whose existence indicates shutdown
// readiness: Shutdown creates it and the ready handler polls for it.
ShutdownReadyFile = "/tmp/shutdown-ready"
)
// ShutdownManager serves the shutdown manager process for Envoy proxies.
//
// It starts an HTTP server on ShutdownManagerPort exposing the health check
// endpoint and the shutdown readiness endpoint, and blocks until the server has
// been shut down in response to SIGINT or SIGTERM.
//
// readyTimeout bounds how long a request to the readiness endpoint may block.
//
// It returns nil after a signal-triggered shutdown has completed, or the
// listen/serve error if the server could not be started or stopped for any
// reason other than a graceful shutdown.
func ShutdownManager(readyTimeout time.Duration) error {
	// Setup HTTP handler
	mux := http.NewServeMux()
	mux.HandleFunc(ShutdownManagerHealthCheckPath, func(_ http.ResponseWriter, _ *http.Request) {})
	mux.HandleFunc(ShutdownManagerReadyPath, func(w http.ResponseWriter, _ *http.Request) {
		shutdownReadyHandler(w, readyTimeout, ShutdownReadyFile)
	})

	// Setup HTTP server
	srv := &http.Server{
		Handler:           mux,
		Addr:              fmt.Sprintf(":%d", ShutdownManagerPort),
		ReadTimeout:       5 * time.Second,
		ReadHeaderTimeout: 5 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       15 * time.Second,
	}

	// Register for signals before the server starts so that a signal delivered
	// early is handled gracefully, and release the registration on return.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	shutdownDone := make(chan struct{}) // closed once the goroutine below exits
	serveFailed := make(chan struct{})  // closed when the server fails to serve
	go func() {
		defer close(shutdownDone)
		select {
		case sig := <-sigCh:
			// Prefer the symbolic name (e.g. SIGTERM); fall back to the
			// signal's own description if it is not a syscall.Signal.
			name := sig.String()
			if s, ok := sig.(syscall.Signal); ok {
				name = unix.SignalName(s)
			}
			logger.Info(fmt.Sprintf("received %s", name))

			// Shutdown HTTP server without interrupting active connections
			if err := srv.Shutdown(context.Background()); err != nil {
				logger.Error(err, "server shutdown error")
			}
		case <-serveFailed:
			// Nothing to shut down; just let the goroutine finish.
		}
	}()

	// Start HTTP server
	logger.Info("starting shutdown manager")
	if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		// The server never started (or died unexpectedly). Report the failure
		// to the caller instead of idling until a signal arrives.
		logger.Error(err, "starting shutdown manager failed")
		close(serveFailed)
		<-shutdownDone
		return err
	}

	// ListenAndServe returns as soon as Shutdown is called; wait until the
	// graceful shutdown itself has finished.
	<-shutdownDone
	return nil
}
// shutdownReadyHandler handles the endpoint used by a preStop hook on the Envoy
// container to block until ready to terminate. After the graceful drain process
// has completed a file will be written to indicate shutdown readiness.
//
// The handler polls for readyFile once per second. It returns without writing
// a status (implicit 200) as soon as the file exists, and responds with 500
// Internal Server Error once readyTimeout has elapsed without the file
// appearing.
func shutdownReadyHandler(w http.ResponseWriter, readyTimeout time.Duration, readyFile string) {
	startTime := time.Now()

	logger.Info("received shutdown ready request")

	// Poll for shutdown readiness
	for {
		// Check if ready timeout is exceeded
		if time.Since(startTime) > readyTimeout {
			logger.Info("shutdown readiness timeout exceeded")
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		_, err := os.Stat(readyFile)
		if err == nil {
			logger.Info("shutdown readiness detected")
			return
		}
		if !os.IsNotExist(err) {
			logger.Error(err, "error checking for shutdown readiness")
		}

		// Wait before the next poll for every "not ready" outcome, including
		// unexpected stat errors, so a persistent error cannot turn this loop
		// into a busy spin that floods the log until the timeout.
		time.Sleep(1 * time.Second)
	}
}
// Shutdown is called from a preStop hook on the shutdown-manager container where
// it will initiate a graceful drain sequence on the Envoy proxy and block until
// connections are drained or a timeout is exceeded.
//
// drainTimeout is the maximum time to wait for the drain to complete.
// minDrainDuration is the minimum time to wait before the connection count is
// allowed to end the drain. exitAtConnections is the total connection count at
// or below which the drain is considered complete.
//
// Once the drain ends (either way) ShutdownReadyFile is created to signal
// readiness; an error is returned only if that file cannot be created.
func Shutdown(drainTimeout time.Duration, minDrainDuration time.Duration, exitAtConnections int) error {
	startTime := time.Now()
	allowedToExit := false

	// Reconfigure logger to write to stdout of main process if running in Kubernetes
	if _, k8s := os.LookupEnv("KUBERNETES_SERVICE_HOST"); k8s && os.Getpid() != 1 {
		logger = logging.FileLogger("/proc/1/fd/1", "shutdown-manager", egv1a1.LogLevelInfo)
	}

	logger.Info(fmt.Sprintf("initiating graceful drain with %.0f second minimum drain period and %.0f second timeout",
		minDrainDuration.Seconds(), drainTimeout.Seconds()))

	// Start failing active health checks. Failures are logged but do not abort
	// the drain sequence.
	if err := postEnvoyAdminAPI("healthcheck/fail"); err != nil {
		logger.Error(err, "error failing active health checks")
	}

	// Initiate graceful drain sequence
	if err := postEnvoyAdminAPI("drain_listeners?graceful&skip_exit"); err != nil {
		logger.Error(err, "error initiating graceful drain")
	}

	// Poll total connections from Envoy admin API until minimum drain period has
	// been reached and total connections reaches threshold or timeout is exceeded
	for {
		elapsedTime := time.Since(startTime)

		// conn is nil when the stat could not be read; in that case only the
		// timeout can end the loop on this iteration.
		conn, err := getTotalConnections()
		if err != nil {
			logger.Error(err, "error getting total connections")
		}

		if elapsedTime > minDrainDuration && !allowedToExit {
			logger.Info(fmt.Sprintf("minimum drain period reached; will exit when total connections reaches %d", exitAtConnections))
			allowedToExit = true
		}

		if elapsedTime > drainTimeout {
			logger.Info("graceful drain sequence timeout exceeded")
			break
		} else if allowedToExit && conn != nil && *conn <= exitAtConnections {
			logger.Info("graceful drain sequence completed")
			break
		}

		time.Sleep(1 * time.Second)
	}

	// Signal to shutdownReadyHandler that drain process is complete
	readyFile, err := os.Create(ShutdownReadyFile)
	if err != nil {
		logger.Error(err, "error creating shutdown ready file")
		return err
	}
	// Release the file descriptor. The file's existence is the signal and it
	// has already been created, so a close failure is logged but not returned.
	if err := readyFile.Close(); err != nil {
		logger.Error(err, "error closing shutdown ready file")
	}

	return nil
}
// postEnvoyAdminAPI issues a body-less POST to the given path on the Envoy
// admin API and reports an error if the request fails or the response status
// is anything other than 200 OK.
func postEnvoyAdminAPI(path string) error {
	endpoint := fmt.Sprintf("http://%s:%d/%s",
		bootstrap.EnvoyAdminAddress, bootstrap.EnvoyAdminPort, path)

	resp, err := http.Post(endpoint, "application/json", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	return fmt.Errorf("unexpected response status: %s", resp.Status)
}
// getTotalConnections retrieves the total number of open connections from
// Envoy's server.total_connections stat via the admin API.
//
// It returns a pointer to the value of the first stat in the response. A nil
// pointer and an error are returned when the request fails, the response
// status is not 200 OK, the body cannot be decoded, or no stats are present.
func getTotalConnections() (*int, error) {
	// Send request to Envoy admin API to retrieve server.total_connections stat
	// NOTE(review): the URL has a double slash before "stats"; presumably the
	// admin server normalizes it - confirm before changing.
	resp, err := http.Get(fmt.Sprintf("http://%s:%d//stats?filter=^server\\.total_connections$&format=json",
		bootstrap.EnvoyAdminAddress, bootstrap.EnvoyAdminPort))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected response status: %s", resp.Status)
	}

	// Define struct to decode JSON response into; expecting a single stat in the response in the format:
	// {"stats":[{"name":"server.total_connections","value":123}]}
	//
	// This is deliberately a struct value rather than a pointer: decoding a
	// JSON null into a pointer sets it to nil, and the Stats access below
	// would then panic.
	var r struct {
		Stats []struct {
			Name  string
			Value int
		}
	}

	// Decode JSON response into struct
	if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
		return nil, err
	}

	// Defensive check for empty stats
	if len(r.Stats) == 0 {
		return nil, fmt.Errorf("no stats found")
	}

	// Log and return total connections
	c := r.Stats[0].Value
	logger.Info(fmt.Sprintf("total connections: %d", c))
	return &c, nil
}