Outputhost: add support for delay and skip-older #234
services/outputhost/extcache.go
// compute start-from as the max(skip-older-time, start-from), adjusting for 'delay', if any
if time.Now().Add(-extCache.skipOlder).After(extCache.startFrom) {
	startFrom = time.Now().Add(-extCache.skipOlder).Add(-extCache.delay).UnixNano()
Comment on why the delay is in this line but not the if-statement
We adjust for the 'delay' by the same amount on both 'startFrom' and 'now - skip-older', so the if-check only needs to find out which of them is greater, and then the delay is applied to whichever one was chosen. I thought the comment did imply that.
There's some missing stuff here. One thing is that we are preventing messages from getting into message cache, but we really ought to expire messages in the cache as well (by acking them). Some logic in messagecache.go would solve this. If you pursue that, be sure that you appropriately expire the smart-retry LKG message.
// If consumer group wants to start from a timestamp, get the address from the store.
// Note: we take into account any 'delay' that is associated with the CG and the 'skip-older'
// time that was specified, by offsetting the time appropriately. we apply the 'start-from'
// and 'skip-older' to the "visibility time" (enqueue-time + delay) of the message as opposed
I'm not sure I agree with applying the delay to start-from...
If I were to say start-from = Tuesday and delay = 48h, then I would expect the first message enqueued on Tuesday (at 12:00:01am) to be delivered on Thursday.
Doesn't seem like you need to take delay into account in start-from for that to work out.
The "start-from" applies to the "visibility time" of the message, and the visibility time takes the delay into account. So when we do a GAFT to store, we need to offset the given start-from by the delay.
For your example, the first message that they would see would be the one enqueued at midnight on Sunday, delayed by 48h.
Doesn't this mean that "future start-from" needs to be a thing? We don't allow that space to be used right now.
So a "future start from" is technically disallowed -- you cannot create a CG with a start-from beyond a minute into the future (we have checks in frontend create-consumer-group). That said, it is not illegal to have a future start-from associated with an OpenReadStream call from outputhost to storehost -- though this particular code does not enable that scenario; when a 'delay' is specified, the internal start-from value is further pushed into the past by the 'delay' value.
// Note: the skip-older should apply to the 'visibility time' (ie, after the delay is added). so
// we push out the skip-older by the delay amount, if any
var skipOlderNanos = int64(conn.extCache.skipOlder)
Comment on how the delay amount is reflected in this value
stale comment; removed.
// T471157 For timers, do not signal discontinuities to ack manager, since discontinuities are frequent
msgSeqNum = 0
// compute the 'visibility-time' of the message, taking into account the specified 'delay', if any.
visibilityTime := msg.Message.GetEnqueueTimeUtc()
If the visibility time is zero, then we can process neither delay nor skip-older, so we might just skip this whole if-block starting at line 269. That would also be a good place to call out that it may be zero, and to specify the policy of pass-through in that case.
true; done.
conn.updateSuccessfulSendToMsgsCh(&localReadMsgs, int64(len(cMsg.Payload.GetData())))
// TODO: Make sure we listen on the close channel if and only if, all the
// consumers are gone as well. (i.e, separate out the close channel).
case <-delayTimer.C: // sleep until delay expiry
There is a bit of danger here:
- I suspect that our idle detection logic will kill the connection if it sits here too long.
- A firewall admin can give us a very bad day with a small policy change. Long lived zero-throughput connections often don't resume properly.
I don't think that we should pursue a design change, but you ought to test the idle connection timeout thing once, probably by adjusting the idle timeout to a suitably low value.
Even if we time out (or the connection is dropped), when we connect again, the "delay"/sleep would be reduced, since it is computed off the enqueue-time in the message, so I don't see that causing a problem here.
Unless I misunderstood what you meant to point out?
Yeah, just suggest that we verify that it works, not that we gate the PR on that test.
The 'skip-older' is primarily intended as a mechanism to flush out the backlog quickly. And what you suggest does help enforce 'skip-older' more strictly, but I am going to leave this out for now; we can perhaps consider this in the future if we find that there are too many messages hanging around in the message cache for unacceptably long.