-
Notifications
You must be signed in to change notification settings - Fork 23
Fix client-outputhost fd leak on consumer.close() #8
Conversation
@@ -161,7 +159,7 @@ func (conn *outputHostConnection) close() { | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this change but using atomic.LoadInt32 here is unnecessary. There is already a lock on entering close.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
samarabbas, isOpened() and isClosed() access the member without holding the lock. So, leaving as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One minor comment. You can merge it in after fixing that.
for { | ||
if _, err := conn.outputHostStream.Read(); err != nil { | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here should we try to write to the deliveryCh in a non-blocking way? That way, we can do a best-effort in delivering some messages if the application is still processing. That way we won't just drop the tail in case, say, the server is being restarted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aravindvs - The trouble is - we close the ack tchannel immediately after calling consumer.close(). So, even if you give the opportunity for the app to process them, they won't be able to ack/nack them, so, all that would do is delay the shutdown. @samar & I discussed a proposal for supporting graceful shutdown on the client. It would involve adding another API, something like client.initiateClose(). When the app calls this, the library will drain the read channel and enqueue everything to the delivery channel (and subsequently also close the deliveryChannel). The closing of delivery channel will be the signal for the app to indicate EOF. The app then acks/nacks all of them and finally calls client.close().
This patch doesn't address the clean shutdown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah.. ok.. makes sense..
When consumer.close() is called and an outputhostconnection has some messages waiting to be read on the stream/socket, the readMessagesPump() go-routine might block forever, resulting in a go-routine / fd leak. This is because readMessgesPump() blocks on the deliveryCh (which is inturn read by the application using the library). This patch does the following: