Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
datoug committed Jul 18, 2017
1 parent a57a9c6 commit 81a507a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 42 deletions.
25 changes: 6 additions & 19 deletions services/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
dconfig "github.com/uber/cherami-server/common/dconfigclient"
mm "github.com/uber/cherami-server/common/metadata"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/common/set"
storeStream "github.com/uber/cherami-server/stream"
"github.com/uber/cherami-thrift/.generated/go/admin"
"github.com/uber/cherami-thrift/.generated/go/metadata"
Expand Down Expand Up @@ -67,8 +68,7 @@ type (
storehostConn map[string]*outConnection
storehostConnMutex sync.RWMutex

knownCgExtents map[string]struct{}
knownCgExtentsMutex sync.RWMutex
knownCgExtents set.Set

metadataReconciler MetadataReconciler
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta
replicatorclientFactory: replicatorClientFactory,
remoteReplicatorConn: make(map[string]*outConnection),
storehostConn: make(map[string]*outConnection),
knownCgExtents: make(map[string]struct{}),
knownCgExtents: set.NewConcurrent(0),
}

r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger)
Expand Down Expand Up @@ -1106,16 +1106,11 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
})
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests)

var cgExtentCreated bool
r.knownCgExtentsMutex.RLock()
_, cgExtentCreated = r.knownCgExtents[request.GetExtentUUID()]
r.knownCgExtentsMutex.RUnlock()

if !cgExtentCreated {
if !r.knownCgExtents.Contains(request.GetExtentUUID()) {
// make sure the cg extent is created locally before accepting the SetAckOffset call.
// otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid
// and we may not be able to clean up the entry eventually.
extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{
_, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{
ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()),
ExtentUUID: common.StringPtr(request.GetExtentUUID()),
})
Expand All @@ -1124,16 +1119,8 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}
if len(extent.GetExtent().GetStoreUUIDs()) < 1 {
err = fmt.Errorf(`empty store uuid from cg extent`)
lcllg.Error(`SetAckOffset: empty store uuid from cg extent`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}

r.knownCgExtentsMutex.Lock()
r.knownCgExtents[request.GetExtentUUID()] = struct{}{}
r.knownCgExtentsMutex.Unlock()
r.knownCgExtents.Insert(request.GetExtentUUID())
}

err := r.metaClient.SetAckOffset(nil, request)
Expand Down
23 changes: 0 additions & 23 deletions services/replicator/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,29 +991,6 @@ func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() {
s.mockMeta.AssertExpectations(s.T())
}

func (s *ReplicatorSuite) TestSetAckOffsetFailure_NoStoreUUID() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
cgUUID := uuid.New()
extentUUID := uuid.New()
ackLevel := int64(20)
req := &shared.SetAckOffsetRequest{
ConsumerGroupUUID: common.StringPtr(cgUUID),
ExtentUUID: common.StringPtr(extentUUID),
AckLevelAddress: common.Int64Ptr(ackLevel),
}

s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{
Extent: &shared.ConsumerGroupExtent{},
}, nil).Run(func(args mock.Arguments) {
req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest)
s.Equal(extentUUID, req.GetExtentUUID())
s.Equal(cgUUID, req.GetConsumerGroupUUID())
})
err := repliator.SetAckOffset(nil, req)
s.Error(err)
s.mockMeta.AssertExpectations(s.T())
}

func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
extentUUID := uuid.New()
Expand Down

0 comments on commit 81a507a

Please sign in to comment.