diff --git a/services/outputhost/cgcache.go b/services/outputhost/cgcache.go
index b0c44c7b..4d2cfba5 100644
--- a/services/outputhost/cgcache.go
+++ b/services/outputhost/cgcache.go
@@ -312,9 +312,9 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
 		destUUID:             cgCache.cachedCGDesc.GetDestinationUUID(),
 		destType:             destType,
 		storeUUIDs:           cge.StoreUUIDs,
-		startFrom:            cgCache.cachedCGDesc.GetStartFrom(),
-		skipOlder:            cgCache.cachedCGDesc.GetSkipOlderMessagesSeconds(),
-		delay:                cgCache.cachedCGDesc.GetDelaySeconds(),
+		startFrom:            time.Unix(0, cgCache.cachedCGDesc.GetStartFrom()),
+		skipOlder:            time.Duration(int64(cgCache.cachedCGDesc.GetSkipOlderMessagesSeconds()) * int64(time.Second)),
+		delay:                time.Duration(int64(cgCache.cachedCGDesc.GetDelaySeconds()) * int64(time.Second)),
 		notifyReplicaCloseCh: make(chan error, 5),
 		closeChannel:         make(chan struct{}),
 		waitConsumedCh:       make(chan bool, 1),
@@ -392,7 +392,6 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
 		cgCache.cachedCGDesc.GetConsumerGroupName(),
 		cgCache.kafkaCluster,
 		cgCache.kafkaTopics,
-		common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()),
 		cgCache.metaClient,
 		cge,
 		cgCache.consumerM3Client,
diff --git a/services/outputhost/extcache.go b/services/outputhost/extcache.go
index 1a1d1a0f..55149586 100644
--- a/services/outputhost/extcache.go
+++ b/services/outputhost/extcache.go
@@ -86,14 +86,14 @@ type extentCache struct {
 	// numExtents is the total number of extents within the CG. this is used to determine the initial credits
 	numExtents int
 
-	// startFrom is the offset to start from
-	startFrom int64
+	// startFrom is the time to start-from
+	startFrom time.Time
 
 	// skipOlder indicates that the CG wants to skip any messages older than this value, in seconds
-	skipOlder int32
+	skipOlder time.Duration
 
 	// delay indicates that the CG wants to delay every message by the specified value, in seconds
-	delay int32
+	delay time.Duration
 
 	// msgsCh is the channel where we write the message to the client as we read from replica
 	msgsCh chan<- *cherami.ConsumerMessage
@@ -161,7 +161,7 @@ var kafkaLogSetup sync.Once
 const extentLoadReportingInterval = 2 * time.Second
 
 // kafkaDefaultRetention is the default value of log.retention.hours in the Kafka system
-const kafkaDefaultRetention = common.UnixNanoTime(time.Hour * 24 * 7)
+const kafkaDefaultRetention = time.Hour * 24 * 7
 
 func (extCache *extentCache) load(
 	outputHostUUID,
@@ -169,7 +169,6 @@ func (extCache *extentCache) load(
 	cgName,
 	kafkaCluster string,
 	kafkaTopics []string,
-	startFrom common.UnixNanoTime,
 	metaClient metadata.TChanMetadataService,
 	cge *shared.ConsumerGroupExtent,
 	metricsClient metrics.Client,
@@ -186,7 +185,7 @@ func (extCache *extentCache) load(
 
 	if common.IsKafkaConsumerGroupExtent(cge) {
 		extCache.connectedStoreUUID = kafkaConnectedStoreUUID
-		extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics, metricsClient)
+		extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, kafkaCluster, kafkaTopics, metricsClient)
 	} else {
 		extCache.connection, extCache.pickedIndex, err =
 			extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
@@ -237,15 +236,28 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
 
 	// First try to start from the already set offset in metadata
 	if startAddress == 0 {
-		// If consumer group wants to start from a timestamp, get the address from the store
-		startFrom := extCache.startFrom
+		// If consumer group wants to start from a timestamp, get the address from the store.
+		// Note: we take into account any 'delay' that is associated with the CG and the 'skip-older'
+		// time that was specified, by offsetting the time appropriately. we apply the 'start-from'
+		// and 'skip-older' to the "visibility time" (enqueue-time + delay) of the message as opposed
+		// to the enqueue-time.
 
 		// TODO: if the consumer group wants to start from the beginning, we should still calculate the earliest address
 		// that they can get. To do otherwise means that will will get spurious 'skipped messages' warnings
 		// NOTE: audit will have to handle an uneven 'virtual startFrom' across all of the extents that a zero startFrom
 		// consumer group is reading
 		// NOTE: there is a race between consumption and retention here!
-		if startFrom > 0 {
+		if extCache.startFrom.UnixNano() > 0 || extCache.skipOlder > 0 {
+
+			var startFrom int64
+
+			// compute start-from as the max(skip-older-time, start-from), adjusting for 'delay', if any
+			if extCache.skipOlder > 0 && time.Now().Add(-extCache.skipOlder).After(extCache.startFrom) {
+				startFrom = time.Now().Add(-extCache.skipOlder).Add(-extCache.delay).UnixNano()
+			} else {
+				startFrom = extCache.startFrom.Add(-extCache.delay).UnixNano()
+			}
+
 			// GetAddressFromTimestamp() from the store using startFrom
 			// use a tmp context whose timeout is shorter
 			tmpCtx, cancelGAFT := thrift.NewContext(getAddressCtxTimeout)
@@ -338,7 +350,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
 func (extCache *extentCache) loadKafkaStream(
 	cgName string,
 	outputHostUUID string,
-	startFrom common.UnixNanoTime,
 	kafkaCluster string,
 	kafkaTopics []string,
 	metricsClient metrics.Client,
@@ -366,7 +377,8 @@ func (extCache *extentCache) loadKafkaStream(
 	// TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request
 	// to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster
 	// starts from the right place
-	if common.Now()-startFrom > kafkaDefaultRetention/2 {
+
+	if time.Now().Sub(extCache.startFrom) > kafkaDefaultRetention/2 {
 		cfg.Config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	}
 
diff --git a/services/outputhost/outputhost_test.go b/services/outputhost/outputhost_test.go
index 54886046..3b5d5f06 100644
--- a/services/outputhost/outputhost_test.go
+++ b/services/outputhost/outputhost_test.go
@@ -1141,3 +1141,338 @@ func (s *OutputHostSuite) TestOutputHostReplicaRollover() {
 	s.Equal(conn.sentMsgs, conn.sentToMsgCache, "sentMsgs != sentToMsgCache")
 	outputHost.Shutdown()
 }
+
+func (s *OutputHostSuite) TestOutputHostSkipOlder() {
+
+	count := 60
+	skipCount := 20
+	nonSkipCount := count - skipCount
+	skipOlder := 60 * time.Second
+
+	outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil)
+	httpRequest := utilGetHTTPRequestWithPath("foo")
+
+	destUUID := uuid.New()
+	destDesc := shared.NewDestinationDescription()
+	destDesc.Path = common.StringPtr("/foo/bar")
+	destDesc.DestinationUUID = common.StringPtr(destUUID)
+	destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED)
+	s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Once()
+	s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`))
+
+	cgDesc := shared.NewConsumerGroupDescription()
+	cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New())
+	cgDesc.DestinationUUID = common.StringPtr(destUUID)
+	cgDesc.SkipOlderMessagesSeconds = common.Int32Ptr(int32(skipOlder.Seconds()))
+	s.mockMeta.On("ReadConsumerGroup", mock.Anything, mock.Anything).Return(cgDesc, nil).Twice()
+
+	cgExt := shared.NewConsumerGroupExtent()
+	cgExt.ExtentUUID = common.StringPtr(uuid.New())
+	cgExt.StoreUUIDs = []string{"mock"}
+
+	cgRes := &shared.ReadConsumerGroupExtentsResult_{}
+	cgRes.Extents = append(cgRes.Extents, cgExt)
+	s.mockMeta.On("ReadConsumerGroupExtents", mock.Anything, mock.Anything).Return(cgRes, nil).Once()
+	s.mockRead.On("Write", mock.Anything).Return(nil)
+
+	s.mockStore.On("GetAddressFromTimestamp", mock.Anything, mock.Anything).Once().Return(&store.GetAddressFromTimestampResult_{
+		Address:        common.Int64Ptr(1234000000),
+		SequenceNumber: common.Int64Ptr(1),
+		Sealed:         common.BoolPtr(false),
+	}, nil).Run(func(args mock.Arguments) {
+
+		req := args.Get(1).(*store.GetAddressFromTimestampRequest)
+		s.True(req.GetTimestamp() <= time.Now().Add(-skipOlder).UnixNano())
+	})
+
+	// the skip-older threshold: messages enqueued before 'now - skip-older' should be skipped
+	tSkipOlder := time.Now().UnixNano() - int64(skipOlder)
+
+	writeDoneCh := make(chan struct{})
+
+	var recvMsgs int
+	s.mockCons.On("Write", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+
+		ohc := args.Get(0).(*cherami.OutputHostCommand)
+
+		if ohc.GetType() == cherami.OutputHostCommandType_MESSAGE {
+
+			recvMsgs++
+			msg := ohc.GetMessage()
+			tEnq := msg.GetEnqueueTimeUtc()
+			s.True(tEnq == 0 || tEnq > tSkipOlder) // ensure received messages are newer than the skip-older threshold (or carry no enqueue-time)
+
+			if recvMsgs == nonSkipCount {
+				close(writeDoneCh)
+			}
+		}
+
+	}).Times(count)
+
+	cFlow := cherami.NewControlFlow()
+	cFlow.Credits = common.Int32Ptr(int32(count))
+
+	connOpenedCh := make(chan struct{})
+	s.mockCons.On("Read").Return(cFlow, nil).Once().Run(func(args mock.Arguments) { close(connOpenedCh) })
+
+	rmc := store.NewReadMessageContent()
+	rmc.Type = store.ReadMessageContentTypePtr(store.ReadMessageContentType_MESSAGE)
+
+	var seqnum int64
+
+	// send 'skipCount' messages older than tSkipOlder
+	s.mockRead.On("Read").Return(rmc, nil).Run(func(args mock.Arguments) {
+
+		seqnum++
+		aMsg := store.NewAppendMessage()
+		aMsg.SequenceNumber = common.Int64Ptr(seqnum)
+		aMsg.EnqueueTimeUtc = common.Int64Ptr(tSkipOlder - (int64(skipCount)-seqnum)*int64(time.Second))
+		pMsg := cherami.NewPutMessage()
+		pMsg.ID = common.StringPtr(strconv.Itoa(int(seqnum)))
+		pMsg.Data = []byte(fmt.Sprintf("seqnum=%d", seqnum))
+		aMsg.Payload = pMsg
+		rMsg := store.NewReadMessage()
+		rMsg.Message = aMsg
+		rMsg.Address = common.Int64Ptr(1234000000 + seqnum)
+		rmc.Message = rMsg
+
+	}).Times(skipCount)
+
+	// send 'nonSkipCount - 1' messages that are current
+	s.mockRead.On("Read").Return(rmc, nil).Run(func(args mock.Arguments) {
+
+		seqnum++
+		aMsg := store.NewAppendMessage()
+		aMsg.SequenceNumber = common.Int64Ptr(seqnum)
+		aMsg.EnqueueTimeUtc = common.Int64Ptr(time.Now().UnixNano())
+		pMsg := cherami.NewPutMessage()
+		pMsg.ID = common.StringPtr(strconv.Itoa(int(seqnum)))
+		pMsg.Data = []byte(fmt.Sprintf("seqnum=%d", seqnum))
+		aMsg.Payload = pMsg
+		rMsg := store.NewReadMessage()
+		rMsg.Message = aMsg
+		rMsg.Address = common.Int64Ptr(1234000000 + seqnum)
+		rmc.Message = rMsg
+
+	}).Times(nonSkipCount - 1)
+
+	// send one message with no enqueue-time (that we should receive)
+	s.mockRead.On("Read").Return(rmc, nil).Run(func(args mock.Arguments) {
+
+		seqnum++
+		aMsg := store.NewAppendMessage()
+		aMsg.SequenceNumber = common.Int64Ptr(seqnum)
+		// aMsg.EnqueueTimeUtc = common.Int64Ptr(tSkipOlder - (int64(skipCount)-seqnum)*int64(time.Second))
+		pMsg := cherami.NewPutMessage()
+		pMsg.ID = common.StringPtr(strconv.Itoa(int(seqnum)))
+		pMsg.Data = []byte(fmt.Sprintf("seqnum=%d", seqnum))
+		aMsg.Payload = pMsg
+		rMsg := store.NewReadMessage()
+		rMsg.Message = aMsg
+		rMsg.Address = common.Int64Ptr(1234000000 + seqnum)
+		rmc.Message = rMsg
+
+	}).Times(1)
+
+	s.mockRead.On("Read").Return(&store.ReadMessageContent{
+		Type:   store.ReadMessageContentTypePtr(store.ReadMessageContentType_SEALED),
+		Sealed: store.NewExtentSealedError(),
+	}, nil).Once()
+
+	streamDoneCh := make(chan struct{})
+	go func() {
+		outputHost.OpenConsumerStreamHandler(s.mockHTTPResponse, httpRequest)
+		close(streamDoneCh)
+	}()
+
+	// close the read stream
+	creditUnblockCh := make(chan struct{})
+	s.mockRead.On("Read").Return(nil, io.EOF)
+	s.mockCons.On("Read").Return(nil, io.EOF).Run(func(args mock.Arguments) {
+		<-writeDoneCh
+		<-creditUnblockCh
+	})
+
+	<-connOpenedCh // wait for the consConnection to open
+
+	// look up cgcache and the underlying client connection
+	outputHost.cgMutex.RLock()
+	cgCache, ok := outputHost.cgCache[cgDesc.GetConsumerGroupUUID()]
+	outputHost.cgMutex.RUnlock()
+	s.True(ok, "cannot find cgcache entry")
+
+	var nConns = 0
+	var conn *consConnection
+	cgCache.extMutex.RLock()
+	for _, conn = range cgCache.connections {
+		break
+	}
+	nConns = len(cgCache.connections)
+	cgCache.extMutex.RUnlock()
+	s.Equal(1, nConns, "wrong number of consumer connections")
+	s.NotNil(conn, "failed to find consConnection within cgcache")
+
+	creditUnblockCh <- struct{}{} // now unblock the readCreditsPump on the consconnection
+	<-streamDoneCh
+
+	s.mockHTTPResponse.AssertNotCalled(s.T(), "WriteHeader", mock.Anything)
+	s.Equal(int64(nonSkipCount), conn.sentMsgs, "wrong sentMsgs count")
+	s.Equal(conn.sentMsgs, conn.sentToMsgCache, "sentMsgs != sentToMsgCache")
+	outputHost.Shutdown()
+}
+
+func (s *OutputHostSuite) TestOutputHostDelay() {
+
+	count := 60
+	delay := 50 * time.Second
+	skipOlder := 20 * time.Second
+
+	skipCount := 20
+	nonSkipCount := count - skipCount
+
+	startFrom := time.Now()
+	startFromExpected := startFrom.Add(-delay)
+
+	outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil)
+	httpRequest := utilGetHTTPRequestWithPath("foo")
+
+	destUUID := uuid.New()
+	destDesc := shared.NewDestinationDescription()
+	destDesc.Path = common.StringPtr("/foo/bar")
+	destDesc.DestinationUUID = common.StringPtr(destUUID)
+	destDesc.Status = common.InternalDestinationStatusPtr(shared.DestinationStatus_ENABLED)
+	s.mockMeta.On("ReadDestination", mock.Anything, mock.Anything).Return(destDesc, nil).Once()
+	s.mockMeta.On("ReadExtentStats", mock.Anything, mock.Anything).Return(nil, fmt.Errorf(`foo`))
+
+	cgDesc := shared.NewConsumerGroupDescription()
+	cgDesc.ConsumerGroupUUID = common.StringPtr(uuid.New())
+	cgDesc.DestinationUUID = common.StringPtr(destUUID)
+	cgDesc.DelaySeconds = common.Int32Ptr(int32(delay.Seconds()))
+	cgDesc.SkipOlderMessagesSeconds = common.Int32Ptr(int32(skipOlder.Seconds()))
+	cgDesc.StartFrom = common.Int64Ptr(startFrom.UnixNano())
+	s.mockMeta.On("ReadConsumerGroup", mock.Anything, mock.Anything).Return(cgDesc, nil).Twice()
+
+	cgExt := shared.NewConsumerGroupExtent()
+	cgExt.ExtentUUID = common.StringPtr(uuid.New())
+	cgExt.StoreUUIDs = []string{"mock"}
+
+	cgRes := &shared.ReadConsumerGroupExtentsResult_{}
+	cgRes.Extents = append(cgRes.Extents, cgExt)
+	s.mockMeta.On("ReadConsumerGroupExtents", mock.Anything, mock.Anything).Return(cgRes, nil).Once()
+	s.mockRead.On("Write", mock.Anything).Return(nil)
+
+	s.mockStore.On("GetAddressFromTimestamp", mock.Anything, mock.Anything).Once().Return(&store.GetAddressFromTimestampResult_{
+		Address:        common.Int64Ptr(1234000000),
+		SequenceNumber: common.Int64Ptr(1),
+		Sealed:         common.BoolPtr(false),
+	}, nil).Run(func(args mock.Arguments) {
+
+		req := args.Get(1).(*store.GetAddressFromTimestampRequest)
+		s.Equal(req.GetTimestamp(), startFromExpected.UnixNano())
+	})
+
+	tSkipOlder := time.Now().Add(-skipOlder)
+	tVisible := time.Now().Add(-skipOlder).Add(-delay)
+
+	writeDoneCh := make(chan struct{})
+
+	var recvMsgs int
+	s.mockCons.On("Write", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
+
+		ohc := args.Get(0).(*cherami.OutputHostCommand)
+
+		if ohc.GetType() == cherami.OutputHostCommandType_MESSAGE {
+
+			recvMsgs++
+			msg := ohc.GetMessage()
+
+			// compute visible time
+			msgVisibilityTime := time.Unix(0, msg.GetEnqueueTimeUtc()).Add(delay)
+
+			s.True(tSkipOlder.Before(msgVisibilityTime)) // message should not have been skipped
+			s.True(time.Now().After(msgVisibilityTime))  // message is expected to be 'visible'
+
+			if recvMsgs == nonSkipCount {
+				close(writeDoneCh)
+			}
+		}
+
+	}).Times(count)
+
+	cFlow := cherami.NewControlFlow()
+	cFlow.Credits = common.Int32Ptr(int32(count))
+
+	connOpenedCh := make(chan struct{})
+	s.mockCons.On("Read").Return(cFlow, nil).Once().Run(func(args mock.Arguments) { close(connOpenedCh) })
+
+	rmc := store.NewReadMessageContent()
+	rmc.Type = store.ReadMessageContentTypePtr(store.ReadMessageContentType_MESSAGE)
+
+	var seqnum int64
+
+	s.mockRead.On("Read").Return(rmc, nil).Run(func(args mock.Arguments) {
+
+		seqnum++
+		aMsg := store.NewAppendMessage()
+		aMsg.SequenceNumber = common.Int64Ptr(seqnum)
+		// set the enqueue-time such that the first 'skipCount' messages fall outside the
+		// skip-older window, while the rest are already past their delay (ie, visible)
+		aMsg.EnqueueTimeUtc = common.Int64Ptr(tVisible.UnixNano() - (int64(skipCount)-seqnum)*int64(100*time.Millisecond))
+
+		pMsg := cherami.NewPutMessage()
+		pMsg.ID = common.StringPtr(strconv.Itoa(int(seqnum)))
+		pMsg.Data = []byte(fmt.Sprintf("seqnum=%d", seqnum))
+		aMsg.Payload = pMsg
+		rMsg := store.NewReadMessage()
+		rMsg.Message = aMsg
+		rMsg.Address = common.Int64Ptr(1234000000 + seqnum)
+		rmc.Message = rMsg
+
+	}).Times(count)
+
+	s.mockRead.On("Read").Return(&store.ReadMessageContent{
+		Type:   store.ReadMessageContentTypePtr(store.ReadMessageContentType_SEALED),
+		Sealed: store.NewExtentSealedError(),
+	}, nil).Once()
+
+	streamDoneCh := make(chan struct{})
+	go func() {
+		outputHost.OpenConsumerStreamHandler(s.mockHTTPResponse, httpRequest)
+		close(streamDoneCh)
+	}()
+
+	// close the read stream
+	creditUnblockCh := make(chan struct{})
+	s.mockRead.On("Read").Return(nil, io.EOF)
+	s.mockCons.On("Read").Return(nil, io.EOF).Run(func(args mock.Arguments) {
+		<-writeDoneCh
+		<-creditUnblockCh
+	})
+
+	<-connOpenedCh // wait for the consConnection to open
+
+	// look up cgcache and the underlying client connection
+	outputHost.cgMutex.RLock()
+	cgCache, ok := outputHost.cgCache[cgDesc.GetConsumerGroupUUID()]
+	outputHost.cgMutex.RUnlock()
+	s.True(ok, "cannot find cgcache entry")
+
+	var nConns = 0
+	var conn *consConnection
+	cgCache.extMutex.RLock()
+	for _, conn = range cgCache.connections {
+		break
+	}
+	nConns = len(cgCache.connections)
+	cgCache.extMutex.RUnlock()
+	s.Equal(1, nConns, "wrong number of consumer connections")
+	s.NotNil(conn, "failed to find consConnection within cgcache")
+
+	creditUnblockCh <- struct{}{} // now unblock the readCreditsPump on the consconnection
+	<-streamDoneCh
+
+	s.mockHTTPResponse.AssertNotCalled(s.T(), "WriteHeader", mock.Anything)
+	s.Equal(int64(nonSkipCount), conn.sentMsgs, "wrong sentMsgs count")
+	s.Equal(conn.sentMsgs, conn.sentToMsgCache, "sentMsgs != sentToMsgCache")
+	outputHost.Shutdown()
+}
diff --git a/services/outputhost/replicaconnection.go b/services/outputhost/replicaconnection.go
index 9f8d7922..6832d6ef 100644
--- a/services/outputhost/replicaconnection.go
+++ b/services/outputhost/replicaconnection.go
@@ -71,8 +71,9 @@ type (
 		// one message at a time, and read on close after the pumps
 		// are done; therefore, these do not require to be protected
 		// for concurrent access
-		sentCreds int32 // total credits sent
-		recvMsgs  int32 // total messages received
+		sentCreds     int32 // total credits sent
+		recvMsgs      int32 // total messages received
+		skipOlderMsgs int32 // messages skipped
 
 		// consumerM3Client for metrics per consumer group
 		consumerM3Client metrics.Client
@@ -112,7 +113,11 @@ func (conn *replicaConnection) open() {
 		go conn.writeCreditsPump()
 
 		conn.opened = true
-		conn.logger.Info("replConn opened")
+		conn.logger.WithFields(bark.Fields{
+			`delay`:            conn.extCache.delay,
+			`skipOlder`:        conn.extCache.skipOlder,
+			`startingSequence`: conn.startingSequence,
+		}).Info("replConn opened")
 	}
 }
 
@@ -132,8 +137,9 @@ func (conn *replicaConnection) close(err error) {
 		}
 
 		conn.logger.WithFields(bark.Fields{
-			`sentCreds`: conn.sentCreds,
-			`recvMsgs`:  conn.recvMsgs,
+			`sentCreds`:     conn.sentCreds,
+			`recvMsgs`:      conn.recvMsgs,
+			`skipOlderMsgs`: conn.skipOlderMsgs,
 		}).Info("replConn closed")
 	}
 	conn.lk.Unlock()
@@ -186,124 +192,176 @@ func (conn *replicaConnection) readMessagesPump() {
 
 	// monotonically increasing
 	var lastSeqNum = int64(conn.startingSequence)
+	var skipOlderNanos = int64(conn.extCache.skipOlder)
+	var delayNanos = int64(conn.extCache.delay)
+
+	var delayTimer = common.NewTimer(time.Hour)
+
+loop:
 	for {
-		select {
-		default:
-			if localReadMsgs >= minReadBatchSize {
-				// Issue more credits
-				select {
-				case conn.readMsgsCh <- int32(localReadMsgs):
-					localReadMsgs = 0
-				default:
-					// if we are unable to renew credits at this time accumulate it
-					conn.logger.WithField(`credits`, localReadMsgs).
-						Debug("readMessagesPump: blocked sending credits; accumulating credits to send later")
-				}
+		if localReadMsgs >= minReadBatchSize {
+			// Issue more credits
+			select {
+			case conn.readMsgsCh <- int32(localReadMsgs):
+				localReadMsgs = 0
+			default:
+				// if we are unable to renew credits at this time accumulate it
+				conn.logger.WithField(`credits`, localReadMsgs).
+					Debug("readMessagesPump: blocked sending credits; accumulating credits to send later")
 			}
+		}
 
-			rmc, err := conn.call.Read()
-			if err != nil {
-				// any error here means our stream is done. close the connection
-				conn.logger.WithField(common.TagErr, err).Error(`Error reading msg from store`)
-				go conn.close(err)
-				<-conn.closeChannel
-				return
+		rmc, err := conn.call.Read()
+		if err != nil {
+			// any error here means our stream is done. close the connection
+			conn.logger.WithField(common.TagErr, err).Error(`Error reading msg from store`)
+			go conn.close(err)
+			<-conn.closeChannel
+			return
+		}
+
+		hb.Beat()
+
+		switch rmc.GetType() {
+		case store.ReadMessageContentType_MESSAGE:
+
+			msg := rmc.GetMessage()
+			msgSeqNum := common.SequenceNumber(msg.Message.GetSequenceNumber())
+			msgAddr := msg.GetAddress()
+			visibilityTime := msg.Message.GetEnqueueTimeUtc()
+
+			// XXX: Sequence number check to make sure we get monotonically increasing
+			// sequence number.
+			// We just log and move forward
+			// XXX: Note we skip the first message check here because we can start from
+			// a bigger sequence number in case of restarts
+			if conn.extCache.destType == shared.DestinationType_TIMER {
+
+				// T471157 For timers, do not signal discontinuities to ack manager, since discontinuities are frequent
+				msgSeqNum = 0
+
+			} else if lastSeqNum+1 != int64(msgSeqNum) {
+
+				// FIXME: add metric to help alert this case
+				expectedSeqNum := 1 + lastSeqNum
+				skippedMessages := int64(msgSeqNum) - lastSeqNum
+
+				conn.logger.WithFields(bark.Fields{
+					"msgSeqNum":       msgSeqNum,
+					"expectedSeqNum":  expectedSeqNum,
+					"skippedMessages": skippedMessages,
+				}).Error("sequence number out of order")
 			}
 
 			hb.Beat()
 
-			switch rmc.GetType() {
-			case store.ReadMessageContentType_MESSAGE:
+			// update the lastSeqNum to this value
+			lastSeqNum = msg.Message.GetSequenceNumber()
 
-				msg := rmc.GetMessage()
-				msgSeqNum := common.SequenceNumber(msg.Message.GetSequenceNumber())
+			// convert this to an outMessage
+			cMsg := cherami.NewConsumerMessage()
+			cMsg.EnqueueTimeUtc = msg.Message.EnqueueTimeUtc
+			cMsg.Payload = msg.Message.Payload
 
-				// XXX: Sequence number check to make sure we get monotonically increasing
-				// sequence number.
-				// We just log and move forward
-				// XXX: Note we skip the first message check here because we can start from
-				// a bigger sequence number in case of restarts
-				if conn.extCache.destType != shared.DestinationType_TIMER {
-					if lastSeqNum+1 != int64(msgSeqNum) {
-						// FIXME: add metric to help alert this case
-						expectedSeqNum := 1 + lastSeqNum
-						skippedMessages := int64(msgSeqNum) - lastSeqNum
+			cMsg.AckId = common.StringPtr(conn.extCache.ackMgr.getNextAckID(storeHostAddress(msgAddr), msgSeqNum))
 
-						conn.logger.WithFields(bark.Fields{
-							"msgSeqNum":       msgSeqNum,
-							"expectedSeqNum":  expectedSeqNum,
-							"skippedMessages": skippedMessages,
-						}).Error("sequence number out of order")
-					}
-				} else {
-					// T471157 For timers, do not signal discontinuities to ack manager, since discontinuities are frequent
-					msgSeqNum = 0
+			if conn.extCache.destType != shared.DestinationType_TIMER && visibilityTime > 0 {
-				}
+				// offset visibilityTime by the specified delay, if any
+				if delayNanos > 0 {
+					visibilityTime += delayNanos
				}
 
-				// update the lastSeqNum to this value
-				lastSeqNum = msg.Message.GetSequenceNumber()
+				now := time.Now().UnixNano()
 
-				// convert this to an outMessage
-				cMsg := cherami.NewConsumerMessage()
-				cMsg.EnqueueTimeUtc = msg.Message.EnqueueTimeUtc
-				cMsg.Payload = msg.Message.Payload
+				// check if the message has already 'expired'; ie, if it falls outside the
+				// skip-older window. ignore (ie, don't skip any messages) if skip-older is '0'.
+				// NB: messages coming from KFC may not have enqueue-time set; the logic below
+				// ignores (ie, does not skip) messages that have no/zero enqueue-time.
+				if skipOlderNanos > 0 && (visibilityTime < (now - skipOlderNanos)) {
 
-				cMsg.AckId = common.StringPtr(conn.extCache.ackMgr.getNextAckID(storeHostAddress(msg.GetAddress()), msgSeqNum))
-				// write the message to the msgsCh so that it can be delivered
-				// after being stored on the cache.
-				// 1. either there are no listeners
-				// 2. the buffer is full
-				// Wait until the there are some listeners or we are shutting down
-				select {
-				case conn.msgsCh <- cMsg:
-					// written successfully. Now accumulate credits
-					// TODO: one message is now assumed to take one credit
-					// we might need to change it to be based on size later.
-					// we accumulate a bunch of credits and then send it in a batch to store
-					conn.updateSuccessfulSendToMsgsCh(&localReadMsgs, int64(len(cMsg.Payload.GetData())))
-				default:
-					// we were unable to write it above which probably means the channel is full
-					// now do it in a blocking way except shutdown.
+					conn.skipOlderMsgs++
+					continue loop
+				}
+
+				// check if this is a delayed-cg, and if so delay messages appropriately
+				// compute visibility time based on the enqueue-time and specified delay
+				if delayNanos > 0 && visibilityTime > now {
+
+					delayTimer.Reset(time.Duration(visibilityTime - now))
+
+					// wait until the message should be made 'visible'
 					select {
-					case conn.msgsCh <- cMsg:
-						// written successfully. Now accumulate credits
-						conn.updateSuccessfulSendToMsgsCh(&localReadMsgs, int64(len(cMsg.Payload.GetData())))
-						// TODO: Make sure we listen on the close channel if and only if, all the
-						// consumers are gone as well. (i.e, separate out the close channel).
+					case <-delayTimer.C: // sleep until delay expiry
+						// continue down, to send message to cache
+					case <-conn.closeChannel:
 						conn.logger.WithFields(bark.Fields{
 							common.TagMsgID: common.FmtMsgID(cMsg.GetAckId()),
-							`Offset`:        msg.GetAddress(),
-						}).Info("writing msg to the client channel failed because of shutdown.")
+							`Offset`:        msgAddr,
+						}).Info("aborting delay and failing msg because of shutdown")
 						// We need to update the ackMgr here to *not* have this message in the local map
 						// since we couldn't even write this message out to the msgsCh.
 						// Note: we need to just update this last message because this is a synchronous
 						// pump which reads message after message and once we are here we immediately break
 						// the pump. So there is absolute guarantee that we cannot call getNextAckID() on this
 						// pump parallely.
-						conn.extCache.ackMgr.resetMsg(msg.GetAddress())
+						conn.extCache.ackMgr.resetMsg(msgAddr)
 						return
 					}
 				}
+			}
 
-			case store.ReadMessageContentType_SEALED:
-
-				seal := rmc.GetSealed()
-				conn.logger.WithField(common.TagSeq, seal.GetSequenceNumber()).Info(`extent seal`)
-				// Notify the extent cache with an extent sealed error so that
-				// it can notify the ackMgr and wait for the extent to be consumed
-				go conn.close(seal)
-				return
-			case store.ReadMessageContentType_ERROR:
-
-				msgErr := rmc.GetError()
-				conn.logger.WithField(common.TagErr, msgErr.GetMessage()).Error(`received error from storehost`)
-				// close the connection
-				go conn.close(err)
-				return
+			// write the message to the msgsCh so that it can be delivered
+			// after being stored on the cache.
+			// 1. either there are no listeners
+			// 2. the buffer is full
+			// Wait until there are some listeners or we are shutting down
+			select {
+			case conn.msgsCh <- cMsg:
+				// written successfully. Now accumulate credits
+				// TODO: one message is now assumed to take one credit
+				// we might need to change it to be based on size later.
+				// we accumulate a bunch of credits and then send it in a batch to store
+				conn.updateSuccessfulSendToMsgsCh(&localReadMsgs, int64(len(cMsg.Payload.GetData())))
 			default:
-				conn.logger.WithField(`Type`, rmc.GetType()).Error(`received ReadMessageContent with unrecognized type`)
+				// we were unable to write it above which probably means the channel is full
+				// now do it in a blocking way except shutdown.
+				select {
+				case conn.msgsCh <- cMsg:
+					// written successfully. Now accumulate credits
+					conn.updateSuccessfulSendToMsgsCh(&localReadMsgs, int64(len(cMsg.Payload.GetData())))
+					// TODO: Make sure we listen on the close channel if and only if, all the
+					// consumers are gone as well. (i.e, separate out the close channel).
+				case <-conn.closeChannel:
+					conn.logger.WithFields(bark.Fields{
+						common.TagMsgID: common.FmtMsgID(cMsg.GetAckId()),
+						`Offset`:        msgAddr,
+					}).Info("writing msg to the client channel failed because of shutdown.")
+					// We need to update the ackMgr here to *not* have this message in the local map
+					// since we couldn't even write this message out to the msgsCh.
+					// Note: we need to just update this last message because this is a synchronous
+					// pump which reads message after message and once we are here we immediately break
+					// the pump. So there is absolute guarantee that we cannot call getNextAckID() on this
+					// pump parallely.
+					conn.extCache.ackMgr.resetMsg(msgAddr)
+					return
+				}
 			}
+
+		case store.ReadMessageContentType_SEALED:
+
+			seal := rmc.GetSealed()
+			conn.logger.WithField(common.TagSeq, seal.GetSequenceNumber()).Info(`extent seal`)
+			// Notify the extent cache with an extent sealed error so that
+			// it can notify the ackMgr and wait for the extent to be consumed
+			go conn.close(seal)
+			return
+		case store.ReadMessageContentType_ERROR:
+
+			msgErr := rmc.GetError()
+			conn.logger.WithField(common.TagErr, msgErr.GetMessage()).Error(`received error from storehost`)
+			// close the connection
+			go conn.close(err)
+			return
+		default:
+			conn.logger.WithField(`Type`, rmc.GetType()).Error(`received ReadMessageContent with unrecognized type`)
 		}
 	}
 }