Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Brett Jones <[email protected]>
  • Loading branch information
blockloop committed Jan 9, 2020
1 parent ea911f5 commit 0d039eb
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 123 deletions.
23 changes: 16 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
h.forwardRequestsTotal.WithLabelValues("success").Inc()
}()

cl, err := h.peers.Get(ctx, endpoint)
cl, err := h.peers.get(ctx, endpoint)
if err != nil {
level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint)
ec <- err
Expand All @@ -396,6 +396,8 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
span, ctx := tracing.StartSpan(ctx, "thanos_receive_forward")
defer span.Finish()

// Actually make the request against the endpoint
// we determined should handle these time series.
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[endpoint].Timeseries,
Tenant: tenant,
Expand Down Expand Up @@ -516,29 +518,36 @@ func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup {
return &peerGroup{
dialOpts: dialOpts,
cache: map[string]storepb.WriteableStoreClient{},
m: sync.Mutex{},
m: sync.RWMutex{},
dialer: grpc.DialContext,
}
}

type peerGroup struct {
dialOpts []grpc.DialOption
cache map[string]storepb.WriteableStoreClient
m sync.Mutex
m sync.RWMutex

// dialer is used for testing.
dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
}

func (p *peerGroup) Get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) {
p.m.Lock()
defer p.m.Unlock()

func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) {
// use a RLock first to prevent blocking if we don't need to.
p.m.RLock()
c, ok := p.cache[addr]
p.m.RUnlock()
if ok {
return c, nil
}

p.m.Lock()
defer p.m.Unlock()
// Make sure that another caller hasn't created the connection since obtaining the write lock.
c, ok = p.cache[addr]
if ok {
return c, nil
}
conn, err := p.dialer(ctx, addr, p.dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "failed to dial peer")
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64)
// This removes the network from the tests and creates a more consistent testing harness.
peers := &peerGroup{
dialOpts: nil,
m: sync.Mutex{},
m: sync.RWMutex{},
cache: map[string]storepb.WriteableStoreClient{},
dialer: func(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error) {
// dialer should never be called since we are creating fake clients with fake addresses
Expand Down
224 changes: 112 additions & 112 deletions pkg/store/storepb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0d039eb

Please sign in to comment.