From 1a7d50b7fbbcc752c2ce244fce6a244f8bdf7a86 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Tue, 5 Dec 2023 16:47:59 -0800 Subject: [PATCH] [3.5] backport health check e2e tests. Signed-off-by: Siyuan Zhang --- pkg/expect/expect.go | 4 + server/etcdserver/server.go | 1 + server/mvcc/backend/batch_tx.go | 1 + tests/e2e/etcdctl.go | 35 +++ tests/e2e/http_health_check_test.go | 440 ++++++++++++++++++++++++++++ tests/framework/e2e/cluster.go | 60 +++- tests/framework/e2e/etcd_process.go | 196 ++++++++++++- tests/framework/e2e/util.go | 21 ++ 8 files changed, 743 insertions(+), 15 deletions(-) create mode 100644 tests/e2e/http_health_check_test.go diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index 3c4018780733..95bc30823f29 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -203,3 +203,7 @@ func (ep *ExpectProcess) Lines() []string { defer ep.mu.Unlock() return ep.lines } + +func (ep *ExpectProcess) IsRunning() bool { + return ep.cmd != nil +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 5f62ddc26523..82576a5f87a1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2152,6 +2152,7 @@ func (s *EtcdServer) apply( zap.Stringer("type", e.Type)) switch e.Type { case raftpb.EntryNormal: + // gofail: var beforeApplyOneEntryNormal struct{} s.applyEntryNormal(&e) s.setAppliedIndex(e.Index) s.setTerm(e.Term) diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index 9c025d79e1e7..84dbd8dc0e34 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -307,6 +307,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered { func (t *batchTxBuffered) Unlock() { if t.pending != 0 { t.backend.readTx.Lock() // blocks txReadBuffer for writing. 
+ // gofail: var beforeWritebackBuf struct{} t.buf.writeback(&t.backend.readTx.buf) t.backend.readTx.Unlock() if t.pending >= t.backend.batchLimit { diff --git a/tests/e2e/etcdctl.go b/tests/e2e/etcdctl.go index 1955d3f9aa11..ff6514aa3eab 100644 --- a/tests/e2e/etcdctl.go +++ b/tests/e2e/etcdctl.go @@ -55,6 +55,15 @@ func (ctl *Etcdctl) Put(key, value string) error { return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK") } +func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error { + if ctl.v2 { + panic("Unsupported method for v2") + } + args := ctl.cmdArgs() + args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value) + return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK") +} + func (ctl *Etcdctl) Set(key, value string) error { if !ctl.v2 { panic("Unsupported method for v3") @@ -72,6 +81,32 @@ func (ctl *Etcdctl) Set(key, value string) error { return nil } +func (ctl *Etcdctl) AuthEnable() error { + args := ctl.cmdArgs("auth", "enable") + return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled") +} + +func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) { + var resp clientv3.AuthUserGrantRoleResponse + err := ctl.spawnJsonCmd(&resp, "user", "grant-role", user, role) + return &resp, err +} + +func (ctl *Etcdctl) UserAdd(name, password string) (*clientv3.AuthUserAddResponse, error) { + args := []string{"user", "add"} + if password == "" { + args = append(args, name) + args = append(args, "--no-password") + } else { + args = append(args, fmt.Sprintf("%s:%s", name, password)) + } + args = append(args, "--interactive=false") + + var resp clientv3.AuthUserAddResponse + err := ctl.spawnJsonCmd(&resp, args...) 
+ return &resp, err +} + func (ctl *Etcdctl) AlarmList() (*clientv3.AlarmResponse, error) { if ctl.v2 { panic("Unsupported method for v2") diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go new file mode 100644 index 000000000000..89c13d2b0449 --- /dev/null +++ b/tests/e2e/http_health_check_test.go @@ -0,0 +1,440 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +const ( + healthCheckTimeout = 2 * time.Second + putCommandTimeout = 200 * time.Millisecond +) + +type healthCheckConfig struct { + url string + expectedStatusCode int + expectedTimeoutError bool + expectedRespSubStrings []string +} + +type injectFailure func(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) + +func TestHTTPHealthHandler(t *testing.T) { + e2e.BeforeTest(t) + client := &http.Client{} + tcs := []struct { + name string + injectFailure injectFailure + clusterConfig e2e.EtcdProcessClusterConfig + healthChecks []healthCheckConfig + }{ + { + name: "no failures", // happy case + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1}, + healthChecks: 
[]healthCheckConfig{ + { + url: "/health", + expectedStatusCode: http.StatusOK, + }, + }, + }, + { + name: "activated no space alarm", + injectFailure: triggerNoSpaceAlarm, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, QuotaBackendBytes: int64(13 * os.Getpagesize())}, + healthChecks: []healthCheckConfig{ + { + url: "/health", + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + url: "/health?exclude=NOSPACE", + expectedStatusCode: http.StatusOK, + }, + }, + }, + { + name: "overloaded server slow apply", + injectFailure: triggerSlowApply, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, GoFailEnabled: true}, + healthChecks: []healthCheckConfig{ + { + url: "/health?serializable=true", + expectedStatusCode: http.StatusOK, + }, + { + url: "/health?serializable=false", + expectedTimeoutError: true, + }, + }, + }, + { + name: "network partitioned", + injectFailure: blackhole, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, IsPeerTLS: true, PeerProxy: true}, + healthChecks: []healthCheckConfig{ + { + url: "/health?serializable=true", + expectedStatusCode: http.StatusOK, + }, + { + url: "/health?serializable=false", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + // old leader may return "etcdserver: leader changed" error with 503 in ReadIndex leaderChangedNotifier + }, + }, + }, + { + name: "raft loop deadlock", + injectFailure: triggerRaftLoopDeadLock, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true}, + healthChecks: []healthCheckConfig{ + { + // current kubeadm etcd liveness check failed to detect raft loop deadlock in steady state + // ref. https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/phases/etcd/local.go#L225-L226 + // current liveness probe depends on the etcd /health check has a flaw that new /livez check should resolve. 
+ url: "/health?serializable=true", + expectedStatusCode: http.StatusOK, + }, + { + url: "/health?serializable=false", + expectedTimeoutError: true, + }, + }, + }, + // verify that auth enabled serializable read must go through mvcc + { + name: "slow buffer write back with auth enabled", + injectFailure: triggerSlowBufferWriteBackWithAuth, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true}, + healthChecks: []healthCheckConfig{ + { + url: "/health?serializable=true", + expectedTimeoutError: true, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + clus, err := e2e.NewEtcdProcessCluster(t, &tc.clusterConfig) + require.NoError(t, err) + defer clus.Close() + e2e.ExecuteUntil(ctx, t, func() { + if tc.injectFailure != nil { + // guaranteed that failure point is active until all the health checks timeout. + duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout + tc.injectFailure(ctx, t, clus, duration) + } + + for _, hc := range tc.healthChecks { + requestURL := clus.Procs[0].EndpointsHTTP()[0] + hc.url + t.Logf("health check URL is %s", requestURL) + doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings) + } + }) + }) + } +} + +var ( + defaultHealthCheckConfigs = []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + expectedRespSubStrings: []string{`ok`}, + }, + { + url: "/readyz", + expectedStatusCode: http.StatusOK, + expectedRespSubStrings: []string{`ok`}, + }, + { + url: "/livez?verbose=true", + expectedStatusCode: http.StatusOK, + expectedRespSubStrings: []string{`[+]serializable_read ok`}, + }, + { + url: "/readyz?verbose=true", + expectedStatusCode: http.StatusOK, + expectedRespSubStrings: []string{ + `[+]serializable_read ok`, + `[+]data_corruption ok`, + }, + }, + } +) + +func 
TestHTTPLivezReadyzHandler(t *testing.T) { + e2e.BeforeTest(t) + client := &http.Client{} + tcs := []struct { + name string + injectFailure injectFailure + clusterConfig e2e.EtcdProcessClusterConfig + healthChecks []healthCheckConfig + }{ + { + name: "no failures", // happy case + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1}, + healthChecks: defaultHealthCheckConfigs, + }, + { + name: "activated no space alarm", + injectFailure: triggerNoSpaceAlarm, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, QuotaBackendBytes: int64(13 * os.Getpagesize())}, + healthChecks: defaultHealthCheckConfigs, + }, + // Readiness is not an indicator of performance. Slow response is not covered by readiness. + // refer to https://tinyurl.com/livez-readyz-design-doc or https://github.com/etcd-io/etcd/issues/16007#issuecomment-1726541091 in case tinyurl is down. + { + name: "overloaded server slow apply", + injectFailure: triggerSlowApply, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, GoFailEnabled: true}, + // TODO expected behavior of readyz check should be 200 after ReadIndex check is implemented to replace linearizable read. 
+ healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + }, + }, + }, + { + name: "network partitioned", + injectFailure: blackhole, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, IsPeerTLS: true, PeerProxy: true}, + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + expectedRespSubStrings: []string{ + `[-]linearizable_read failed: etcdserver: leader changed`, + }, + }, + }, + }, + { + name: "raft loop deadlock", + injectFailure: triggerRaftLoopDeadLock, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true}, + // TODO expected behavior of livez check should be 503 or timeout after RaftLoopDeadLock check is implemented. + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedStatusCode: http.StatusOK, + }, + { + url: "/readyz", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + }, + }, + }, + // verify that auth enabled serializable read must go through mvcc + { + name: "slow buffer write back with auth enabled", + injectFailure: triggerSlowBufferWriteBackWithAuth, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 1, GoFailEnabled: true}, + healthChecks: []healthCheckConfig{ + { + url: "/livez", + expectedTimeoutError: true, + }, + { + url: "/readyz", + expectedTimeoutError: true, + }, + }, + }, + { + name: "corrupt", + injectFailure: triggerCorrupt, + clusterConfig: e2e.EtcdProcessClusterConfig{ClusterSize: 3, CorruptCheckTime: time.Second}, + healthChecks: []healthCheckConfig{ + { + url: "/livez?verbose=true", + expectedStatusCode: http.StatusOK, + expectedRespSubStrings: []string{`[+]serializable_read ok`}, + }, + { + url: "/readyz", + expectedStatusCode: 
http.StatusServiceUnavailable, + expectedRespSubStrings: []string{ + `[+]serializable_read ok`, + `[-]data_corruption failed: alarm activated: CORRUPT`, + }, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + clus, err := e2e.NewEtcdProcessCluster(t, &tc.clusterConfig) + require.NoError(t, err) + defer clus.Close() + e2e.ExecuteUntil(ctx, t, func() { + if tc.injectFailure != nil { + // guaranteed that failure point is active until all the health checks timeout. + duration := time.Duration(len(tc.healthChecks)+10) * healthCheckTimeout + tc.injectFailure(ctx, t, clus, duration) + } + + for _, hc := range tc.healthChecks { + requestURL := clus.Procs[0].EndpointsHTTP()[0] + hc.url + t.Logf("health check URL is %s", requestURL) + doHealthCheckAndVerify(t, client, requestURL, hc.expectedTimeoutError, hc.expectedStatusCode, hc.expectedRespSubStrings) + } + }) + }) + } +} + +func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expectTimeoutError bool, expectStatusCode int, expectRespSubStrings []string) { // issues one GET bounded by healthCheckTimeout and checks status code and body substrings + ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + require.NoErrorf(t, err, "failed to creat request %+v", err) + resp, herr := client.Do(req) + defer cancel() // the request context also bounds the body read below, so release it only on return + if expectTimeoutError { + if herr != nil && strings.Contains(herr.Error(), context.DeadlineExceeded.Error()) { + return // the expected timeout happened; nothing else to verify + } + } + require.NoErrorf(t, herr, "failed to get response %+v", herr) // report the Do error, not the earlier NewRequest err + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoErrorf(t, err, "failed to read response %+v", err) + + t.Logf("health check response body is:\n%s", body) + require.Equal(t, expectStatusCode, resp.StatusCode) + for _, expectRespSubString := range expectRespSubStrings { + require.Contains(t, string(body), expectRespSubString) + } +} + +func
triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { + buf := strings.Repeat("b", os.Getpagesize()) + etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + for { + if err := etcdctl.Put("foo", buf); err != nil { + if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") { + t.Fatal(err) + } + break + } + } +} + +func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { + // the following proposal will be blocked at applying stage + // because when apply index < committed index, linearizable read would time out. + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration))) + etcdctl := NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl.Put("foo", "bar") +} + +func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { + member := clus.Procs[0] + proxy := member.PeerProxy() + t.Logf("Blackholing traffic from and to member %q", member.Config().Name) + proxy.BlackholeTx() + proxy.BlackholeRx() +} + +func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration))) + etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + etcdctl.Put("foo", "bar") +} + +func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { + etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + + _, err := etcdctl.UserAdd("root", "root") + require.NoError(t, err) + _, err = etcdctl.UserGrantRole("root", "root") + require.NoError(t, err) + require.NoError(t, etcdctl.AuthEnable()) + + 
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeWritebackBuf", fmt.Sprintf(`sleep("%s")`, duration))) + etcdctl.PutWithAuth("foo", "bar", "root", "root") +} + +func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { + etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false) + for i := 0; i < 10; i++ { + require.NoError(t, etcdctl.Put("foo", "bar")) + } + err := clus.Procs[0].Stop() + require.NoError(t, err) + err = testutil.CorruptBBolt(path.Join(clus.Procs[0].Config().DataDirPath, "member", "snap", "db")) + require.NoError(t, err) + err = clus.Procs[0].Start() + for { + time.Sleep(time.Second) + select { + case <-ctx.Done(): + require.NoError(t, err) + default: + } + response, err := etcdctl.AlarmList() + if err != nil { + continue + } + if len(response.Alarms) == 0 { + continue + } + require.Len(t, response.Alarms, 1) + if response.Alarms[0].Alarm == etcdserverpb.AlarmType_CORRUPT { + break + } + } +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index f1cd942dd8c9..649c9f7051e1 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "go.etcd.io/etcd/pkg/v3/proxy" "go.etcd.io/etcd/server/v3/etcdserver" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -136,10 +137,12 @@ type EtcdProcessCluster struct { } type EtcdProcessClusterConfig struct { - ExecPath string - DataDirPath string - KeepDataDir bool - EnvVars map[string]string + ExecPath string + DataDirPath string + KeepDataDir bool + GoFailEnabled bool + PeerProxy bool + EnvVars map[string]string ClusterSize int @@ -189,7 +192,7 @@ func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdPr return nil, err } - return StartEtcdProcessCluster(epc, cfg) + return StartEtcdProcessCluster(t, epc, cfg) } // InitEtcdProcessCluster initializes a new cluster based on the given config. 
@@ -217,7 +220,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP } // StartEtcdProcessCluster launches a new cluster from etcd processes. -func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { +func StartEtcdProcessCluster(t testing.TB, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { if cfg.RollingStart { if err := epc.RollingStart(); err != nil { return nil, fmt.Errorf("Cannot rolling-start: %v", err) @@ -227,6 +230,13 @@ func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterCon return nil, fmt.Errorf("Cannot start: %v", err) } } + + for _, proc := range epc.Procs { + if cfg.GoFailEnabled && !proc.Failpoints().Enabled() { + epc.Close() + t.Skip("please run 'make gofail-enable && make build' before running the test") + } + } return epc, nil } @@ -268,6 +278,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* var curl string port := cfg.BasePort + 5*i clientPort := port + peerPort := port + 1 + peer2Port := port + 3 clientHttpPort := port + 4 if cfg.ClientTLS == ClientTLSAndNonTLS { @@ -278,20 +290,34 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* curls = []string{curl} } - purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} + purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} + peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} + var proxyCfg *proxy.ServerConfig + if cfg.PeerProxy { + if !cfg.IsPeerTLS { + panic("Can't use peer proxy without peer TLS as it can result in malformed packets") + } + peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port) + proxyCfg = &proxy.ServerConfig{ + Logger: zap.NewNop(), + To: purl, + From: peerAdvertiseUrl, + } + } + name := fmt.Sprintf("test-%d", i) dataDirPath := 
cfg.DataDirPath if cfg.DataDirPath == "" { dataDirPath = tb.TempDir() } - initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) + initialCluster[i] = fmt.Sprintf("%s=%s", name, peerAdvertiseUrl.String()) args := []string{ "--name", name, "--listen-client-urls", strings.Join(curls, ","), "--advertise-client-urls", strings.Join(curls, ","), "--listen-peer-urls", purl.String(), - "--initial-advertise-peer-urls", purl.String(), + "--initial-advertise-peer-urls", peerAdvertiseUrl.String(), "--initial-cluster-token", cfg.InitialToken, "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount), @@ -362,20 +388,32 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) } + envVars := map[string]string{} + for key, value := range cfg.EnvVars { + envVars[key] = value + } + var gofailPort int + if cfg.GoFailEnabled { + gofailPort = (i+1)*10000 + 2381 + envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath, Args: args, - EnvVars: cfg.EnvVars, + EnvVars: envVars, TlsArgs: cfg.TlsArgs(), DataDirPath: dataDirPath, KeepDataDir: cfg.KeepDataDir, Name: name, - Purl: purl, + Purl: peerAdvertiseUrl, Acurl: curl, Murl: murl, InitialToken: cfg.InitialToken, ClientHttpUrl: clientHttpUrl, + GoFailPort: gofailPort, + Proxy: proxyCfg, } } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 959342d10db9..c3eaa188d6ab 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -15,12 +15,20 @@ package e2e import ( + "bytes" + "context" + "errors" "fmt" + "io" + "net/http" "net/url" "os" + "strings" + "time" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/pkg/v3/proxy" "go.uber.org/zap" ) @@ -46,6 +54,10 @@ type EtcdProcess 
interface { WithStopSignal(sig os.Signal) os.Signal Config() *EtcdServerProcessConfig Logs() LogsExpect + + PeerProxy() proxy.Server + Failpoints() *BinaryFailpoints + IsRunning() bool } type LogsExpect interface { @@ -55,9 +67,11 @@ type LogsExpect interface { } type EtcdServerProcess struct { - cfg *EtcdServerProcessConfig - proc *expect.ExpectProcess - donec chan struct{} // closed when Interact() terminates + cfg *EtcdServerProcessConfig + proc *expect.ExpectProcess + proxy proxy.Server + failpoints *BinaryFailpoints + donec chan struct{} // closed when Interact() terminates } type EtcdServerProcessConfig struct { @@ -80,6 +94,8 @@ type EtcdServerProcessConfig struct { InitialToken string InitialCluster string + GoFailPort int + Proxy *proxy.ServerConfig } func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { @@ -91,7 +107,11 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err return nil, err } } - return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil + ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})} + if cfg.GoFailPort != 0 { + ep.failpoints = &BinaryFailpoints{member: ep} + } + return ep, nil } func (ep *EtcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() } @@ -109,6 +129,15 @@ func (ep *EtcdServerProcess) Start() error { if ep.proc != nil { panic("already started") } + if ep.cfg.Proxy != nil && ep.proxy == nil { + ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String())) + ep.proxy = proxy.NewServer(*ep.cfg.Proxy) + select { + case <-ep.proxy.Ready(): + case err := <-ep.proxy.Error(): + return err + } + } ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars) if err != nil { @@ -154,6 +183,14 @@ func (ep *EtcdServerProcess) 
Stop() (err error) { } } ep.cfg.lg.Info("stopped server.", zap.String("name", ep.cfg.Name)) + if ep.proxy != nil { + ep.cfg.lg.Info("stopping proxy...", zap.String("name", ep.cfg.Name)) + err = ep.proxy.Close() + ep.proxy = nil + if err != nil { + return err + } + } return nil } @@ -188,3 +225,154 @@ func (ep *EtcdServerProcess) Logs() LogsExpect { } return ep.proc } + +func (ep *EtcdServerProcess) PeerProxy() proxy.Server { + return ep.proxy +} + +func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints { + return ep.failpoints +} + +func (ep *EtcdServerProcess) IsRunning() bool { + if ep.proc == nil { + return false + } + + if ep.proc.IsRunning() { + return true + } + + ep.cfg.lg.Info("server exited", + zap.String("name", ep.cfg.Name)) + ep.proc = nil + return false +} + +type BinaryFailpoints struct { + member EtcdProcess + availableCache map[string]string +} + +func (f *BinaryFailpoints) SetupEnv(failpoint, payload string) error { + if f.member.IsRunning() { + return errors.New("cannot setup environment variable while process is running") + } + f.member.Config().EnvVars["GOFAIL_FAILPOINTS"] = fmt.Sprintf("%s=%s", failpoint, payload) + return nil +} + +func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload string) error { + host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort) + failpointUrl := url.URL{ + Scheme: "http", + Host: host, + Path: failpoint, + } + r, err := http.NewRequestWithContext(ctx, "PUT", failpointUrl.String(), bytes.NewBuffer([]byte(payload))) + if err != nil { + return err + } + resp, err := httpClient.Do(r) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("bad status code: %d", resp.StatusCode) + } + return nil +} + +func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) error { + host := fmt.Sprintf("127.0.0.1:%d", f.member.Config().GoFailPort) + failpointUrl := url.URL{ + Scheme: "http", + Host: 
host, + Path: failpoint, + } + r, err := http.NewRequestWithContext(ctx, "DELETE", failpointUrl.String(), nil) + if err != nil { + return err + } + resp, err := httpClient.Do(r) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("bad status code: %d", resp.StatusCode) + } + return nil +} + +var httpClient = http.Client{ + Timeout: 1 * time.Second, +} + +func (f *BinaryFailpoints) Enabled() bool { + _, err := failpoints(f.member) + if err != nil { + return false + } + return true +} + +func (f *BinaryFailpoints) Available(failpoint string) bool { + if f.availableCache == nil { + fs, err := failpoints(f.member) + if err != nil { + panic(err) + } + f.availableCache = fs + } + _, found := f.availableCache[failpoint] + return found +} + +func failpoints(member EtcdProcess) (map[string]string, error) { + body, err := fetchFailpointsBody(member) + if err != nil { + return nil, err + } + defer body.Close() + return parseFailpointsBody(body) +} + +func fetchFailpointsBody(member EtcdProcess) (io.ReadCloser, error) { // on success the caller owns and must close the returned body + address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort) + failpointUrl := url.URL{ + Scheme: "http", + Host: address, + } + resp, err := httpClient.Get(failpointUrl.String()) // shared client with a timeout, matching SetupHTTP/DeactivateHTTP + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("invalid status code, %d", resp.StatusCode) + } + return resp.Body, nil +} + +func parseFailpointsBody(body io.Reader) (map[string]string, error) { // maps each "name=value" line to name -> value; a line without "=" maps to "" + data, err := io.ReadAll(body) + if err != nil { + return nil, err + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") // trim first so a trailing newline does not yield an empty failpoint name + failpoints := map[string]string{} + for _, line := range lines { + // Format: + // failpoint=value + parts := strings.SplitN(line, "=", 2) + failpoint := parts[0] + var value string + if len(parts) == 2 { + value = parts[1] + } + failpoints[failpoint] = value + } + return failpoints, nil +} diff --git
a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go index f62fda5fbf55..c9ffccbb2249 100644 --- a/tests/framework/e2e/util.go +++ b/tests/framework/e2e/util.go @@ -15,6 +15,7 @@ package e2e import ( + "context" "encoding/json" "fmt" "math/rand" @@ -126,3 +127,23 @@ func ToTLS(s string) string { func SkipInShortMode(t testing.TB) { testutil.SkipTestIfShortMode(t, "e2e tests are not running in --short mode") } + +func ExecuteUntil(ctx context.Context, t *testing.T, f func()) { + deadline, deadlineSet := ctx.Deadline() + timeout := time.Until(deadline) + donec := make(chan struct{}) + go func() { + defer close(donec) + f() + }() + + select { + case <-ctx.Done(): + msg := ctx.Err().Error() + if deadlineSet { + msg = fmt.Sprintf("test timed out after %v, err: %v", timeout, msg) + } + testutil.FatalStack(t, msg) + case <-donec: + } +}