diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 4b49d198..7b372c0b 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -67,6 +67,9 @@ type ( storehostConn map[string]*outConnection storehostConnMutex sync.RWMutex + knownCgExtents map[string]struct{} + knownCgExtentsMutex sync.RWMutex + metadataReconciler MetadataReconciler } ) @@ -127,6 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta replicatorclientFactory: replicatorClientFactory, remoteReplicatorConn: make(map[string]*outConnection), storehostConn: make(map[string]*outConnection), + knownCgExtents: make(map[string]struct{}), } r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger) @@ -1102,26 +1106,37 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs }) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests) - // make sure the cg extent is created locally before accepting the SetAckOffset call. - // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid - // and we may not be able to clean up the entry eventually. - extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ - ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), - ExtentUUID: common.StringPtr(request.GetExtentUUID()), - }) - if err != nil { - lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`) - r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) - return err - } - if len(extent.GetExtent().GetStoreUUIDs()) < 1 { - err = fmt.Errorf(`empty store uuid from cg extent`) - lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) - r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) - return err + var cgExtentCreated bool + r.knownCgExtentsMutex.RLock() + _, cgExtentCreated = r.knownCgExtents[request.GetExtentUUID()] + r.knownCgExtentsMutex.RUnlock() + + if !cgExtentCreated { + // make sure the cg extent is created locally before accepting the SetAckOffset call. + // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid + // and we may not be able to clean up the entry eventually. + extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ + ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), + ExtentUUID: common.StringPtr(request.GetExtentUUID()), + }) + if err != nil { + lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + if len(extent.GetExtent().GetStoreUUIDs()) < 1 { + err = fmt.Errorf(`empty store uuid from cg extent`) + lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + + r.knownCgExtentsMutex.Lock() + r.knownCgExtents[request.GetExtentUUID()] = struct{}{} + r.knownCgExtentsMutex.Unlock() } - err = r.metaClient.SetAckOffset(nil, request) + err := r.metaClient.SetAckOffset(nil, request) if err != nil { lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures)