Skip to content

Commit

Permalink
clientv3/integration: test watch response fragmentation
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Feb 13, 2018
1 parent 6c7f556 commit 8795633
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions clientv3/integration/watch_fragment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

// TestWatchFragmentEnable ensures that large watch
// response exceeding server-side request limit arrive
// with watch response fragmentation.
func TestWatchFragmentEnable(t *testing.T) {
testWatchFragment(t, true, false)
}

// TestWatchFragmentDisable ensures that large watch
// response exceeding server-side request limit can
// arrive even without watch response fragmentation.
func TestWatchFragmentDisable(t *testing.T) {
testWatchFragment(t, false, false)
}

// TestWatchFragmentEnableWithGRPCLimit verifies
// large watch response exceeding server-side request
// limit and client-side gRPC response receive limit
// can arrive only when watch events are fragmented.
func TestWatchFragmentEnableWithGRPCLimit(t *testing.T) {
testWatchFragment(t, true, true)
}

// TestWatchFragmentDisableWithGRPCLimit verifies
// large watch response exceeding server-side request
// limit and client-side gRPC response receive limit
// cannot arrive without watch events fragmentation,
// because multiple events exceed client-side gRPC
// response receive limit.
func TestWatchFragmentDisableWithGRPCLimit(t *testing.T) {
testWatchFragment(t, false, true)
}

// testWatchFragment triggers watch response that spans over multiple
// revisions exceeding server request limits when combined.
func testWatchFragment(t *testing.T, fragment, overGRPCLimit bool) {
cfg := &integration.ClusterConfig{
Size: 1,
MaxRequestBytes: 1.5 * 1024 * 1024,
}
if overGRPCLimit {
cfg.ClientMaxCallRecvMsgSize = 1.5 * 1024 * 1024
}
clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)

cli := clus.Client(0)
errc := make(chan error)
for i := 0; i < 10; i++ {
go func(i int) {
_, err := cli.Put(context.TODO(),
fmt.Sprint("foo", i),
strings.Repeat("a", 1024*1024),
)
errc <- err
}(i)
}
for i := 0; i < 10; i++ {
if err := <-errc; err != nil {
t.Errorf("failed to put: %v", err)
}
}

opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithRev(1),
}
if !fragment {
opts = append(opts, clientv3.WithNoFragment())
}
wch := cli.Watch(context.TODO(), "foo", opts...)

// expect 10 MiB watch response
select {
case ws := <-wch:
// without fragment, should exceed gRPC client receive limit
if !fragment && overGRPCLimit {
if len(ws.Events) != 0 {
t.Errorf("expected 0 events with watch fragmentation, got %d", len(ws.Events))
}
exp := "code = ResourceExhausted desc = grpc: received message larger than max ("
if !strings.Contains(ws.Err().Error(), exp) {
t.Errorf("expected 'ResourceExhausted' error, got %v", ws.Err())
}
} else {
if len(ws.Events) != 10 {
t.Errorf("expected 10 events with watch fragmentation, got %d", len(ws.Events))
}
if ws.Err() != nil {
t.Errorf("unexpected error %v", ws.Err())
}
}
case <-time.After(testutil.RequestTimeout):
t.Fatalf("took too long to receive events")
}
}

0 comments on commit 8795633

Please sign in to comment.