diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index c69fcac1a79..c0015044ab6 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -25,10 +25,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -261,8 +264,7 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr // - COMPACT r5 // - WATCH rev=r5 // -// We should get the DELETE event (r5) followed by the PUT event (r6). However, currently we only -// get the PUT event with returned revision of r6 (key=k, val=v6). +// We should get the DELETE event (r5) followed by the PUT event (r6). func TestDeleteEventDrop_Issue18089(t *testing.T) { e2e.BeforeTest(t) cfg := e2e.DefaultConfig() @@ -311,24 +313,178 @@ func TestDeleteEventDrop_Issue18089(t *testing.T) { watchChan := c.Watch(ctx, key, clientv3.WithRev(deleteResp.Header.Revision)) select { case watchResp := <-watchChan: - // TODO(MadhavJivrajani): update conditions once https://github.com/etcd-io/etcd/issues/18089 - // is resolved. The existing conditions do not mimic the desired behaviour and are there to - // test and reproduce etcd-io/etcd#18089. - if len(watchResp.Events) != 1 { - t.Fatalf("expected exactly one event in response, got: %d", len(watchResp.Events)) - } - if watchResp.Events[0].Type != mvccpb.PUT { - t.Fatalf("unexpected event type, expected: %s, got: %s", mvccpb.PUT, watchResp.Events[0].Type) - } - if string(watchResp.Events[0].Kv.Key) != key { - t.Fatalf("unexpected key, expected: %s, got: %s", key, string(watchResp.Events[0].Kv.Key)) - } - if string(watchResp.Events[0].Kv.Value) != v6 { - t.Fatalf("unexpected valye, expected: %s, got: %s", v6, string(watchResp.Events[0].Kv.Value)) - } + require.Len(t, watchResp.Events, 2) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, key, deletedKey) + + require.Equal(t, mvccpb.PUT, watchResp.Events[1].Type) + + updatedKey := string(watchResp.Events[1].Kv.Key) + require.Equal(t, key, updatedKey) + + require.Equal(t, v6, string(watchResp.Events[1].Kv.Value)) case <-time.After(100 * time.Millisecond): // we care only about the first response, but have an // escape hatch in case the watch response is delayed. t.Fatal("timed out getting watch response") } } + +func TestStartWatcherFromCompactedRevision(t *testing.T) { + t.Run("compaction on tombstone revision", func(t *testing.T) { + testStartWatcherFromCompactedRevision(t, true) + }) + t.Run("compaction on normal revision", func(t *testing.T) { + testStartWatcherFromCompactedRevision(t, false) + }) +} + +func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombstone bool) { + e2e.BeforeTest(t) + cfg := e2e.DefaultConfig() + cfg.Client = e2e.ClientConfig{ConnectionType: e2e.ClientTLS} + clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithConfig(cfg), e2e.WithClusterSize(1)) + require.NoError(t, err) + defer clus.Close() + + c := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c.Close() + + ctx := context.Background() + key := "foo" + totalRev := 100 + + type valueEvent struct { + value string + typ mvccpb.Event_EventType + } + + var ( + // requestedValues records all requested change + requestedValues = make([]valueEvent, 0) + // revisionChan sends each compacted revision via this channel + compactionRevChan = make(chan int64) + // compactionStep means that client performs a compaction on every 7 operations + compactionStep = 7 + ) + + // This goroutine will submit changes on $key $totalRev times. It will + // perform compaction after every $compactedAfterChanges changes. + // Except for first time, the watcher always receives the compacted + // revision as start. + go func() { + defer close(compactionRevChan) + + lastRevision := int64(1) + + compactionRevChan <- lastRevision + for vi := 1; vi <= totalRev; vi++ { + var respHeader *etcdserverpb.ResponseHeader + + if vi%compactionStep == 0 && performCompactOnTombstone { + t.Logf("DELETE key=%s", key) + + resp, derr := c.KV.Delete(ctx, key) + require.NoError(t, derr) + respHeader = resp.Header + + requestedValues = append(requestedValues, valueEvent{value: "", typ: mvccpb.DELETE}) + } else { + value := fmt.Sprintf("%d", vi) + + t.Logf("PUT key=%s, val=%s", key, value) + resp, perr := c.KV.Put(ctx, key, value) + require.NoError(t, perr) + respHeader = resp.Header + + requestedValues = append(requestedValues, valueEvent{value: value, typ: mvccpb.PUT}) + } + + lastRevision = respHeader.Revision + + if vi%compactionStep == 0 { + compactionRevChan <- lastRevision + + t.Logf("COMPACT rev=%d", lastRevision) + _, err = c.KV.Compact(ctx, lastRevision, clientv3.WithCompactPhysical()) + require.NoError(t, err) + } + } + }() + + receivedEvents := make([]*clientv3.Event, 0) + + fromCompactedRev := false + for fromRev := range compactionRevChan { + watchChan := c.Watch(ctx, key, clientv3.WithRev(fromRev)) + + prevEventCount := len(receivedEvents) + + // firstReceived represents this is first watch response. + // Just in case that ETCD sends event one by one. + firstReceived := true + + t.Logf("Start to watch key %s starting from revision %d", key, fromRev) + watchLoop: + for { + currentEventCount := len(receivedEvents) + if currentEventCount-prevEventCount == compactionStep || currentEventCount == totalRev { + break + } + + select { + case watchResp := <-watchChan: + t.Logf("Receive the number of events: %d", len(watchResp.Events)) + for i := range watchResp.Events { + ev := watchResp.Events[i] + + // If the $fromRev is the compacted revision, + // the first event should be the same as the last event receives in last watch response. + if firstReceived && fromCompactedRev { + firstReceived = false + + last := receivedEvents[prevEventCount-1] + + assert.Equal(t, last.Type, ev.Type, + "last received event type %s, but got event type %s", last.Type, ev.Type) + assert.Equal(t, string(last.Kv.Key), string(ev.Kv.Key), + "last received event key %s, but got event key %s", string(last.Kv.Key), string(ev.Kv.Key)) + assert.Equal(t, string(last.Kv.Value), string(ev.Kv.Value), + "last received event value %s, but got event value %s", string(last.Kv.Value), string(ev.Kv.Value)) + continue + } + receivedEvents = append(receivedEvents, ev) + } + + if len(watchResp.Events) == 0 { + require.Equal(t, v3rpc.ErrCompacted, watchResp.Err()) + break watchLoop + } + + case <-time.After(10 * time.Second): + t.Fatal("timed out getting watch response") + } + } + + fromCompactedRev = true + } + + t.Logf("Received total number of events: %d", len(receivedEvents)) + require.Len(t, requestedValues, totalRev) + require.Len(t, receivedEvents, totalRev, "should receive %d events", totalRev) + for idx, expected := range requestedValues { + ev := receivedEvents[idx] + + require.Equal(t, expected.typ, ev.Type, "#%d expected event %s", idx, expected.typ) + + updatedKey := string(ev.Kv.Key) + + require.Equal(t, key, updatedKey) + if expected.typ == mvccpb.PUT { + updatedValue := string(ev.Kv.Value) + require.Equal(t, expected.value, updatedValue) + } + } +}