-
Notifications
You must be signed in to change notification settings - Fork 23
Conversation
This patch makes sure the client can handle the DRAIN command from the inputhost. As soon as it receives the command, it will stop it's write pump and will mark itself as "closed" so that if we get the same inputhost as a result of reconfiguration, we open a new connection object. Add a unit test to test the same.
1 similar comment
client/cherami/connection.go
Outdated
// the read pump will exit when the server completes the drain | ||
go conn.stopWritePump() | ||
|
||
reconfigInfo := cmd.Reconfigure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about moving this logic into its own method so that it can be shared across the two if branches ?
client/cherami/connection.go
Outdated
@@ -140,17 +141,42 @@ func (conn *connection) open() error { | |||
return nil | |||
} | |||
|
|||
func (conn *connection) close() { | |||
func (conn *connection) isStopped() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/isStopped/isShuttingDown to be consistent with naming.
func (conn *connection) isClosed() bool { | ||
return atomic.LoadInt32(&conn.closed) != 0 | ||
return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need the additional atomic var ? Can you not do isShuttingDown() instead of Load(&conn.drained) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only reason i added this is to avoid taking the lock in isClosed(). If i use isShuttingDown() i need to take the lock to prevent race..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
channels are thread safe.. not sure about the race.. Will leave it up to you resolve it.
* s/isStopped/isShuttingDown * move handle reconfig into its own utility
func (conn *connection) isClosed() bool { | ||
return atomic.LoadInt32(&conn.closed) != 0 | ||
return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
channels are thread safe.. not sure about the race.. Will leave it up to you resolve it.
This patch makes sure the client can handle the DRAIN command from
the inputhost.
As soon as it receives the command, it will stop it's write pump and
will mark itself as "closed" so that if we get the same inputhost as a
result of reconfiguration, we open a new connection object.
Add a unit test to test the same.