diff --git a/.github/workflows/linearizability.yaml b/.github/workflows/linearizability.yaml new file mode 100644 index 00000000000..1a1a2feafc9 --- /dev/null +++ b/.github/workflows/linearizability.yaml @@ -0,0 +1,17 @@ +name: Linearizability +on: [push, pull_request] +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: "1.19.1" + - run: | + mkdir -p /tmp/linearizability + EXPECT_DEBUG=true GO_TEST_FLAGS=-v RESULTS_DIR=/tmp/linearizability make test-linearizability + - uses: actions/upload-artifact@v2 + if: always() + with: + path: /tmp/linearizability/* diff --git a/Makefile b/Makefile index fb05e5ca505..4c3bcfab52b 100644 --- a/Makefile +++ b/Makefile @@ -7,25 +7,31 @@ build: # Tests +GO_TEST_FLAGS?= + .PHONY: test test: - PASSES="unit integration release e2e" ./scripts/test.sh + PASSES="unit integration release e2e" ./scripts/test.sh $(GO_TEST_FLAGS) .PHONY: test-unit test-unit: - PASSES="unit" ./scripts/test.sh + PASSES="unit" ./scripts/test.sh $(GO_TEST_FLAGS) .PHONY: test-integration test-integration: - PASSES="integration" ./scripts/test.sh + PASSES="integration" ./scripts/test.sh $(GO_TEST_FLAGS) .PHONY: test-e2e test-e2e: build - PASSES="e2e" ./scripts/test.sh + PASSES="e2e" ./scripts/test.sh $(GO_TEST_FLAGS) .PHONY: test-e2e-release test-e2e-release: build - PASSES="release e2e" ./scripts/test.sh + PASSES="release e2e" ./scripts/test.sh $(GO_TEST_FLAGS) + +.PHONY: test-linearizability +test-linearizability: build + PASSES="linearizability" ./scripts/test.sh $(GO_TEST_FLAGS) # Static analysis diff --git a/bill-of-materials.json b/bill-of-materials.json index fae8496354b..9c865b55c30 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -8,6 +8,15 @@ } ] }, + { + "project": "github.com/anishathalye/porcupine", + "licenses": [ + { + "type": "MIT License", + "confidence": 1 + } + ] + }, { "project": "github.com/benbjohnson/clock", "licenses": [ diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index 3eb636aacbc..c565a73bfbc 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -182,6 +182,11 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error { return ep.cmd.Process.Signal(sig) } +func (ep *ExpectProcess) Wait() error { + _, err := ep.cmd.Process.Wait() + return err +} + // Close waits for the expect process to exit. // Close currently does not return error if process exited with !=0 status. // TODO: Close should expose underlying process failure by default. diff --git a/scripts/test.sh b/scripts/test.sh index 1be4c03578a..eab6539fe5f 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -124,6 +124,11 @@ function e2e_pass { run_for_module "tests" go_test "./common/..." "keep_going" : --tags=e2e -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@" } +function linearizability_pass { + # e2e tests are running pre-build binary. Settings like --race,-cover,-cpu does not have any impact. + run_for_module "tests" go_test "./linearizability/..." "keep_going" : -timeout="${TIMEOUT:-30m}" "${RUN_ARG[@]}" "$@" +} + function integration_e2e_pass { run_pass "integration" "${@}" run_pass "e2e" "${@}" diff --git a/tests/framework/e2e.go b/tests/framework/e2e.go index 1935cbe0f6d..f10e453e6f5 100644 --- a/tests/framework/e2e.go +++ b/tests/framework/e2e.go @@ -109,6 +109,10 @@ func (c *e2eCluster) Client(cfg clientv3.AuthConfig) (Client, error) { return e2eClient{etcdctl}, nil } +func (c *e2eCluster) Endpoints() []string { + return c.EndpointsV3() +} + func (c *e2eCluster) Members() (ms []Member) { for _, proc := range c.EtcdProcessCluster.Procs { ms = append(ms, e2eMember{EtcdProcess: proc, Cfg: c.Cfg}) diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 36042f287a1..3cdfac6981a 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -103,6 +103,14 @@ func (p *proxyEtcdProcess) Logs() LogsExpect { return p.etcdProc.Logs() } +func (p *proxyEtcdProcess) Kill() error { + return p.etcdProc.Kill() +} + +func (p *proxyEtcdProcess) Wait() error { + return p.etcdProc.Wait() +} + type proxyProc struct { lg *zap.Logger name string diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index b355669c01a..c7bc43bf93d 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "os" + "syscall" "testing" "time" @@ -38,12 +39,14 @@ type EtcdProcess interface { EndpointsV3() []string EndpointsMetrics() []string + Wait() error Start(ctx context.Context) error Restart(ctx context.Context) error Stop() error Close() error Config() *EtcdServerProcessConfig Logs() LogsExpect + Kill() error } type LogsExpect interface { @@ -173,6 +176,22 @@ func (ep *EtcdServerProcess) Logs() LogsExpect { return ep.proc } +func (ep *EtcdServerProcess) Kill() error { + ep.cfg.lg.Info("killing server...", zap.String("name", ep.cfg.Name)) + return ep.proc.Signal(syscall.SIGKILL) +} + +func (ep *EtcdServerProcess) Wait() error { + err := ep.proc.Wait() + if err != nil { + ep.cfg.lg.Error("failed to wait for server exit", zap.String("name", ep.cfg.Name)) + return err + } + ep.cfg.lg.Info("server exited", zap.String("name", ep.cfg.Name)) + ep.proc = nil + return nil +} + func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { t.Helper() var err error diff --git a/tests/framework/interface.go b/tests/framework/interface.go index 696879be9f2..3f49b3fd764 100644 --- a/tests/framework/interface.go +++ b/tests/framework/interface.go @@ -33,6 +33,7 @@ type Cluster interface { Client(cfg clientv3.AuthConfig) (Client, error) WaitLeader(t testing.TB) int Close() error + Endpoints() []string } type Member interface { diff --git a/tests/go.mod b/tests/go.mod index 572e232e81f..dad4d071c3b 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -15,6 +15,7 @@ replace ( ) require ( + github.com/anishathalye/porcupine v0.1.2 github.com/coreos/go-semver v0.3.0 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 diff --git a/tests/go.sum b/tests/go.sum index 7c4432c9c84..1d758e57df6 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -43,6 +43,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/anishathalye/porcupine v0.1.2 h1:eqWNeLcnTzXt6usipDJ4RFn6XOWqY5wEqBYVG3yFLSE= +github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go new file mode 100644 index 00000000000..1b3f9f72ef4 --- /dev/null +++ b/tests/linearizability/client.go @@ -0,0 +1,87 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "context" + "time" + + "github.com/anishathalye/porcupine" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +type recordingClient struct { + client clientv3.Client + id int + + operations []porcupine.Operation +} + +func NewClient(endpoints []string, id int) (*recordingClient, error) { + cc, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + Logger: zap.NewNop(), + DialKeepAliveTime: 1 * time.Millisecond, + DialKeepAliveTimeout: 5 * time.Millisecond, + }) + if err != nil { + return nil, err + } + return &recordingClient{ + client: *cc, + id: id, + operations: []porcupine.Operation{}, + }, nil +} + +func (c *recordingClient) Close() error { + return c.client.Close() +} + +func (c *recordingClient) Get(ctx context.Context, key string) error { + callTime := time.Now() + resp, err := c.client.Get(ctx, key) + returnTime := time.Now() + if err != nil { + return err + } + var readData string + if len(resp.Kvs) == 1 { + readData = string(resp.Kvs[0].Value) + } + c.operations = append(c.operations, porcupine.Operation{ + ClientId: c.id, + Input: etcdRequest{op: Get, key: key}, + Call: callTime.UnixNano(), + Output: etcdResponse{getData: readData}, + Return: returnTime.UnixNano(), + }) + return nil +} + +func (c *recordingClient) Put(ctx context.Context, key, value string) error { + callTime := time.Now() + _, err := c.client.Put(ctx, key, value) + returnTime := time.Now() + c.operations = append(c.operations, porcupine.Operation{ + ClientId: c.id, + Input: etcdRequest{op: Put, key: key, putData: value}, + Call: callTime.UnixNano(), + Output: etcdResponse{err: err}, + Return: returnTime.UnixNano(), + }) + return nil +} diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go new file mode 100644 index 00000000000..eddbd53c8f3 --- /dev/null +++ b/tests/linearizability/failpoints.go @@ -0,0 +1,49 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "context" + "math/rand" + + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +var ( + KillFailpoint Failpoint = killFailpoint{} +) + +type Failpoint interface { + Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error +} + +type killFailpoint struct{} + +func (f killFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error { + member := clus.Procs[rand.Int()%len(clus.Procs)] + err := member.Kill() + if err != nil { + return err + } + err = member.Wait() + if err != nil { + return err + } + err = member.Start(ctx) + if err != nil { + return err + } + return nil +} diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go new file mode 100644 index 00000000000..fc4d0629db3 --- /dev/null +++ b/tests/linearizability/linearizability_test.go @@ -0,0 +1,183 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/anishathalye/porcupine" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "golang.org/x/time/rate" +) + +const ( + // minimalQPS is used to validate if enough traffic is send to make tests accurate. + minimalQPS = 100.0 + // maximalQPS limits number of requests send to etcd to avoid linearizability analysis taking too long. + maximalQPS = 200.0 + // failpointTriggersCount + failpointTriggersCount = 60 + // waitBetweenFailpointTriggers + waitBetweenFailpointTriggers = time.Second +) + +func TestLinearizability(t *testing.T) { + testRunner.BeforeTest(t) + tcs := []struct { + name string + failpoint Failpoint + config e2e.EtcdProcessClusterConfig + }{ + { + name: "KillClusterOfSize1", + failpoint: KillFailpoint, + config: e2e.EtcdProcessClusterConfig{ + ClusterSize: 1, + }, + }, + { + name: "KillClusterOfSize3", + failpoint: KillFailpoint, + config: e2e.EtcdProcessClusterConfig{ + ClusterSize: 3, + }, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + failpoint := FailpointConfig{ + failpoint: tc.failpoint, + count: failpointTriggersCount, + waitBetweenTriggers: waitBetweenFailpointTriggers, + } + traffic := trafficConfig{ + minimalQPS: minimalQPS, + maximalQPS: maximalQPS, + clientCount: 8, + traffic: PutGetTraffic, + } + testLinearizability(context.Background(), t, tc.config, failpoint, traffic) + }) + } +} + +func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) { + clus, err := e2e.NewEtcdProcessCluster(ctx, t, &config) + if err != nil { + t.Fatal(err) + } + defer clus.Close() + ctx, cancel := context.WithCancel(ctx) + go func() { + defer cancel() + err := triggerFailpoints(ctx, clus, failpoint) + if err != nil { + t.Error(err) + } + }() + operations := simulateTraffic(ctx, t, clus, traffic) + clus.Close() + + linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0) + if linearizable != porcupine.Ok { + t.Error("Model is not linearizable") + } + + path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.Replace(t.Name(), "/", "_", -1)+".html")) + if err != nil { + t.Error(err) + } + err = porcupine.VisualizePath(etcdModel, info, path) + if err != nil { + t.Errorf("Failed to visualize, err: %v", err) + } + t.Logf("saving visualization to %q", path) +} + +func triggerFailpoints(ctx context.Context, clus *e2e.EtcdProcessCluster, config FailpointConfig) error { + var err error + successes := 0 + failures := 0 + time.Sleep(config.waitBetweenTriggers) + for successes < config.count && failures < config.count { + err = config.failpoint.Trigger(ctx, clus) + if err != nil { + failures++ + continue + } + successes++ + time.Sleep(config.waitBetweenTriggers) + } + if successes < config.count || failures >= config.count { + return fmt.Errorf("failed to trigger failpoints enough times, err: %v", err) + } + return nil +} + +type FailpointConfig struct { + failpoint Failpoint + count int + waitBetweenTriggers time.Duration +} + +func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, config trafficConfig) (operations []porcupine.Operation) { + mux := sync.Mutex{} + endpoints := clus.EndpointsV3() + + limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) + + startTime := time.Now() + wg := sync.WaitGroup{} + for i := 0; i < config.clientCount; i++ { + wg.Add(1) + endpoints := []string{endpoints[i%len(endpoints)]} + c, err := NewClient(endpoints, i) + if err != nil { + t.Fatal(err) + } + go func(c *recordingClient) { + defer wg.Done() + defer c.Close() + + config.traffic.Run(ctx, c, limiter) + mux.Lock() + operations = append(operations, c.operations...) + mux.Unlock() + }(c) + } + wg.Wait() + endTime := time.Now() + t.Logf("Recorded %d operations", len(operations)) + + qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second) + t.Logf("Average traffic: %f qps", qps) + if qps < config.minimalQPS { + t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps) + } + return operations +} + +type trafficConfig struct { + minimalQPS float64 + maximalQPS float64 + clientCount int + traffic Traffic +} diff --git a/tests/linearizability/main_test.go b/tests/linearizability/main_test.go new file mode 100644 index 00000000000..63ee784eca6 --- /dev/null +++ b/tests/linearizability/main_test.go @@ -0,0 +1,41 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "os" + "path/filepath" + "testing" + + "go.etcd.io/etcd/tests/v3/framework" +) + +var testRunner = framework.E2eTestRunner +var resultsDirectory string + +func TestMain(m *testing.M) { + var ok bool + var err error + resultsDirectory, ok = os.LookupEnv("RESULTS_DIR") + if !ok { + resultsDirectory = "/tmp/" + } + resultsDirectory, err = filepath.Abs(resultsDirectory) + if err != nil { + panic(err) + } + + testRunner.TestMain(m) +} diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go new file mode 100644 index 00000000000..389f2fd8953 --- /dev/null +++ b/tests/linearizability/model.go @@ -0,0 +1,120 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "encoding/json" + "fmt" + + "github.com/anishathalye/porcupine" +) + +type Operation int8 + +const Get Operation = 0 +const Put Operation = 1 + +type etcdRequest struct { + op Operation + key string + putData string +} + +type etcdResponse struct { + getData string + err error +} + +type EtcdState struct { + Key string + Value string + FailedWrites map[string]struct{} +} + +var etcdModel = porcupine.Model{ + Init: func() interface{} { return "{}" }, + Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { + var state EtcdState + err := json.Unmarshal([]byte(st.(string)), &state) + if err != nil { + panic(err) + } + if state.FailedWrites == nil { + state.FailedWrites = map[string]struct{}{} + } + ok, state := step(state, in.(etcdRequest), out.(etcdResponse)) + data, err := json.Marshal(state) + if err != nil { + panic(err) + } + return ok, string(data) + }, + DescribeOperation: func(in, out interface{}) string { + request := in.(etcdRequest) + response := out.(etcdResponse) + var resp string + switch request.op { + case Get: + if response.err != nil { + resp = response.err.Error() + } else { + resp = response.getData + } + return fmt.Sprintf("get(%q) -> %q", request.key, resp) + case Put: + if response.err != nil { + resp = response.err.Error() + } else { + resp = "ok" + } + return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, resp) + default: + return "" + } + }, +} + +func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { + if request.key == "" { + panic("Invalid request") + } + if state.Key == "" { + state.Key = request.key + } + if state.Key != request.key { + panic("Multiple keys not supported") + } + switch request.op { + case Get: + if state.Value == response.getData { + return true, state + } + for write := range state.FailedWrites { + if write == response.getData { + state.Value = response.getData + delete(state.FailedWrites, write) + return true, state + } + } + case Put: + if response.err == nil { + state.Value = request.putData + } else { + state.FailedWrites[request.putData] = struct{}{} + } + return true, state + } + return false, state +} diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model_test.go new file mode 100644 index 00000000000..1e29b070cb3 --- /dev/null +++ b/tests/linearizability/model_test.go @@ -0,0 +1,83 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "errors" + "github.com/anishathalye/porcupine" + "testing" +) + +func TestModel(t *testing.T) { + tcs := []struct { + name string + okOperations []porcupine.Operation + failOperation *porcupine.Operation + }{ + { + name: "Etcd must return what was written", + okOperations: []porcupine.Operation{ + {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{}}, + {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, + }, + failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "2"}}, + }, + { + name: "Etcd can crash after storing result but before returning success to client", + okOperations: []porcupine.Operation{ + {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, + {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, + }, + }, + { + name: "Etcd can crash before storing result", + okOperations: []porcupine.Operation{ + {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, + {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, + }, + }, + { + name: "Etcd can continue errored request after it failed", + okOperations: []porcupine.Operation{ + {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, + {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, + {Input: etcdRequest{op: Put, key: "key"}, Output: etcdResponse{getData: "2"}}, + {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, + }, + failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, + }, + } + for _, tc := range tcs { + var ok bool + t.Run(tc.name, func(t *testing.T) { + state := etcdModel.Init() + for _, op := range tc.okOperations { + t.Logf("state: %v", state) + ok, state = etcdModel.Step(state, op.Input, op.Output) + if !ok { + t.Errorf("Unexpected failed operation: %s", etcdModel.DescribeOperation(op.Input, op.Output)) + } + } + if tc.failOperation != nil { + t.Logf("state: %v", state) + ok, state = etcdModel.Step(state, tc.failOperation.Input, tc.failOperation.Output) + if ok { + t.Errorf("Unexpected succesfull operation: %s", etcdModel.DescribeOperation(tc.failOperation.Input, tc.failOperation.Output)) + } + + } + }) + } +} diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go new file mode 100644 index 00000000000..880f55714c3 --- /dev/null +++ b/tests/linearizability/traffic.go @@ -0,0 +1,64 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linearizability + +import ( + "context" + "fmt" + "time" + + "golang.org/x/time/rate" +) + +var ( + PutGetTraffic Traffic = putGetTraffic{} +) + +type Traffic interface { + Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) +} + +type putGetTraffic struct{} + +func (t putGetTraffic) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) { + maxOperationsPerClient := 1000000 + id := maxOperationsPerClient * c.id + key := "key" + + for i := 0; i < maxOperationsPerClient; { + select { + case <-ctx.Done(): + return + default: + } + getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) + err := c.Get(getCtx, key) + cancel() + if err != nil { + continue + } + limiter.Wait(ctx) + putData := fmt.Sprintf("%d", id+i) + putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) + err = c.Put(putCtx, key, putData) + cancel() + if err != nil { + continue + } + limiter.Wait(ctx) + i++ + } + return +}