Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Replicator: Do not accept SetAckOffset before the cg extent is created locally #251

Merged
merged 3 commits into from
Jul 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions services/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
dconfig "github.com/uber/cherami-server/common/dconfigclient"
mm "github.com/uber/cherami-server/common/metadata"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/common/set"
storeStream "github.com/uber/cherami-server/stream"
"github.com/uber/cherami-thrift/.generated/go/admin"
"github.com/uber/cherami-thrift/.generated/go/metadata"
Expand Down Expand Up @@ -67,6 +68,8 @@ type (
storehostConn map[string]*outConnection
storehostConnMutex sync.RWMutex

knownCgExtents set.Set

metadataReconciler MetadataReconciler
}
)
Expand Down Expand Up @@ -127,6 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta
replicatorclientFactory: replicatorClientFactory,
remoteReplicatorConn: make(map[string]*outConnection),
storehostConn: make(map[string]*outConnection),
knownCgExtents: set.NewConcurrent(0),
}

r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger)
Expand Down Expand Up @@ -1102,6 +1106,23 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs
})
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests)

if !r.knownCgExtents.Contains(request.GetExtentUUID()) {
// make sure the cg extent is created locally before accepting the SetAckOffset call.
// otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid
// and we may not be able to clean up the entry eventually.
_, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{
ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()),
ExtentUUID: common.StringPtr(request.GetExtentUUID()),
})
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`)
r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)
return err
}

r.knownCgExtents.Insert(request.GetExtentUUID())
}

err := r.metaClient.SetAckOffset(nil, request)
if err != nil {
lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`)
Expand Down
37 changes: 35 additions & 2 deletions services/replicator/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,13 +941,25 @@ func (s *ReplicatorSuite) TestCreateRemoteConsumerGroupExtentFailure() {

func (s *ReplicatorSuite) TestSetAckOffset() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
cgUUID := uuid.New()
extentUUID := uuid.New()
storeUUID := []string{uuid.New(), uuid.New(), uuid.New()}
ackLevel := int64(20)
req := &shared.SetAckOffsetRequest{
ExtentUUID: common.StringPtr(extentUUID),
AckLevelAddress: common.Int64Ptr(ackLevel),
ConsumerGroupUUID: common.StringPtr(cgUUID),
ExtentUUID: common.StringPtr(extentUUID),
AckLevelAddress: common.Int64Ptr(ackLevel),
}

s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{
Extent: &shared.ConsumerGroupExtent{
StoreUUIDs: storeUUID,
},
}, nil).Run(func(args mock.Arguments) {
req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest)
s.Equal(extentUUID, req.GetExtentUUID())
s.Equal(cgUUID, req.GetConsumerGroupUUID())
})
s.mockMeta.On("SetAckOffset", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
req := args.Get(1).(*shared.SetAckOffsetRequest)
s.Equal(extentUUID, req.GetExtentUUID())
Expand All @@ -958,6 +970,27 @@ func (s *ReplicatorSuite) TestSetAckOffset() {
s.mockMeta.AssertExpectations(s.T())
}

// TestSetAckOffsetFailure_ReadExtentFail checks that Replicator.SetAckOffset
// returns an error when the metadata ReadConsumerGroupExtent call fails.
//
// Only ReadConsumerGroupExtent is stubbed in this test; it is wired to return
// an InternalServiceError, and its request is checked to carry the consumer
// group and extent UUIDs taken from the SetAckOffset request.
func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() {
	rep, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)

	cgUUID := uuid.New()
	extentUUID := uuid.New()
	const ackLevel int64 = 20

	ackReq := &shared.SetAckOffsetRequest{
		ConsumerGroupUUID: common.StringPtr(cgUUID),
		ExtentUUID:        common.StringPtr(extentUUID),
		AckLevelAddress:   common.Int64Ptr(ackLevel),
	}

	// Validates the read request that the replicator forwards to metadata.
	verifyReadRequest := func(args mock.Arguments) {
		readReq := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest)
		s.Equal(extentUUID, readReq.GetExtentUUID())
		s.Equal(cgUUID, readReq.GetConsumerGroupUUID())
	}

	// Error injected into the local cg extent lookup.
	readFailure := &shared.InternalServiceError{Message: "test2"}

	s.mockMeta.
		On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).
		Return(nil, readFailure).
		Run(verifyReadRequest)

	s.Error(rep.SetAckOffset(nil, ackReq))
	s.mockMeta.AssertExpectations(s.T())
}

func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() {
repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg)
extentUUID := uuid.New()
Expand Down