Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Added target API tests (+goleak)." #5297

Merged
merged 1 commit into from
Apr 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down Expand Up @@ -159,7 +160,6 @@ require (
github.com/google/uuid v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
16 changes: 11 additions & 5 deletions pkg/targets/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,15 @@ scrape_configs:
requestedState targetspb.TargetsRequest_State
expectedErr error
}{
{requestedState: targetspb.TargetsRequest_ANY},
{requestedState: targetspb.TargetsRequest_ACTIVE},
{requestedState: targetspb.TargetsRequest_DROPPED},
{
requestedState: targetspb.TargetsRequest_ANY,
},
{
requestedState: targetspb.TargetsRequest_ACTIVE,
},
{
requestedState: targetspb.TargetsRequest_DROPPED,
},
} {
t.Run(tcase.requestedState.String(), func(t *testing.T) {
targets, w, err := grpcClient.Targets(context.Background(), &targetspb.TargetsRequest{
Expand All @@ -138,9 +144,9 @@ scrape_configs:

switch tcase.requestedState {
case targetspb.TargetsRequest_ACTIVE:
expectedTargets.DroppedTargets = nil
expectedTargets.DroppedTargets = expectedTargets.DroppedTargets[:0]
case targetspb.TargetsRequest_DROPPED:
expectedTargets.ActiveTargets = nil
expectedTargets.ActiveTargets = expectedTargets.ActiveTargets[:0]
}

for i := range targets.ActiveTargets {
Expand Down
138 changes: 33 additions & 105 deletions pkg/targets/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,139 +7,67 @@ import (
"context"
"io"
"os"
"sort"
"testing"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/testutil"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
)

func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type testTargetRecv struct {
// A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125.
err error
resp *targetspb.TargetDiscovery
}

type testTargetsClient struct {
grpc.ClientStream
targetsErr error
recvs []testTargetRecv
i int
targetsErr, recvErr error
response *targetspb.TargetsResponse
sentResponse bool
}

func (t *testTargetsClient) String() string {
return "test"
}

func (t *testTargetsClient) Recv() (*targetspb.TargetsResponse, error) {
if t.i+1 >= len(t.recvs) {
// A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125.
if t.recvErr != nil {
return nil, t.recvErr
}

if t.sentResponse {
return nil, io.EOF
}
t.i++
return targetspb.NewTargetsResponse(t.recvs[t.i].resp), t.recvs[t.i].err
t.sentResponse = true

return t.response, nil
}

func (t *testTargetsClient) Targets(context.Context, *targetspb.TargetsRequest, ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) {
t.i = -1
func (t *testTargetsClient) Targets(ctx context.Context, in *targetspb.TargetsRequest, opts ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) {
return t, t.targetsErr
}

var _ targetspb.TargetsClient = &testTargetsClient{}

func TestProxy(t *testing.T) {
activeTarget1 := &targetspb.ActiveTarget{GlobalUrl: "test1"}
activeTarget2 := &targetspb.ActiveTarget{GlobalUrl: "test2"}
activeTarget3 := &targetspb.ActiveTarget{GlobalUrl: "test3"}

for _, tcase := range []struct {
clients []targetspb.TargetsClient
expectedWarnNum int
expectedDiscovery *targetspb.TargetDiscovery
}{
{
clients: []targetspb.TargetsClient{
&testTargetsClient{targetsErr: errors.New("err1")},
&testTargetsClient{targetsErr: errors.New("err2")},
},
expectedWarnNum: 2,
},
{
clients: []targetspb.TargetsClient{
&testTargetsClient{targetsErr: errors.New("err1")},
&testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}}},
},
expectedWarnNum: 2,
},
{
clients: []targetspb.TargetsClient{
&testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}, {resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1}}}}},
},
expectedWarnNum: 1,
},
{
clients: []targetspb.TargetsClient{
&testTargetsClient{recvs: []testTargetRecv{
{resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1}}},
{resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget2}}},
}},
&testTargetsClient{recvs: []testTargetRecv{
{resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget3}}},
}},
&testTargetsClient{recvs: []testTargetRecv{
{err: errors.New("err")},
}},
},
expectedWarnNum: 1,
expectedDiscovery: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1, activeTarget2, activeTarget3}},
},
{
// Reproduced the concurrent data race bug we had ( go test -race -run TestProxyDataRace -v ).
clients: func() []targetspb.TargetsClient {
size := 100
endpoints := make([]targetspb.TargetsClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, &testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}}})
}
return endpoints
}(),
expectedWarnNum: 100,
},
} {
if ok := t.Run("", func(t *testing.T) {
if tcase.expectedDiscovery == nil {
tcase.expectedDiscovery = &targetspb.TargetDiscovery{}
}

logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []targetspb.TargetsClient { return tcase.clients })
req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_ANY,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}

// Test idempotency.
for i := 0; i < 3; i++ {
s := &targetsServer{ctx: context.Background(), targets: &targetspb.TargetDiscovery{}}
testutil.Ok(t, p.Targets(req, s))
testutil.Equals(t, tcase.expectedWarnNum, len(s.warnings))

sort.Slice(s.targets.ActiveTargets, func(i, j int) bool {
return s.targets.ActiveTargets[i].GlobalUrl < s.targets.ActiveTargets[j].GlobalUrl
})
testutil.Equals(t, tcase.expectedDiscovery, s.targets)
}
}); !ok {
return
// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ).
func TestProxyDataRace(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []targetspb.TargetsClient {
es := &testTargetsClient{
recvErr: errors.New("err"),
}
size := 100
endpoints := make([]targetspb.TargetsClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, es)
}
return endpoints
})
req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_ANY,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}

s := &targetsServer{
ctx: context.Background(),
}
_ = p.Targets(req, s)
}
5 changes: 4 additions & 1 deletion pkg/targets/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string)
}

func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) {
resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{}}
resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{
ActiveTargets: make([]*targetspb.ActiveTarget, 0),
DroppedTargets: make([]*targetspb.DroppedTarget, 0),
}}

if err := rr.proxy.Targets(req, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Targets")
Expand Down