Skip to content

Commit

Permalink
store: Added inprocess server to client.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Dec 18, 2020
1 parent 4723967 commit 3c3f2e2
Show file tree
Hide file tree
Showing 2 changed files with 356 additions and 0 deletions.
94 changes: 94 additions & 0 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package storepb

import (
"context"
"io"

"google.golang.org/grpc"
)

func ServerAsClient(srv StoreServer, clientReceiveBufferSize int) StoreClient {
return &serverAsClient{srv: srv, clientReceiveBufferSize: clientReceiveBufferSize}
}

// serverAsClient allows to use servers as clients.
// NOTE: Passing CallOptions does not work - it would be needed to be implemented in grpc itself (before, after are private).
type serverAsClient struct {
clientReceiveBufferSize int
srv StoreServer
}

func (s serverAsClient) Info(ctx context.Context, in *InfoRequest, _ ...grpc.CallOption) (*InfoResponse, error) {
return s.srv.Info(ctx, in)
}

func (s serverAsClient) LabelNames(ctx context.Context, in *LabelNamesRequest, _ ...grpc.CallOption) (*LabelNamesResponse, error) {
return s.srv.LabelNames(ctx, in)
}

func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, _ ...grpc.CallOption) (*LabelValuesResponse, error) {
return s.srv.LabelValues(ctx, in)
}

func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) {
inSrv := &inProcessStream{recv: make(chan *SeriesResponse, s.clientReceiveBufferSize), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
inSrv.err <- s.srv.Series(in, inSrv)
close(inSrv.err)
close(inSrv.recv)
}()
return &inProcessClientStream{srv: inSrv}, nil
}

// TODO(bwplotka): Streams attributes, metadata etc are disconnected. Follow up on https://github.com/grpc/grpc-go/issues/906 to
// have solid solution.
type inProcessStream struct {
grpc.ServerStream

ctx context.Context
cancel context.CancelFunc
recv chan *SeriesResponse
err chan error
}

func (s *inProcessStream) Context() context.Context { return s.ctx }

func (s *inProcessStream) Send(r *SeriesResponse) error {
select {
case <-s.ctx.Done():
return s.ctx.Err()
case s.recv <- r:
return nil
}
}

type inProcessClientStream struct {
grpc.ClientStream

srv *inProcessStream
}

func (s *inProcessClientStream) Context() context.Context { return s.srv.ctx }

func (s *inProcessClientStream) CloseSend() error {
s.srv.cancel()
return nil
}

func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
select {
case <-s.srv.ctx.Done():
return nil, s.srv.ctx.Err()
case r, ok := <-s.srv.recv:
if !ok {
return nil, io.EOF
}
return r, nil
case err := <-s.srv.err:
if err == nil {
return nil, io.EOF
}
return nil, err
}
}
262 changes: 262 additions & 0 deletions pkg/store/storepb/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package storepb

import (
"context"
"fmt"
"io"
"testing"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/testutil"
)

type testStoreServer struct {
info *InfoResponse
infoLastReq *InfoRequest

series []*SeriesResponse
seriesLastReq *SeriesRequest

labelNames *LabelNamesResponse
labelNamesLastReq *LabelNamesRequest

labelValues *LabelValuesResponse
labelValuesLastReq *LabelValuesRequest

err error
}

func (t *testStoreServer) Info(_ context.Context, r *InfoRequest) (*InfoResponse, error) {
t.infoLastReq = r
return t.info, t.err
}

func (t *testStoreServer) Series(r *SeriesRequest, server Store_SeriesServer) error {
t.seriesLastReq = r
for i, s := range t.series {
if t.err != nil && i == len(t.series)/2 {
return t.err
}
if err := server.Send(s); err != nil {
return err
}
}
return nil
}

func (t *testStoreServer) LabelNames(_ context.Context, r *LabelNamesRequest) (*LabelNamesResponse, error) {
t.labelNamesLastReq = r
return t.labelNames, t.err
}

func (t *testStoreServer) LabelValues(_ context.Context, r *LabelValuesRequest) (*LabelValuesResponse, error) {
t.labelValuesLastReq = r
return t.labelValues, t.err
}

func TestServerAsClient(t *testing.T) {
for _, bufferSize := range []int{0, 1, 20, 100} {
t.Run(fmt.Sprintf("buffer=%v", bufferSize), func(t *testing.T) {
t.Run("Info", func(t *testing.T) {
s := &testStoreServer{
info: &InfoResponse{
LabelSets: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}}},
MinTime: -1,
MaxTime: 10,
StoreType: StoreType_DEBUG,
}}
t.Run("ok", func(t *testing.T) {
for i := 0; i < 20; i++ {
r := &InfoRequest{}
resp, err := ServerAsClient(s, 0).Info(context.TODO(), r)
testutil.Ok(t, err)
testutil.Equals(t, s.info, resp)
testutil.Equals(t, r, s.infoLastReq)
s.infoLastReq = nil
}
})
t.Run("error", func(t *testing.T) {
s.err = errors.New("some error")
for i := 0; i < 20; i++ {
r := &InfoRequest{}
_, err := ServerAsClient(s, 0).Info(context.TODO(), r)
testutil.NotOk(t, err)
testutil.Equals(t, s.err, err)
}
})
})
t.Run("Series", func(t *testing.T) {
s := &testStoreServer{
series: []*SeriesResponse{
NewSeriesResponse(&Series{
Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}},
Chunks: []AggrChunk{{MinTime: 123, MaxTime: 124}, {MinTime: 12455, MaxTime: 14124}},
}),
NewSeriesResponse(&Series{
Labels: []labelpb.ZLabel{{Name: "a", Value: "b1"}},
Chunks: []AggrChunk{{MinTime: 1231, MaxTime: 124}, {MinTime: 12455, MaxTime: 14124}},
}),
NewWarnSeriesResponse(errors.New("yolo")),
NewSeriesResponse(&Series{
Labels: []labelpb.ZLabel{{Name: "a", Value: "b3"}},
Chunks: []AggrChunk{{MinTime: 123, MaxTime: 124}, {MinTime: 124554, MaxTime: 14124}},
}),
}}
t.Run("ok", func(t *testing.T) {
for i := 0; i < 20; i++ {
r := &SeriesRequest{
MinTime: -214,
MaxTime: 213,
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
client, err := ServerAsClient(s, 0).Series(context.TODO(), r)
testutil.Ok(t, err)
var resps []*SeriesResponse
for {
resp, err := client.Recv()
if err == io.EOF {
break
}
testutil.Ok(t, err)
resps = append(resps, resp)
}
testutil.Equals(t, s.series, resps)
testutil.Equals(t, r, s.seriesLastReq)
s.seriesLastReq = nil
}
})
t.Run("ok, close send", func(t *testing.T) {
s.err = errors.New("some error")
for i := 0; i < 20; i++ {
r := &SeriesRequest{
MinTime: -214,
MaxTime: 213,
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
client, err := ServerAsClient(s, 0).Series(context.TODO(), r)
testutil.Ok(t, err)
var resps []*SeriesResponse
for {
if len(resps) == len(s.series)/2 {
testutil.Ok(t, client.CloseSend())
break
}
resp, err := client.Recv()
if err == io.EOF {
break
}
testutil.Ok(t, err)
resps = append(resps, resp)
}
testutil.Equals(t, s.series[:len(s.series)/2], resps)
testutil.Equals(t, r, s.seriesLastReq)
s.seriesLastReq = nil
}
})
t.Run("error", func(t *testing.T) {
for i := 0; i < 20; i++ {
r := &SeriesRequest{
MinTime: -214,
MaxTime: 213,
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
client, err := ServerAsClient(s, 0).Series(context.TODO(), r)
testutil.Ok(t, err)
var resps []*SeriesResponse
for {
resp, err := client.Recv()
if err == io.EOF {
break
}
if err == s.err {
break
}
testutil.Ok(t, err)
resps = append(resps, resp)
}
testutil.Equals(t, s.series[:len(s.series)/2], resps)
testutil.Equals(t, r, s.seriesLastReq)
s.seriesLastReq = nil
}
})
})
t.Run("LabelNames", func(t *testing.T) {
s := &testStoreServer{
info: &InfoResponse{
LabelSets: []labelpb.ZLabelSet{{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}}},
MinTime: -1,
MaxTime: 10,
StoreType: StoreType_DEBUG,
}}
t.Run("ok", func(t *testing.T) {
for i := 0; i < 20; i++ {
r := &LabelNamesRequest{
Start: -1,
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
resp, err := ServerAsClient(s, 0).LabelNames(context.TODO(), r)
testutil.Ok(t, err)
testutil.Equals(t, s.labelNames, resp)
testutil.Equals(t, r, s.labelNamesLastReq)
s.labelNamesLastReq = nil
}
})
t.Run("error", func(t *testing.T) {
s.err = errors.New("some error")
for i := 0; i < 20; i++ {
r := &LabelNamesRequest{
Start: -1,
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
_, err := ServerAsClient(s, 0).LabelNames(context.TODO(), r)
testutil.NotOk(t, err)
testutil.Equals(t, s.err, err)
}
})
})
t.Run("LabelValues", func(t *testing.T) {
s := &testStoreServer{
labelValues: &LabelValuesResponse{
Warnings: []string{"1", "a"},
Values: []string{"abc1", "go_goroutines"},
},
}
t.Run("ok", func(t *testing.T) {
for i := 0; i < 20; i++ {
r := &LabelValuesRequest{
Label: "__name__",
Start: -1,
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
resp, err := ServerAsClient(s, 0).LabelValues(context.TODO(), r)
testutil.Ok(t, err)
testutil.Equals(t, s.labelValues, resp)
testutil.Equals(t, r, s.labelValuesLastReq)
s.labelValuesLastReq = nil
}
})
t.Run("error", func(t *testing.T) {
s.err = errors.New("some error")
for i := 0; i < 20; i++ {
r := &LabelValuesRequest{
Label: "__name__",
Start: -1,
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
_, err := ServerAsClient(s, 0).LabelValues(context.TODO(), r)
testutil.NotOk(t, err)
testutil.Equals(t, s.err, err)
}
})
})
})
}
}

0 comments on commit 3c3f2e2

Please sign in to comment.