Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Outputhost: add support for delay and skip-older #234

Merged
merged 3 commits into from
Jun 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions services/outputhost/cgcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
destUUID: cgCache.cachedCGDesc.GetDestinationUUID(),
destType: destType,
storeUUIDs: cge.StoreUUIDs,
startFrom: cgCache.cachedCGDesc.GetStartFrom(),
skipOlder: cgCache.cachedCGDesc.GetSkipOlderMessagesSeconds(),
delay: cgCache.cachedCGDesc.GetDelaySeconds(),
startFrom: time.Unix(0, cgCache.cachedCGDesc.GetStartFrom()),
skipOlder: time.Duration(int64(cgCache.cachedCGDesc.GetSkipOlderMessagesSeconds()) * int64(time.Second)),
delay: time.Duration(int64(cgCache.cachedCGDesc.GetDelaySeconds()) * int64(time.Second)),
notifyReplicaCloseCh: make(chan error, 5),
closeChannel: make(chan struct{}),
waitConsumedCh: make(chan bool, 1),
Expand Down Expand Up @@ -392,7 +392,6 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
cgCache.cachedCGDesc.GetConsumerGroupName(),
cgCache.kafkaCluster,
cgCache.kafkaTopics,
common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()),
cgCache.metaClient,
cge,
cgCache.consumerM3Client,
Expand Down
36 changes: 24 additions & 12 deletions services/outputhost/extcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ type extentCache struct {
// numExtents is the total number of extents within the CG. this is used to determine the initial credits
numExtents int

// startFrom is the offset to start from
startFrom int64
// startFrom is the time to start-from
startFrom time.Time

// skipOlder indicates that the CG wants to skip any messages older than this value, in seconds
skipOlder int32
skipOlder time.Duration

// delay indicates that the CG wants to delay every message by the specified value, in seconds
delay int32
delay time.Duration

// msgsCh is the channel where we write the message to the client as we read from replica
msgsCh chan<- *cherami.ConsumerMessage
Expand Down Expand Up @@ -161,15 +161,14 @@ var kafkaLogSetup sync.Once
const extentLoadReportingInterval = 2 * time.Second

// kafkaDefaultRetention is the default value of log.retention.hours in the Kafka system
const kafkaDefaultRetention = common.UnixNanoTime(time.Hour * 24 * 7)
const kafkaDefaultRetention = time.Hour * 24 * 7

func (extCache *extentCache) load(
outputHostUUID,
cgUUID,
cgName,
kafkaCluster string,
kafkaTopics []string,
startFrom common.UnixNanoTime,
metaClient metadata.TChanMetadataService,
cge *shared.ConsumerGroupExtent,
metricsClient metrics.Client,
Expand All @@ -186,7 +185,7 @@ func (extCache *extentCache) load(

if common.IsKafkaConsumerGroupExtent(cge) {
extCache.connectedStoreUUID = kafkaConnectedStoreUUID
extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics, metricsClient)
extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, kafkaCluster, kafkaTopics, metricsClient)
} else {
extCache.connection, extCache.pickedIndex, err =
extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
Expand Down Expand Up @@ -237,15 +236,28 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence

// First try to start from the already set offset in metadata
if startAddress == 0 {
// If consumer group wants to start from a timestamp, get the address from the store
startFrom := extCache.startFrom
// If consumer group wants to start from a timestamp, get the address from the store.
// Note: we take into account any 'delay' that is associated with the CG and the 'skip-older'
// time that was specified, by offsetting the time appropriately. we apply the 'start-from'
// and 'skip-older' to the "visibility time" (enqueue-time + delay) of the message as opposed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I agree with applying the delay to start-from...

If I were to say 'start-from = tuesday', delay = 48h, then I expect to receive the first message enqueued on Tuesday at 12:00:01am on Thursday.

Doesn't seem like you need to take delay into account in start-from for that to work out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "start-from" applies to the "visibility time" of the message .. and the visibility-time takes into account the delay. So when we do a GAFT to store, we need to offset the given start-from by the delay.

For your example, the first message that they would see .. would be the one enqueued on Sunday midnight, delayed by 48h.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this mean that "future start-from" needs to be a thing? We don't allow that space to be used right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So a "future start from" is technically disallowed -- you cannot create a CG with a start-from beyond a minute into the future (we have checks in frontend create-consumer-group). That said, it is not illegal to have a future start-from associated with an OpenReadStream call from outputhost to storehost -- though this particular code does not enable that scenario; when a 'delay' is specified, the internal start-from value is further pushed into the past by the 'delay' value.

// to the enqueue-time.

// TODO: if the consumer group wants to start from the beginning, we should still calculate the earliest address
// that they can get. To do otherwise means that we will get spurious 'skipped messages' warnings
// NOTE: audit will have to handle an uneven 'virtual startFrom' across all of the extents that a zero startFrom
// consumer group is reading
// NOTE: there is a race between consumption and retention here!
if startFrom > 0 {
if extCache.startFrom.UnixNano() > 0 || extCache.skipOlder > 0 {

var startFrom int64

// compute start-from as the max(skip-older-time, start-from), adjusting for 'delay', if any
if extCache.skipOlder > 0 && time.Now().Add(-extCache.skipOlder).After(extCache.startFrom) {
startFrom = time.Now().Add(-extCache.skipOlder).Add(-extCache.delay).UnixNano()
} else {
startFrom = extCache.startFrom.Add(-extCache.delay).UnixNano()
}

// GetAddressFromTimestamp() from the store using startFrom
// use a tmp context whose timeout is shorter
tmpCtx, cancelGAFT := thrift.NewContext(getAddressCtxTimeout)
Expand Down Expand Up @@ -338,7 +350,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
func (extCache *extentCache) loadKafkaStream(
cgName string,
outputHostUUID string,
startFrom common.UnixNanoTime,
kafkaCluster string,
kafkaTopics []string,
metricsClient metrics.Client,
Expand Down Expand Up @@ -366,7 +377,8 @@ func (extCache *extentCache) loadKafkaStream(
// TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request
// to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster
// starts from the right place
if common.Now()-startFrom > kafkaDefaultRetention/2 {

if time.Now().Sub(extCache.startFrom) > kafkaDefaultRetention/2 {
cfg.Config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

Expand Down
Loading