From 3c3f2e2e04af14013e512e27d8d51a04a0ce3b17 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 14 Dec 2020 20:25:59 +0000 Subject: [PATCH] store: Added inprocess server to client. Signed-off-by: Bartlomiej Plotka --- pkg/store/storepb/inprocess.go | 94 ++++++++++ pkg/store/storepb/inprocess_test.go | 262 ++++++++++++++++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 pkg/store/storepb/inprocess.go create mode 100644 pkg/store/storepb/inprocess_test.go diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go new file mode 100644 index 0000000000..7185e1ba62 --- /dev/null +++ b/pkg/store/storepb/inprocess.go @@ -0,0 +1,94 @@ +package storepb + +import ( + "context" + "io" + + "google.golang.org/grpc" +) + +func ServerAsClient(srv StoreServer, clientReceiveBufferSize int) StoreClient { + return &serverAsClient{srv: srv, clientReceiveBufferSize: clientReceiveBufferSize} +} + +// serverAsClient allows to use servers as clients. +// NOTE: Passing CallOptions does not work - it would be needed to be implemented in grpc itself (before, after are private). +type serverAsClient struct { + clientReceiveBufferSize int + srv StoreServer +} + +func (s serverAsClient) Info(ctx context.Context, in *InfoRequest, _ ...grpc.CallOption) (*InfoResponse, error) { + return s.srv.Info(ctx, in) +} + +func (s serverAsClient) LabelNames(ctx context.Context, in *LabelNamesRequest, _ ...grpc.CallOption) (*LabelNamesResponse, error) { + return s.srv.LabelNames(ctx, in) +} + +func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, _ ...grpc.CallOption) (*LabelValuesResponse, error) { + return s.srv.LabelValues(ctx, in) +} + +func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) { + inSrv := &inProcessStream{recv: make(chan *SeriesResponse, s.clientReceiveBufferSize), err: make(chan error)} + inSrv.ctx, inSrv.cancel = context.WithCancel(ctx) + go func() { + inSrv.err <- s.srv.Series(in, inSrv) + close(inSrv.err) + close(inSrv.recv) + }() + return &inProcessClientStream{srv: inSrv}, nil +} + +// TODO(bwplotka): Streams attributes, metadata etc are disconnected. Follow up on https://github.com/grpc/grpc-go/issues/906 to +// have solid solution. +type inProcessStream struct { + grpc.ServerStream + + ctx context.Context + cancel context.CancelFunc + recv chan *SeriesResponse + err chan error +} + +func (s *inProcessStream) Context() context.Context { return s.ctx } + +func (s *inProcessStream) Send(r *SeriesResponse) error { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case s.recv <- r: + return nil + } +} + +type inProcessClientStream struct { + grpc.ClientStream + + srv *inProcessStream +} + +func (s *inProcessClientStream) Context() context.Context { return s.srv.ctx } + +func (s *inProcessClientStream) CloseSend() error { + s.srv.cancel() + return nil +} + +func (s *inProcessClientStream) Recv() (*SeriesResponse, error) { + select { + case <-s.srv.ctx.Done(): + return nil, s.srv.ctx.Err() + case r, ok := <-s.srv.recv: + if !ok { + return nil, io.EOF + } + return r, nil + case err := <-s.srv.err: + if err == nil { + return nil, io.EOF + } + return nil, err + } +} diff --git a/pkg/store/storepb/inprocess_test.go b/pkg/store/storepb/inprocess_test.go new file mode 100644 index 0000000000..9e1589f071 --- /dev/null +++ b/pkg/store/storepb/inprocess_test.go @@ -0,0 +1,262 @@ +package storepb + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +type testStoreServer struct { + info *InfoResponse + infoLastReq *InfoRequest + + series []*SeriesResponse + seriesLastReq *SeriesRequest + + labelNames *LabelNamesResponse + labelNamesLastReq *LabelNamesRequest + + labelValues *LabelValuesResponse + labelValuesLastReq *LabelValuesRequest + + err error +} + +func (t *testStoreServer) Info(_ context.Context, r *InfoRequest) (*InfoResponse, error) { + t.infoLastReq = r + return t.info, t.err +} + +func (t *testStoreServer) Series(r *SeriesRequest, server Store_SeriesServer) error { + t.seriesLastReq = r + for i, s := range t.series { + if t.err != nil && i == len(t.series)/2 { + return t.err + } + if err := server.Send(s); err != nil { + return err + } + } + return nil +} + +func (t *testStoreServer) LabelNames(_ context.Context, r *LabelNamesRequest) (*LabelNamesResponse, error) { + t.labelNamesLastReq = r + return t.labelNames, t.err +} + +func (t *testStoreServer) LabelValues(_ context.Context, r *LabelValuesRequest) (*LabelValuesResponse, error) { + t.labelValuesLastReq = r + return t.labelValues, t.err +} + +func TestServerAsClient(t *testing.T) { + for _, bufferSize := range []int{0, 1, 20, 100} { + t.Run(fmt.Sprintf("buffer=%v", bufferSize), func(t *testing.T) { + t.Run("Info", func(t *testing.T) { + s := &testStoreServer{ + info: &InfoResponse{ + LabelSets: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}}}, + MinTime: -1, + MaxTime: 10, + StoreType: StoreType_DEBUG, + }} + t.Run("ok", func(t *testing.T) { + for i := 0; i < 20; i++ { + r := &InfoRequest{} + resp, err := ServerAsClient(s, 0).Info(context.TODO(), r) + testutil.Ok(t, err) + testutil.Equals(t, s.info, resp) + testutil.Equals(t, r, s.infoLastReq) + s.infoLastReq = nil + } + }) + t.Run("error", func(t *testing.T) { + s.err = errors.New("some error") + for i := 0; i < 20; i++ { + r := &InfoRequest{} + _, err := ServerAsClient(s, 0).Info(context.TODO(), r) + testutil.NotOk(t, err) + testutil.Equals(t, s.err, err) + } + }) + }) + t.Run("Series", func(t *testing.T) { + s := &testStoreServer{ + series: []*SeriesResponse{ + NewSeriesResponse(&Series{ + Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}, + Chunks: []AggrChunk{{MinTime: 123, MaxTime: 124}, {MinTime: 12455, MaxTime: 14124}}, + }), + NewSeriesResponse(&Series{ + Labels: []labelpb.ZLabel{{Name: "a", Value: "b1"}}, + Chunks: []AggrChunk{{MinTime: 1231, MaxTime: 124}, {MinTime: 12455, MaxTime: 14124}}, + }), + NewWarnSeriesResponse(errors.New("yolo")), + NewSeriesResponse(&Series{ + Labels: []labelpb.ZLabel{{Name: "a", Value: "b3"}}, + Chunks: []AggrChunk{{MinTime: 123, MaxTime: 124}, {MinTime: 124554, MaxTime: 14124}}, + }), + }} + t.Run("ok", func(t *testing.T) { + for i := 0; i < 20; i++ { + r := &SeriesRequest{ + MinTime: -214, + MaxTime: 213, + Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}}, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + client, err := ServerAsClient(s, 0).Series(context.TODO(), r) + testutil.Ok(t, err) + var resps []*SeriesResponse + for { + resp, err := client.Recv() + if err == io.EOF { + break + } + testutil.Ok(t, err) + resps = append(resps, resp) + } + testutil.Equals(t, s.series, resps) + testutil.Equals(t, r, s.seriesLastReq) + s.seriesLastReq = nil + } + }) + t.Run("ok, close send", func(t *testing.T) { + s.err = errors.New("some error") + for i := 0; i < 20; i++ { + r := &SeriesRequest{ + MinTime: -214, + MaxTime: 213, + Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}}, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + client, err := ServerAsClient(s, 0).Series(context.TODO(), r) + testutil.Ok(t, err) + var resps []*SeriesResponse + for { + if len(resps) == len(s.series)/2 { + testutil.Ok(t, client.CloseSend()) + break + } + resp, err := client.Recv() + if err == io.EOF { + break + } + testutil.Ok(t, err) + resps = append(resps, resp) + } + testutil.Equals(t, s.series[:len(s.series)/2], resps) + testutil.Equals(t, r, s.seriesLastReq) + s.seriesLastReq = nil + } + }) + t.Run("error", func(t *testing.T) { + for i := 0; i < 20; i++ { + r := &SeriesRequest{ + MinTime: -214, + MaxTime: 213, + Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}}, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + client, err := ServerAsClient(s, 0).Series(context.TODO(), r) + testutil.Ok(t, err) + var resps []*SeriesResponse + for { + resp, err := client.Recv() + if err == io.EOF { + break + } + if err == s.err { + break + } + testutil.Ok(t, err) + resps = append(resps, resp) + } + testutil.Equals(t, s.series[:len(s.series)/2], resps) + testutil.Equals(t, r, s.seriesLastReq) + s.seriesLastReq = nil + } + }) + }) + t.Run("LabelNames", func(t *testing.T) { + s := &testStoreServer{ + info: &InfoResponse{ + LabelSets: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}}}, + MinTime: -1, + MaxTime: 10, + StoreType: StoreType_DEBUG, + }} + t.Run("ok", func(t *testing.T) { + for i := 0; i < 20; i++ { + r := &LabelNamesRequest{ + Start: -1, + End: 234, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + resp, err := ServerAsClient(s, 0).LabelNames(context.TODO(), r) + testutil.Ok(t, err) + testutil.Equals(t, s.labelNames, resp) + testutil.Equals(t, r, s.labelNamesLastReq) + s.labelNamesLastReq = nil + } + }) + t.Run("error", func(t *testing.T) { + s.err = errors.New("some error") + for i := 0; i < 20; i++ { + r := &LabelNamesRequest{ + Start: -1, + End: 234, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + _, err := ServerAsClient(s, 0).LabelNames(context.TODO(), r) + testutil.NotOk(t, err) + testutil.Equals(t, s.err, err) + } + }) + }) + t.Run("LabelValues", func(t *testing.T) { + s := &testStoreServer{ + labelValues: &LabelValuesResponse{ + Warnings: []string{"1", "a"}, + Values: []string{"abc1", "go_goroutines"}, + }, + } + t.Run("ok", func(t *testing.T) { + for i := 0; i < 20; i++ { + r := &LabelValuesRequest{ + Label: "__name__", + Start: -1, + End: 234, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + resp, err := ServerAsClient(s, 0).LabelValues(context.TODO(), r) + testutil.Ok(t, err) + testutil.Equals(t, s.labelValues, resp) + testutil.Equals(t, r, s.labelValuesLastReq) + s.labelValuesLastReq = nil + } + }) + t.Run("error", func(t *testing.T) { + s.err = errors.New("some error") + for i := 0; i < 20; i++ { + r := &LabelValuesRequest{ + Label: "__name__", + Start: -1, + End: 234, + PartialResponseStrategy: PartialResponseStrategy_ABORT, + } + _, err := ServerAsClient(s, 0).LabelValues(context.TODO(), r) + testutil.NotOk(t, err) + testutil.Equals(t, s.err, err) + } + }) + }) + }) + } +}