Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Replicator: Do not accept SetAckOffset before the cg extent is create…
Browse files Browse the repository at this point in the history
…d locally (#251)

* Replicator: Donot accept SetAckOffset before the cg extent is created

* cache the cg extent locally to reduce load on cassandra

* address comments
  • Loading branch information
datoug authored Jul 19, 2017
1 parent 9f74915 commit 98771ed
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
21 changes: 21 additions & 0 deletions services/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
dconfig "github.com/uber/cherami-server/common/dconfigclient"
mm "github.com/uber/cherami-server/common/metadata"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/common/set"
storeStream "github.com/uber/cherami-server/stream"
"github.com/uber/cherami-thrift/.generated/go/admin"
"github.com/uber/cherami-thrift/.generated/go/metadata"
Expand Down Expand Up @@ -67,6 +68,8 @@ type (
storehostConn map[string]*outConnection
storehostConnMutex sync.RWMutex

knownCgExtents set.Set

metadataReconciler MetadataReconciler
}
)
Expand Down Expand Up @@ -127,6 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta
replicatorclientFactory: replicatorClientFactory,
remoteReplicatorConn: make(map[string]*outConnection),
storehostConn: make(map[string]*outConnection),
knownCgExtents: set.NewConcurrent(0),
}

r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger)
Expand Down Expand Up @@ -1102,6 +1106,23 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
})
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests)

if !r.knownCgExtents.Contains(request.GetExtentUUID()) {
// make sure the cg extent is created locally before accepting the SetAckOffset call.
// otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid
// and we may not be able to clean up the entry eventually.
_, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{
ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()),
ExtentUUID: common.StringPtr(request.GetExtentUUID()),
})
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}

r.knownCgExtents.Insert(request.GetExtentUUID())
}

err := r.metaClient.SetAckOffset(nil, request)
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`)
Expand Down
37 changes: 35 additions & 2 deletions services/replicator/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,13 +941,25 @@ func (s *ReplicatorSuite) TestCreateRemoteConsumerGroupExtentFailure() {

func (s *ReplicatorSuite) TestSetAckOffset() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
cgUUID := uuid.New()
extentUUID := uuid.New()
storeUUID := []string{uuid.New(), uuid.New(), uuid.New()}
ackLevel := int64(20)
req := &shared.SetAckOffsetRequest{
ExtentUUID: common.StringPtr(extentUUID),
AckLevelAddress: common.Int64Ptr(ackLevel),
ConsumerGroupUUID: common.StringPtr(cgUUID),
ExtentUUID: common.StringPtr(extentUUID),
AckLevelAddress: common.Int64Ptr(ackLevel),
}

s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{
Extent: &shared.ConsumerGroupExtent{
StoreUUIDs: storeUUID,
},
}, nil).Run(func(args mock.Arguments) {
req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest)
s.Equal(extentUUID, req.GetExtentUUID())
s.Equal(cgUUID, req.GetConsumerGroupUUID())
})
s.mockMeta.On("SetAckOffset", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
req := args.Get(1).(*shared.SetAckOffsetRequest)
s.Equal(extentUUID, req.GetExtentUUID())
Expand All @@ -958,6 +970,27 @@ func (s *ReplicatorSuite) TestSetAckOffset() {
s.mockMeta.AssertExpectations(s.T())
}

func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
cgUUID := uuid.New()
extentUUID := uuid.New()
ackLevel := int64(20)
req := &shared.SetAckOffsetRequest{
ConsumerGroupUUID: common.StringPtr(cgUUID),
ExtentUUID: common.StringPtr(extentUUID),
AckLevelAddress: common.Int64Ptr(ackLevel),
}

s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(nil, &shared.InternalServiceError{Message: "test2"}).Run(func(args mock.Arguments) {
req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest)
s.Equal(extentUUID, req.GetExtentUUID())
s.Equal(cgUUID, req.GetConsumerGroupUUID())
})
err := repliator.SetAckOffset(nil, req)
s.Error(err)
s.mockMeta.AssertExpectations(s.T())
}

func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
extentUUID := uuid.New()
Expand Down

0 comments on commit 98771ed

Please sign in to comment.