Implement Graceful Shutdown #55
Conversation
@adamgfraser Looks good. We need to slightly tweak this to make the feature production ready:

- There's a corner case that I'd like to handle in which there are buffered records in the state; I'd like those records to be drained into the stream before a `Request` is resolved with `Take.End`.
- Another issue is that we actually need to keep polling the consumer until it is shut down. This makes sure we don't get kicked out of the consumer group, and that commits are actually sent to the broker. So the trick here would be to check that ref in the polling loop and keep all partitions paused there (see the first sketch after this list).
- Let's call the `gracefulShutdown` method `stopConsumption`, and document that it stops consumption of data, drains buffered records and ends the attached streams while still serving commit requests. The reason it's not a graceful shutdown is that this needs to be combined with joining the stream's fiber in order to actually be a graceful shutdown. Something like this:
```scala
consumer.use { c =>
  val stream = c.subscribeAnd(...)
    .plainStream
    // more stuff
    .runDrain

  for {
    streamFiber <- stream.fork
    _           <- clock.sleep(2.seconds)
    _           <- c.stopConsumption
    _           <- streamFiber.await
  } yield ()
}
```
- We also need to make sure that the `partitions` queue is shut down after that `Take.End` is dequeued, so that subsequent attempts to create a stream from the consumer would be short-circuited. I wish we had `Queue#drain` for this, but I think we can emulate it by doing a `mapM` on the partitions queue that checks if the dequeued element is a `Take.End`, and if so, shuts down the stream (see the second sketch after this list).
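For the second bullet, here is a minimal sketch of what a single poll-loop step could look like while shutting down. The names `pollLoopStep` and `shutdownRef` are assumptions for illustration; only the raw Kafka client calls (`pause`, `assignment`, `poll`) are the real consumer API:

```scala
import zio._
import zio.blocking._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical poll-loop step: while shutdownRef is true, we keep polling
// so we stay in the consumer group and queued commits reach the broker,
// but with every assigned partition paused so no new records are fetched.
def pollLoopStep(
  shutdownRef: Ref[Boolean],
  consumer: KafkaConsumer[Array[Byte], Array[Byte]]
): RIO[Blocking, Unit] =
  shutdownRef.get.flatMap { shuttingDown =>
    effectBlocking {
      if (shuttingDown) consumer.pause(consumer.assignment())
      consumer.poll(java.time.Duration.ofMillis(50))
    }.unit
  }
```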
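And for the last bullet, a sketch of emulating `Queue#drain`. This is not the actual Runloop code: `Take` is reduced to a toy ADT here, and I use `flatMap` rather than `mapM` so that shutting the queue down also ends the stream on the next pull (`ZStream.fromQueue` ends gracefully when its queue has been shut down):

```scala
import zio._
import zio.stream.ZStream

// Toy stand-in for the real Take type: either a value or end-of-stream.
sealed trait Take[+A]
final case class Value[A](a: A) extends Take[A]
case object End                 extends Take[Nothing]

// Once End is dequeued, shut the queue down so that any later attempt to
// create a stream from it is short-circuited immediately.
def partitionStream[A](queue: Queue[Take[A]]): ZStream[Any, Nothing, A] =
  ZStream.fromQueue(queue).flatMap {
    case Value(a) => ZStream.succeed(a)
    case End      => ZStream.fromEffect(queue.shutdown).drain
  }
```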
@iravid Thanks! Tried to address all your comments. Not sure I got everything quite right, so please take a look when you have the chance.
```
@@ -384,17 +407,34 @@ object Runloop {
          else doCommit(List(cmd)).as(state)
        } yield newState

      def handleShutdown(state: State, cmd: Command): BlockingTask[State] = cmd match {
        case Command.Poll() => UIO.succeed(state)
```
Need to call `handlePoll` here, no?
Yes! Fixed now.
```
@@ -384,17 +407,34 @@ object Runloop {
          else doCommit(List(cmd)).as(state)
        } yield newState

      def handleShutdown(state: State, cmd: Command): BlockingTask[State] = cmd match {
        case Command.Poll() => handlePoll(state)
```
One last thing is bothering me here. When the shutdownRef is switched to `true`, there are probably still some requests pending in the state. This means that:

- We enter `handlePoll` with pending requests;
- The partitions for those requests will be resumed (line 327);
- The poll will return records for those, and those records are discarded in line 343.

We can probably do better by just making sure that when `shutdownRef = true`, we always enter `handlePoll` with no requests. This can be done by eagerly resolving them at this line. Then `handlePoll` can stay unchanged without being aware of the shutdown state. A sketch of what I mean follows.
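A minimal sketch of the eager-resolution idea, with hypothetical stand-ins (`Request`, `State`, `endRequests`, `shutdownPoll`) for the Runloop internals; the real types differ, but the shape is the same: resolve every pending request with end-of-stream, then call `handlePoll` with an empty request list:

```scala
import zio._

// Hypothetical stand-ins: a Request completes a Promise with records, and
// failing the promise with None resolves the request as "ended".
final case class Request(cont: Promise[Option[Throwable], Chunk[Byte]])
final case class State(pendingRequests: List[Request])

// Resolve every pending request with end-of-stream.
def endRequests(reqs: List[Request]): UIO[Unit] =
  ZIO.foreach_(reqs)(_.cont.fail(None))

// When shutting down, resolve the pending requests first, then enter
// handlePoll with no requests so it never resumes partitions for them.
def shutdownPoll(state: State)(handlePoll: State => Task[State]): Task[State] =
  endRequests(state.pendingRequests) *>
    handlePoll(state.copy(pendingRequests = Nil))
```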
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to eagerly resolve pending requests.
Resolves #29.