Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
* s/isStopped/isShuttingDown
* move handle reconfig into its own utility
  • Loading branch information
Aravind Srinivasan committed Mar 31, 2017
1 parent 6bec9ac commit 1ede1e5
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions client/cherami/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (conn *connection) open() error {
return nil
}

func (conn *connection) isStopped() bool {
func (conn *connection) isShuttingDown() bool {
select {
case <-conn.shuttingDownCh:
// already shutdown
Expand All @@ -161,7 +161,7 @@ func (conn *connection) stopWritePump() {

// Note: this needs to be called with the conn lock held!
func (conn *connection) stopWritePumpWithLock() {
if !conn.isStopped() {
if !conn.isShuttingDown() {
close(conn.shuttingDownCh)
if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("writeMsgPumpWG timed out")
Expand Down Expand Up @@ -242,6 +242,14 @@ func (conn *connection) writeMessagesPump() {
}
}

func (conn *connection) handleReconfigCmd(reconfigInfo *cherami.ReconfigureInfo) {
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
}

func (conn *connection) readAcksPump() {
defer conn.readAckPumpWG.Done()

Expand Down Expand Up @@ -309,13 +317,9 @@ func (conn *connection) readAcksPump() {
}
} else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1)
reconfigInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.")
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.")
conn.handleReconfigCmd(rInfo)
} else if cmd.GetType() == cherami.InputHostCommandType_DRAIN {
// drain all inflight messages
// reconfigure to pick up new extents if any
Expand All @@ -324,15 +328,9 @@ func (conn *connection) readAcksPump() {
// this makes sure, we don't send any new messages.
// the read pump will exit when the server completes the drain
go conn.stopWritePump()

reconfigInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Drain command received from InputHost.")
// reconfigure to pick up new extents
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.")
conn.handleReconfigCmd(rInfo)
}

}
Expand Down

0 comments on commit 1ede1e5

Please sign in to comment.