Skip to content

Commit

Permalink
storepb: make ServerAsClient channels unbuffered
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored and jnyi committed Apr 4, 2024
1 parent efc378f commit ac0ca8f
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
c := &storetestutil.TestClient{
Name: "1",
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), 0),
StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil)),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
}
cls[i] = &storetestutil.TestClient{
Name: fmt.Sprintf("%v", i),
StoreClient: storepb.ServerAsClient(s, 0),
StoreClient: storepb.ServerAsClient(s),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
WithoutReplicaLabelsEnabled: withoutReplicaLabelsEnabled,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestQuerier_Proxy(t *testing.T) {
// TODO(bwplotka): Parse external labels.
clients = append(clients, &storetestutil.TestClient{
Name: fmt.Sprintf("store number %v", i),
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0),
StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt)),
MinTime: st.mint,
MaxTime: st.maxt,
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (t *tenant) client(logger log.Logger) store.Client {
return nil
}

client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0)
client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore))
return newLocalClient(client, tsdbStore)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,8 @@ func TestProxyStore_Acceptance(t *testing.T) {
p2 := startNestedStore(tt, extLset, appendFn)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1, 0)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2, 0)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2)},
}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
Expand Down
9 changes: 4 additions & 5 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (
"google.golang.org/grpc"
)

func ServerAsClient(srv StoreServer, clientReceiveBufferSize int) StoreClient {
return &serverAsClient{srv: srv, clientReceiveBufferSize: clientReceiveBufferSize}
func ServerAsClient(srv StoreServer) StoreClient {
return &serverAsClient{srv: srv}
}

// serverAsClient allows to use servers as clients.
// NOTE: Passing CallOptions does not work - it would be needed to be implemented in grpc itself (before, after are private).
type serverAsClient struct {
clientReceiveBufferSize int
srv StoreServer
srv StoreServer
}

func (s serverAsClient) Info(ctx context.Context, in *InfoRequest, _ ...grpc.CallOption) (*InfoResponse, error) {
Expand All @@ -34,7 +33,7 @@ func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest,
}

func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) {
inSrv := &inProcessStream{recv: make(chan *SeriesResponse, s.clientReceiveBufferSize), err: make(chan error)}
inSrv := &inProcessStream{recv: make(chan *SeriesResponse), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
inSrv.err <- s.srv.Series(in, inSrv)
Expand Down
19 changes: 10 additions & 9 deletions pkg/store/storepb/inprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (t *testStoreServer) LabelValues(_ context.Context, r *LabelValuesRequest)
}

func TestServerAsClient(t *testing.T) {
ctx := context.Background()
for _, bufferSize := range []int{0, 1, 20, 100} {
t.Run(fmt.Sprintf("buffer=%v", bufferSize), func(t *testing.T) {
t.Run("Info", func(t *testing.T) {
Expand All @@ -72,7 +73,7 @@ func TestServerAsClient(t *testing.T) {
t.Run("ok", func(t *testing.T) {
for i := 0; i < 20; i++ {
r := &InfoRequest{}
resp, err := ServerAsClient(s, 0).Info(context.TODO(), r)
resp, err := ServerAsClient(s).Info(ctx, r)
testutil.Ok(t, err)
testutil.Equals(t, s.info, resp)
testutil.Equals(t, r, s.infoLastReq)
Expand All @@ -83,7 +84,7 @@ func TestServerAsClient(t *testing.T) {
s.err = errors.New("some error")
for i := 0; i < 20; i++ {
r := &InfoRequest{}
_, err := ServerAsClient(s, 0).Info(context.TODO(), r)
_, err := ServerAsClient(s).Info(ctx, r)
testutil.NotOk(t, err)
testutil.Equals(t, s.err, err)
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func TestServerAsClient(t *testing.T) {
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
client, err := ServerAsClient(s, 0).Series(context.TODO(), r)
client, err := ServerAsClient(s).Series(ctx, r)
testutil.Ok(t, err)
var resps []*SeriesResponse
for {
Expand All @@ -139,7 +140,7 @@ func TestServerAsClient(t *testing.T) {
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
client, err := ServerAsClient(s, 0).Series(context.TODO(), r)
client, err := ServerAsClient(s).Series(ctx, r)
testutil.Ok(t, err)
var resps []*SeriesResponse
for {
Expand Down Expand Up @@ -167,7 +168,7 @@ func TestServerAsClient(t *testing.T) {
Matchers: []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
client, err := ServerAsClient(s, 0).Series(context.TODO(), r)
client, err := ServerAsClient(s).Series(ctx, r)
testutil.Ok(t, err)
var resps []*SeriesResponse
for {
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestServerAsClient(t *testing.T) {
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
resp, err := ServerAsClient(s, 0).LabelNames(context.TODO(), r)
resp, err := ServerAsClient(s).LabelNames(ctx, r)
testutil.Ok(t, err)
testutil.Equals(t, s.labelNames, resp)
testutil.Equals(t, r, s.labelNamesLastReq)
Expand All @@ -217,7 +218,7 @@ func TestServerAsClient(t *testing.T) {
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
_, err := ServerAsClient(s, 0).LabelNames(context.TODO(), r)
_, err := ServerAsClient(s).LabelNames(ctx, r)
testutil.NotOk(t, err)
testutil.Equals(t, s.err, err)
}
Expand All @@ -238,7 +239,7 @@ func TestServerAsClient(t *testing.T) {
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
resp, err := ServerAsClient(s, 0).LabelValues(context.TODO(), r)
resp, err := ServerAsClient(s).LabelValues(ctx, r)
testutil.Ok(t, err)
testutil.Equals(t, s.labelValues, resp)
testutil.Equals(t, r, s.labelValuesLastReq)
Expand All @@ -254,7 +255,7 @@ func TestServerAsClient(t *testing.T) {
End: 234,
PartialResponseStrategy: PartialResponseStrategy_ABORT,
}
_, err := ServerAsClient(s, 0).LabelValues(context.TODO(), r)
_, err := ServerAsClient(s).LabelValues(ctx, r)
testutil.NotOk(t, err)
testutil.Equals(t, s.err, err)
}
Expand Down

0 comments on commit ac0ca8f

Please sign in to comment.