Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Fix client-outputhost fd leak on consumer.close() #8

Merged
merged 4 commits into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 7 additions & 30 deletions client/cherami/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"sync"
"sync/atomic"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"

Expand All @@ -41,7 +41,6 @@ type (
path string
consumerGroupName string
consumerName string
streamConnection *tchannel.Channel
ackConnection *tchannel.Channel
prefetchSize int
options *ClientOptions
Expand Down Expand Up @@ -114,12 +113,6 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {
return nil, err
}

streamCh, err := tchannel.NewChannel(uuid.New(), nil)
if err != nil {
return nil, err
}
c.streamConnection = streamCh

// Create a separate connection for ack messages call
ackCh, err := tchannel.NewChannel(uuid.New(), nil)
if err != nil {
Expand Down Expand Up @@ -147,7 +140,7 @@ func (c *consumerImpl) Open(deliveryCh chan Delivery) (chan Delivery, error) {
conn, err := c.createOutputHostConnection(common.GetConnectionKey(tchanHostAddresses[idx]), connKey, chosenProtocol)
if err != nil {
if conn != nil {
closeConnection(conn)
conn.close()
}

c.logger.Errorf("Error opening outputhost connection on %v:%v: %v", host.GetHost(), host.GetPort(), err)
Expand Down Expand Up @@ -186,14 +179,11 @@ func (c *consumerImpl) Close() {
defer c.lk.Unlock()
if c.connections != nil {
for _, outputHostConn := range c.connections {
closeConnection(outputHostConn)
outputHostConn.close()
}
c.reporter.UpdateGauge(metrics.ConsumeNumConnections, nil, 0)
}

if c.streamConnection != nil {
c.streamConnection.Close()
}
if c.ackConnection != nil {
c.ackConnection.Close()
}
Expand Down Expand Up @@ -260,7 +250,7 @@ func (c *consumerImpl) reconfigureConsumer() {
for existingConnKey, existingConn := range c.connections {
if existingConn.isClosed() {
c.logger.WithField(common.TagHostIP, common.FmtHostIP(existingConnKey)).Info("Removing closed connection from cache.")
closeConnection(existingConn)
existingConn.close()
delete(c.connections, existingConnKey)
c.logger.WithField(common.TagHostIP, common.FmtHostIP(existingConn.connKey)).Info("Removed connection from cache.")
}
Expand All @@ -278,7 +268,7 @@ func (c *consumerImpl) reconfigureConsumer() {
if err != nil {
connLogger.Info("Error creating connection to OutputHost after reconfiguration.")
if conn != nil {
closeConnection(conn)
conn.close()
}
} else {
connLogger.Info("Successfully created connection to OutputHost after reconfiguration.")
Expand All @@ -296,7 +286,7 @@ func (c *consumerImpl) reconfigureConsumer() {
if _, ok := currentHosts[host]; !ok {
connLogger := c.logger.WithField(common.TagHostIP, common.FmtHostIP(outputHostConn.connKey))
connLogger.Info("Closing connection to OutputHost after reconfiguration.")
closeConnection(outputHostConn)
outputHostConn.close()
}
}

Expand All @@ -314,21 +304,14 @@ func (c *consumerImpl) reconfigureConsumer() {
func (c *consumerImpl) createOutputHostConnection(tchanHostPort string, connKey string, protocol cherami.Protocol) (*outputHostConnection, error) {
connLogger := c.logger.WithField(common.TagHostIP, common.FmtHostIP(connKey))

// TODO [ljj] to be removed once moved to websocket
client, err := createOutputHostClient(c.streamConnection, connKey)
if err != nil {
connLogger.Infof("Error creating OutputHost client: %v", err)
return nil, err
}

// We use a separate connection for acks to make sure response for acks won't get blocked behind streaming messages
ackClient, err := createOutputHostClient(c.ackConnection, tchanHostPort)
if err != nil {
connLogger.Infof("Error creating AckClient: %v", err)
return nil, err
}

conn := newOutputHostConnection(client, ackClient, c.wsConnector, c.path, c.consumerGroupName, c.options, c.deliveryCh,
conn := newOutputHostConnection(ackClient, c.wsConnector, c.path, c.consumerGroupName, c.options, c.deliveryCh,
c.reconfigureCh, connKey, protocol, int32(c.prefetchSize), connLogger, c.reporter)

// Now open the connection
Expand Down Expand Up @@ -405,9 +388,3 @@ func createOutputHostClient(ch *tchannel.Channel, hostPort string) (cherami.TCha

return client, nil
}

func closeConnection(conn *outputHostConnection) {
conn.close()
// This is necessary to shutdown writeAcksPump within the connection
conn.closeAcksBatchCh()
}
35 changes: 28 additions & 7 deletions client/cherami/outputhostconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
"sync/atomic"
"time"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-client-go/stream"
"github.com/uber/cherami-thrift/.generated/go/cherami"

"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"
Expand All @@ -38,7 +38,6 @@ import (

type (
outputHostConnection struct {
outputHostClient cherami.TChanBOut
ackClient cherami.TChanBOut
wsConnector WSConnector
path string
Expand Down Expand Up @@ -74,7 +73,7 @@ const (
ackBatchDelay = time.Second / 10
)

func newOutputHostConnection(client cherami.TChanBOut, ackClient cherami.TChanBOut, wsConnector WSConnector,
func newOutputHostConnection(ackClient cherami.TChanBOut, wsConnector WSConnector,
path, consumerGroupName string, options *ClientOptions, deliveryCh chan<- Delivery,
reconfigureCh chan<- reconfigureInfo, connKey string, protocol cherami.Protocol,
prefetchSize int32, logger bark.Logger, reporter metrics.Reporter) *outputHostConnection {
Expand All @@ -90,7 +89,6 @@ func newOutputHostConnection(client cherami.TChanBOut, ackClient cherami.TChanBO
return &outputHostConnection{
connKey: connKey,
protocol: protocol,
outputHostClient: client,
ackClient: ackClient,
wsConnector: wsConnector,
path: path,
Expand Down Expand Up @@ -161,7 +159,7 @@ func (conn *outputHostConnection) close() {
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this change but using atomic.LoadInt32 here is unnecessary. There is already a lock on entering close.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

samarabbas, isOpened() and isClosed() access the member without holding the lock. So, leaving as is.


close(conn.closeChannel)

conn.closeAcksBatchCh() // necessary to shutdown writeAcksPump within the connection
atomic.StoreInt32(&conn.closed, 1)
conn.logger.Info("Output host connection closed.")
}
Expand All @@ -175,7 +173,23 @@ func (conn *outputHostConnection) isClosed() bool {
return atomic.LoadInt32(&conn.closed) != 0
}

// drainReadPipe reads and discards all messages on
// the outputHostStream until it encounters
// a read stream error
func (conn *outputHostConnection) drainReadPipe() {
for {
if _, err := conn.outputHostStream.Read(); err != nil {
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here should we try to write to the deliveryCh in a non-blocking way? That way, we can do a best-effort in delivering some messages if the application is still processing. That way we won't just drop the tail in case, say, the server is being restarted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aravindvs - The trouble is - we close the ack tchannel immediately after calling consumer.close(). So, even if you give the opportunity for the app to process them, they won't be able to ack/nack them, so, all that would do is delay the shutdown. @samar & I discussed a proposal for supporting graceful shutdown on the client. It would involve adding another API, something like client.initiateClose(). When the app calls this, the library will drain the read channel and enqueue everything to the delivery channel (and subsequently also close the deliveryChannel). The closing of delivery channel will be the signal for the app to indicate EOF. The app then acks/nacks all of them and finally calls client.close().

This patch doesn't address the clean shutdown.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah.. ok.. makes sense..

}
}

func (conn *outputHostConnection) readMessagesPump() {

defer func() {
conn.logger.Info("readMessagesPump done")
}()

var localCredits int32
for {
conn.reporter.UpdateGauge(metrics.ConsumeLocalCredits, nil, int64(localCredits))
Expand All @@ -193,7 +207,6 @@ func (conn *outputHostConnection) readMessagesPump() {
if err != nil {
// Error reading from stream. Time to close and bail out.
conn.logger.Infof("Error reading OutputHost Message Stream: %v", err)

// Stream is closed. Close the connection and bail out
conn.close()
return
Expand All @@ -203,7 +216,15 @@ func (conn *outputHostConnection) readMessagesPump() {
conn.reporter.IncCounter(metrics.ConsumeMessageRate, nil, 1)
msg := cmd.Message
delivery := newDelivery(msg, conn)
conn.deliveryCh <- delivery

select {
case conn.deliveryCh <- delivery:
case <-conn.closeChannel:
conn.logger.Info("close signal received, initiating readPump drain")
conn.drainReadPipe()
return
}

localCredits++
} else if cmd.GetType() == cherami.OutputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.ConsumeReconfigureRate, nil, 1)
Expand Down
24 changes: 11 additions & 13 deletions client/cherami/outputhostconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"errors"
"io"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
mc "github.com/uber/cherami-client-go/mocks/clients/cherami"
"github.com/uber/cherami-thrift/.generated/go/cherami"

log "github.com/Sirupsen/logrus"
"github.com/stretchr/testify/mock"
Expand All @@ -54,7 +54,7 @@ func (s *OutputHostConnectionSuite) SetupTest() {
}

func (s *OutputHostConnectionSuite) TestOutputHostBasic() {
conn, _, _, stream, messagesCh := createOutputHostConnection()
conn, _, stream, messagesCh := createOutputHostConnection()

stream.On("Write", mock.Anything).Return(nil)
stream.On("Flush").Return(nil)
Expand All @@ -74,7 +74,7 @@ func (s *OutputHostConnectionSuite) TestOutputHostBasic() {
}

func (s *OutputHostConnectionSuite) TestReadFailed() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

stream.On("Write", mock.Anything).Return(nil)
stream.On("Flush").Return(nil)
Expand All @@ -90,7 +90,7 @@ func (s *OutputHostConnectionSuite) TestReadFailed() {
}

func (s *OutputHostConnectionSuite) TestReadEOF() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

stream.On("Write", mock.Anything).Return(nil)
stream.On("Flush").Return(nil)
Expand All @@ -106,7 +106,7 @@ func (s *OutputHostConnectionSuite) TestReadEOF() {
}

func (s *OutputHostConnectionSuite) TestCreditsRenewSuccess() {
conn, _, _, stream, messagesCh := createOutputHostConnection()
conn, _, stream, messagesCh := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *OutputHostConnectionSuite) TestCreditsRenewSuccess() {
}

func (s *OutputHostConnectionSuite) TestInitialCreditsWriteFailed() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand All @@ -160,7 +160,7 @@ func (s *OutputHostConnectionSuite) TestInitialCreditsWriteFailed() {
}

func (s *OutputHostConnectionSuite) TestInitialCreditsFlushFailed() {
conn, _, _, stream, _ := createOutputHostConnection()
conn, _, stream, _ := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand All @@ -182,7 +182,7 @@ func (s *OutputHostConnectionSuite) TestInitialCreditsFlushFailed() {
}

func (s *OutputHostConnectionSuite) TestRenewCreditsFailed() {
conn, _, _, stream, messagesCh := createOutputHostConnection()
conn, _, stream, messagesCh := createOutputHostConnection()

initialFlows := cherami.NewControlFlow()
initialFlows.Credits = common.Int32Ptr(conn.prefetchSize)
Expand Down Expand Up @@ -216,22 +216,20 @@ func (s *OutputHostConnectionSuite) TestRenewCreditsFailed() {
stream.AssertExpectations(s.T())
}

func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClient, *mc.MockTChanBOutClient, *mc.MockBOutOpenConsumerStreamOutCall, chan Delivery) {
func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClient, *mc.MockBOutOpenConsumerStreamOutCall, chan Delivery) {
host := "testHost"
outputHostClient := new(mc.MockTChanBOutClient)
ackClient := new(mc.MockTChanBOutClient)
wsConnector := new(mc.MockWSConnector)
stream := new(mc.MockBOutOpenConsumerStreamOutCall)
deliveryCh := make(chan Delivery)
reconfigureCh := make(chan reconfigureInfo, 10)
options := &ClientOptions{Timeout: time.Minute}

outputHostClient.On("OpenConsumerStream", mock.Anything).Return(stream, nil)
wsConnector.On("OpenConsumerStream", mock.Anything, mock.Anything).Return(stream, nil)
conn := newOutputHostConnection(outputHostClient, ackClient, wsConnector, "/test/outputhostconnection", "/consumer", options,
conn := newOutputHostConnection(ackClient, wsConnector, "/test/outputhostconnection", "/consumer", options,
deliveryCh, reconfigureCh, host, cherami.Protocol_WS, int32(100), bark.NewLoggerFromLogrus(log.StandardLogger()), metrics.NewNullReporter())

return conn, outputHostClient, ackClient, stream, deliveryCh
return conn, ackClient, stream, deliveryCh
}

func wrapMessageInCommand(msg *cherami.ConsumerMessage) *cherami.OutputHostCommand {
Expand Down