Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
Fix client-outputhost fd leak on consumer.close() (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Jan 25, 2017
1 parent dcf9c0f commit f6da423
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 50 deletions.
37 changes: 7 additions & 30 deletions client/cherami/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"sync"
"sync/atomic"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"

Expand All @@ -41,7 +41,6 @@ type (
path string
consumerGroupName string
consumerName string
streamConnection *tchannel.Channel
ackConnection *tchannel.Channel
prefetchSize int
options *ClientOptions
Expand Down Expand Up @@ -114,12 +113,6 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {
return nil, err
}

streamCh, err := tchannel.NewChannel(uuid.New(), nil)
if err != nil {
return nil, err
}
c.streamConnection = streamCh

// Create a separate connection for ack messages call
ackCh, err := tchannel.NewChannel(uuid.New(), nil)
if err != nil {
Expand Down Expand Up @@ -147,7 +140,7 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {
conn, err := c.createOutputHostConnection(common.GetConnectionKey(tchanHostAddresses[idx]), connKey, chosenProtocol)
if err != nil {
if conn != nil {
closeConnection(conn)
conn.close()
}

c.logger.Errorf("Error opening outputhost connection on %v:%v: %v", host.GetHost(), host.GetPort(), err)
Expand Down Expand Up @@ -186,14 +179,11 @@ func (c *consumerImpl) Close() {
defer c.lk.Unlock()
if c.connections != nil {
for _, outputHostConn := range c.connections {
closeConnection(outputHostConn)
outputHostConn.close()
}
c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, 0)
}

if c.streamConnection != nil {
c.streamConnection.Close()
}
if c.ackConnection != nil {
c.ackConnection.Close()
}
Expand Down Expand Up @@ -260,7 +250,7 @@ func (c *consumerImpl) reconfigureConsumer() {
for existingConnKey, existingConn := range c.connections {
if existingConn.isClosed() {
c.logger.WithField(common.TagHostIP, common.FmtHostIP(existingConnKey)).Info("Removing closed connection from cache.")
closeConnection(existingConn)
existingConn.close()
delete(c.connections, existingConnKey)
c.logger.WithField(common.TagHostIP, common.FmtHostIP(existingConn.connKey)).Info("Removed connection from cache.")
}
Expand All @@ -278,7 +268,7 @@ func (c *consumerImpl) reconfigureConsumer() {
if err != nil {
connLogger.Info("Error creating connection to OutputHost after reconfiguration.")
if conn != nil {
closeConnection(conn)
conn.close()
}
} else {
connLogger.Info("Successfully created connection to OutputHost after reconfiguration.")
Expand All @@ -296,7 +286,7 @@ func (c *consumerImpl) reconfigureConsumer() {
if _, ok := currentHosts[host]; !ok {
connLogger := c.logger.WithField(common.TagHostIP, common.FmtHostIP(outputHostConn.connKey))
connLogger.Info("Closing connection to OutputHost after reconfiguration.")
closeConnection(outputHostConn)
outputHostConn.close()
}
}

Expand All @@ -314,21 +304,14 @@ func (c *consumerImpl) reconfigureConsumer() {
func (c *consumerImpl) createOutputHostConnection(tchanHostPort string, connKey string, protocol cherami.Protocol) (*outputHostConnection, error) {
connLogger := c.logger.WithField(common.TagHostIP, common.FmtHostIP(connKey))

// TODO [ljj] to be removed once moved to websocket
client, err := createOutputHostClient(c.streamConnection, connKey)
if err != nil {
connLogger.Infof("Error creating OutputHost client: %v", err)
return nil, err
}

// We use a separate connection for acks to make sure response for acks won't get blocked behind streaming messages
ackClient, err := createOutputHostClient(c.ackConnection, tchanHostPort)
if err != nil {
connLogger.Infof("Error creating AckClient: %v", err)
return nil, err
}

conn := newOutputHostConnection(client, ackClient, c.wsConnector, c.path, c.consumerGroupName, c.options, c.deliveryCh,
conn := newOutputHostConnection(ackClient, c.wsConnector, c.path, c.consumerGroupName, c.options, c.deliveryCh,
c.reconfigureCh, connKey, protocol, int32(c.prefetchSize), connLogger, c.reporter)

// Now open the connection
Expand Down Expand Up @@ -405,9 +388,3 @@ func createOutputHostClient(ch *tchannel.Channel, hostPort string) (cherami.TCha

return client, nil
}

func closeConnection(conn *outputHostConnection) {
conn.close()
// This is necessary to shutdown writeAcksPump within the connection
conn.closeAcksBatchCh()
}
35 changes: 28 additions & 7 deletions client/cherami/outputhostconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
"sync/atomic"
"time"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-client-go/stream"
"github.com/uber/cherami-thrift/.generated/go/cherami"

"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"
Expand All @@ -38,7 +38,6 @@ import (

type (
outputHostConnection struct {
outputHostClient cherami.TChanBOut
ackClient cherami.TChanBOut
wsConnector WSConnector
path string
Expand Down Expand Up @@ -74,7 +73,7 @@ const (
ackBatchDelay = time.Second / 10
)

func newOutputHostConnection(client cherami.TChanBOut, ackClient cherami.TChanBOut, wsConnector WSConnector,
func newOutputHostConnection(ackClient cherami.TChanBOut, wsConnector WSConnector,
path, consumerGroupName string, options *ClientOptions, deliveryCh chan<- Delivery,
reconfigureCh chan<- reconfigureInfo, connKey string, protocol cherami.Protocol,
prefetchSize int32, logger bark.Logger, reporter metrics.Reporter) *outputHostConnection {
Expand All @@ -90,7 +89,6 @@ func newOutputHostConnection(client cherami.TChanBOut, ackClient cherami.TChanBO
return &outputHostConnection{
connKey: connKey,
protocol: protocol,
outputHostClient: client,
ackClient: ackClient,
wsConnector: wsConnector,
path: path,
Expand Down Expand Up @@ -161,7 +159,7 @@ func (conn *outputHostConnection) close() {
}

close(conn.closeChannel)

conn.closeAcksBatchCh() // necessary to shutdown writeAcksPump within the connection
atomic.StoreInt32(&conn.closed, 1)
conn.logger.Info("Output host connection closed.")
}
Expand All @@ -175,7 +173,23 @@ func (conn *outputHostConnection) isClosed() bool {
return atomic.LoadInt32(&conn.closed) != 0
}

// drainReadPipe reads and discards all messages on
// the outputHostStream until it encounters
// a read stream error
func (conn *outputHostConnection) drainReadPipe() {
for {
if _, err := conn.outputHostStream.Read(); err != nil {
return
}
}
}

func (conn *outputHostConnection) readMessagesPump() {

defer func() {
conn.logger.Info("readMessagesPump done")
}()

var localCredits int32
for {
conn.reporter.UpdateGauge(metrics.ConsumeLocalCredits, nil, int64(localCredits))
Expand All @@ -193,7 +207,6 @@ func (conn *outputHostConnection) readMessagesPump() {
if err != nil {
// Error reading from stream. Time to close and bail out.
conn.logger.Infof("Error reading OutputHost Message Stream: %v", err)

// Stream is closed. Close the connection and bail out
conn.close()
return
Expand All @@ -203,7 +216,15 @@ func (conn *outputHostConnection) readMessagesPump() {
conn.reporter.IncCounter(metrics.ConsumeMessageRate, nil, 1)
msg := cmd.Message
delivery := newDelivery(msg, conn)
conn.deliveryCh <- delivery

select {
case conn.deliveryCh <- delivery:
case <-conn.closeChannel:
conn.logger.Info("close signal received, initiating readPump drain")
conn.drainReadPipe()
return
}

localCredits++
} else if cmd.GetType() == cherami.OutputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.ConsumeReconfigureRate, nil, 1)
Expand Down
24 changes: 11 additions & 13 deletions client/cherami/outputhostconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"errors"
"io"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
mc "github.com/uber/cherami-client-go/mocks/clients/cherami"
"github.com/uber/cherami-thrift/.generated/go/cherami"

log "github.com/Sirupsen/logrus"
"github.com/stretchr/testify/mock"
Expand All @@ -54,7 +54,7 @@ func (s *OutputHostConnectionSuite) SetupTest() {
}

func (s *OutputHostConnectionSuite) TestOutputHostBasic() {
conn, _, _, stream, messagesCh := createOutputHostConnection()
conn, _, stream, messagesCh := createOutputHostConnection()

stream.On("Write", mock.Anything).Return(nil)
stream.On("Flush").Return(nil)
Expand All @@ -74,7 +74,7 @@ func (s *OutputHostConnectionSuite) TestOutputHostBasic() {
}

func (s *OutputHostConnectionSuite) TestReadFailed() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

stream.On("Write", mock.Anything).Return(nil)
stream.On("Flush").Return(nil)
Expand All @@ -90,7 +90,7 @@ func (s *OutputHostConnectionSuite) TestReadFailed() {
}

func (s *OutputHostConnectionSuite) TestReadEOF() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

stream.On("Write", mock.Anything).Return(nil)
stream.On("Flush").Return(nil)
Expand All @@ -106,7 +106,7 @@ func (s *OutputHostConnectionSuite) TestReadEOF() {
}

func (s *OutputHostConnectionSuite) TestCreditsRenewSuccess() {
conn, _, _, stream, messagesCh := createOutputHostConnection()
conn, _, stream, messagesCh := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *OutputHostConnectionSuite) TestCreditsRenewSuccess() {
}

func (s *OutputHostConnectionSuite) TestInitialCreditsWriteFailed() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand All @@ -160,7 +160,7 @@ func (s *OutputHostConnectionSuite) TestInitialCreditsWriteFailed() {
}

func (s *OutputHostConnectionSuite) TestInitialCreditsFlushFailed() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand All @@ -182,7 +182,7 @@ func (s *OutputHostConnectionSuite) TestInitialCreditsFlushFailed() {
}

func (s *OutputHostConnectionSuite) TestRenewCreditsFailed() {
conn, _, _, stream, messagesCh := createOutputHostConnection()
conn, _, stream, messagesCh := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand Down Expand Up @@ -216,22 +216,20 @@ func (s *OutputHostConnectionSuite) TestRenewCreditsFailed() {
stream.AssertExpectations(s.T())
}

func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClient, *mc.MockTChanBOutClient, *mc.MockBOutOpenConsumerStreamOutCall, chan Delivery) {
func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClient, *mc.MockBOutOpenConsumerStreamOutCall, chan Delivery) {
host := "testHost"
outputHostClient := new(mc.MockTChanBOutClient)
ackClient := new(mc.MockTChanBOutClient)
wsConnector := new(mc.MockWSConnector)
stream := new(mc.MockBOutOpenConsumerStreamOutCall)
deliveryCh := make(chan Delivery)
reconfigureCh := make(chan reconfigureInfo, 10)
options := &ClientOptions{Timeout: time.Minute}

outputHostClient.On("OpenConsumerStream", mock.Anything).Return(stream, nil)
wsConnector.On("OpenConsumerStream", mock.Anything, mock.Anything).Return(stream, nil)
conn := newOutputHostConnection(outputHostClient, ackClient, wsConnector, "/test/outputhostconnection", "/consumer", options,
conn := newOutputHostConnection(ackClient, wsConnector, "/test/outputhostconnection", "/consumer", options,
deliveryCh, reconfigureCh, host, cherami.Protocol_WS, int32(100), bark.NewLoggerFromLogrus(log.StandardLogger()), metrics.NewNullReporter())

return conn, outputHostClient, ackClient, stream, deliveryCh
return conn, ackClient, stream, deliveryCh
}

func wrapMessageInCommand(msg *cherami.ConsumerMessage) *cherami.OutputHostCommand {
Expand Down

0 comments on commit f6da423

Please sign in to comment.