diff --git a/go.mod b/go.mod index 176db0e019..63401edc0b 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/googleapis/gax-go v2.0.2+incompatible github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1 + github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 @@ -159,7 +160,6 @@ require ( github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/pkg/targets/prometheus_test.go b/pkg/targets/prometheus_test.go index a97bc8f79b..c73846a7dc 100644 --- a/pkg/targets/prometheus_test.go +++ b/pkg/targets/prometheus_test.go @@ -118,9 +118,15 @@ scrape_configs: requestedState targetspb.TargetsRequest_State expectedErr error }{ - {requestedState: targetspb.TargetsRequest_ANY}, - {requestedState: targetspb.TargetsRequest_ACTIVE}, - {requestedState: targetspb.TargetsRequest_DROPPED}, + { + requestedState: targetspb.TargetsRequest_ANY, + }, + { + requestedState: targetspb.TargetsRequest_ACTIVE, + }, + { + requestedState: targetspb.TargetsRequest_DROPPED, + }, } { t.Run(tcase.requestedState.String(), func(t *testing.T) { targets, w, err := grpcClient.Targets(context.Background(), &targetspb.TargetsRequest{ @@ -138,9 +144,9 @@ scrape_configs: switch tcase.requestedState { case targetspb.TargetsRequest_ACTIVE: - expectedTargets.DroppedTargets = nil + expectedTargets.DroppedTargets = expectedTargets.DroppedTargets[:0] case targetspb.TargetsRequest_DROPPED: - expectedTargets.ActiveTargets = nil + expectedTargets.ActiveTargets = expectedTargets.ActiveTargets[:0] } for i := range targets.ActiveTargets { diff --git a/pkg/targets/proxy_test.go b/pkg/targets/proxy_test.go index 6f2598ad0b..08fc8701af 100644 --- a/pkg/targets/proxy_test.go +++ b/pkg/targets/proxy_test.go @@ -7,33 +7,21 @@ import ( "context" "io" "os" - "sort" "testing" "github.com/go-kit/log" "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/testutil" "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets/targetspb" ) -func TestMain(m *testing.M) { - testutil.TolerantVerifyLeakMain(m) -} - -type testTargetRecv struct { - // A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. - err error - resp *targetspb.TargetDiscovery -} - type testTargetsClient struct { grpc.ClientStream - targetsErr error - recvs []testTargetRecv - i int + targetsErr, recvErr error + response *targetspb.TargetsResponse + sentResponse bool } func (t *testTargetsClient) String() string { @@ -41,105 +29,45 @@ func (t *testTargetsClient) String() string { } func (t *testTargetsClient) Recv() (*targetspb.TargetsResponse, error) { - if t.i+1 >= len(t.recvs) { + // A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + if t.recvErr != nil { + return nil, t.recvErr + } + + if t.sentResponse { return nil, io.EOF } - t.i++ - return targetspb.NewTargetsResponse(t.recvs[t.i].resp), t.recvs[t.i].err + t.sentResponse = true + + return t.response, nil } -func (t *testTargetsClient) Targets(context.Context, *targetspb.TargetsRequest, ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) { - t.i = -1 +func (t *testTargetsClient) Targets(ctx context.Context, in *targetspb.TargetsRequest, opts ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) { return t, t.targetsErr } var _ targetspb.TargetsClient = &testTargetsClient{} -func TestProxy(t *testing.T) { - activeTarget1 := &targetspb.ActiveTarget{GlobalUrl: "test1"} - activeTarget2 := &targetspb.ActiveTarget{GlobalUrl: "test2"} - activeTarget3 := &targetspb.ActiveTarget{GlobalUrl: "test3"} - - for _, tcase := range []struct { - clients []targetspb.TargetsClient - expectedWarnNum int - expectedDiscovery *targetspb.TargetDiscovery - }{ - { - clients: []targetspb.TargetsClient{ - &testTargetsClient{targetsErr: errors.New("err1")}, - &testTargetsClient{targetsErr: errors.New("err2")}, - }, - expectedWarnNum: 2, - }, - { - clients: []targetspb.TargetsClient{ - &testTargetsClient{targetsErr: errors.New("err1")}, - &testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}}}, - }, - expectedWarnNum: 2, - }, - { - clients: []targetspb.TargetsClient{ - &testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}, {resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1}}}}}, - }, - expectedWarnNum: 1, - }, - { - clients: []targetspb.TargetsClient{ - &testTargetsClient{recvs: []testTargetRecv{ - {resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1}}}, - {resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget2}}}, - }}, - &testTargetsClient{recvs: []testTargetRecv{ - {resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget3}}}, - }}, - &testTargetsClient{recvs: []testTargetRecv{ - {err: errors.New("err")}, - }}, - }, - expectedWarnNum: 1, - expectedDiscovery: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1, activeTarget2, activeTarget3}}, - }, - { - // Reproduced the concurrent data race bug we had ( go test -race -run TestProxyDataRace -v ). - clients: func() []targetspb.TargetsClient { - size := 100 - endpoints := make([]targetspb.TargetsClient, 0, size) - for i := 0; i < size; i++ { - endpoints = append(endpoints, &testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}}}) - } - return endpoints - }(), - expectedWarnNum: 100, - }, - } { - if ok := t.Run("", func(t *testing.T) { - if tcase.expectedDiscovery == nil { - tcase.expectedDiscovery = &targetspb.TargetDiscovery{} - } - - logger := log.NewLogfmtLogger(os.Stderr) - p := NewProxy(logger, func() []targetspb.TargetsClient { return tcase.clients }) - req := &targetspb.TargetsRequest{ - State: targetspb.TargetsRequest_ANY, - PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - } - - // Test idempotency. - for i := 0; i < 3; i++ { - s := &targetsServer{ctx: context.Background(), targets: &targetspb.TargetDiscovery{}} - testutil.Ok(t, p.Targets(req, s)) - testutil.Equals(t, tcase.expectedWarnNum, len(s.warnings)) - - sort.Slice(s.targets.ActiveTargets, func(i, j int) bool { - return s.targets.ActiveTargets[i].GlobalUrl < s.targets.ActiveTargets[j].GlobalUrl - }) - testutil.Equals(t, tcase.expectedDiscovery, s.targets) - } - }); !ok { - return +// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ). +func TestProxyDataRace(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + p := NewProxy(logger, func() []targetspb.TargetsClient { + es := &testTargetsClient{ + recvErr: errors.New("err"), + } + size := 100 + endpoints := make([]targetspb.TargetsClient, 0, size) + for i := 0; i < size; i++ { + endpoints = append(endpoints, es) } + return endpoints + }) + req := &targetspb.TargetsRequest{ + State: targetspb.TargetsRequest_ANY, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, } - + s := &targetsServer{ + ctx: context.Background(), + } + _ = p.Targets(req, s) } diff --git a/pkg/targets/targets.go b/pkg/targets/targets.go index c24874952a..9a1b4016c3 100644 --- a/pkg/targets/targets.go +++ b/pkg/targets/targets.go @@ -47,7 +47,10 @@ func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string) } func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) { - resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{}} + resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{ + ActiveTargets: make([]*targetspb.ActiveTarget, 0), + DroppedTargets: make([]*targetspb.DroppedTarget, 0), + }} if err := rr.proxy.Targets(req, resp); err != nil { return nil, nil, errors.Wrap(err, "proxy Targets")