diff --git a/client/cherami/outputhostconnection_test.go b/client/cherami/outputhostconnection_test.go index 96a8e2b..cea123c 100644 --- a/client/cherami/outputhostconnection_test.go +++ b/client/cherami/outputhostconnection_test.go @@ -29,10 +29,10 @@ import ( "errors" "io" - "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-client-go/common/metrics" mc "github.com/uber/cherami-client-go/mocks/clients/cherami" + "github.com/uber/cherami-thrift/.generated/go/cherami" log "github.com/Sirupsen/logrus" "github.com/stretchr/testify/mock" @@ -54,7 +54,7 @@ func (s *OutputHostConnectionSuite) SetupTest() { } func (s *OutputHostConnectionSuite) TestOutputHostBasic() { - conn, _, _, stream, messagesCh := createOutputHostConnection() + conn, _, stream, messagesCh := createOutputHostConnection() stream.On("Write", mock.Anything).Return(nil) stream.On("Flush").Return(nil) @@ -74,7 +74,7 @@ func (s *OutputHostConnectionSuite) TestOutputHostBasic() { } func (s *OutputHostConnectionSuite) TestReadFailed() { - conn, _, _, stream, _ := createOutputHostConnection() + conn, _, stream, _ := createOutputHostConnection() stream.On("Write", mock.Anything).Return(nil) stream.On("Flush").Return(nil) @@ -90,7 +90,7 @@ func (s *OutputHostConnectionSuite) TestReadFailed() { } func (s *OutputHostConnectionSuite) TestReadEOF() { - conn, _, _, stream, _ := createOutputHostConnection() + conn, _, stream, _ := createOutputHostConnection() stream.On("Write", mock.Anything).Return(nil) stream.On("Flush").Return(nil) @@ -106,7 +106,7 @@ func (s *OutputHostConnectionSuite) TestReadEOF() { } func (s *OutputHostConnectionSuite) TestCreditsRenewSuccess() { - conn, _, _, stream, messagesCh := createOutputHostConnection() + conn, _, stream, messagesCh := createOutputHostConnection() initialFlows := cherami.NewControlFlow() initialFlows.Credits = common.Int32Ptr(conn.prefetchSize) @@ -139,7 +139,7 @@ func (s *OutputHostConnectionSuite) TestCreditsRenewSuccess() { } func (s *OutputHostConnectionSuite) TestInitialCreditsWriteFailed() { - conn, _, _, stream, _ := createOutputHostConnection() + conn, _, stream, _ := createOutputHostConnection() initialFlows := cherami.NewControlFlow() initialFlows.Credits = common.Int32Ptr(conn.prefetchSize) @@ -160,7 +160,7 @@ func (s *OutputHostConnectionSuite) TestInitialCreditsWriteFailed() { } func (s *OutputHostConnectionSuite) TestInitialCreditsFlushFailed() { - conn, _, _, stream, _ := createOutputHostConnection() + conn, _, stream, _ := createOutputHostConnection() initialFlows := cherami.NewControlFlow() initialFlows.Credits = common.Int32Ptr(conn.prefetchSize) @@ -182,7 +182,7 @@ func (s *OutputHostConnectionSuite) TestInitialCreditsFlushFailed() { } func (s *OutputHostConnectionSuite) TestRenewCreditsFailed() { - conn, _, _, stream, messagesCh := createOutputHostConnection() + conn, _, stream, messagesCh := createOutputHostConnection() initialFlows := cherami.NewControlFlow() initialFlows.Credits = common.Int32Ptr(conn.prefetchSize) @@ -216,9 +216,8 @@ func (s *OutputHostConnectionSuite) TestRenewCreditsFailed() { stream.AssertExpectations(s.T()) } -func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClient, *mc.MockTChanBOutClient, *mc.MockBOutOpenConsumerStreamOutCall, chan Delivery) { +func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClient, *mc.MockBOutOpenConsumerStreamOutCall, chan Delivery) { host := "testHost" - outputHostClient := new(mc.MockTChanBOutClient) ackClient := new(mc.MockTChanBOutClient) wsConnector := new(mc.MockWSConnector) stream := new(mc.MockBOutOpenConsumerStreamOutCall) @@ -226,12 +225,11 @@ func createOutputHostConnection() (*outputHostConnection, *mc.MockTChanBOutClien reconfigureCh := make(chan reconfigureInfo, 10) options := &ClientOptions{Timeout: time.Minute} - outputHostClient.On("OpenConsumerStream", mock.Anything).Return(stream, nil) wsConnector.On("OpenConsumerStream", mock.Anything, mock.Anything).Return(stream, nil) - conn := newOutputHostConnection(outputHostClient, ackClient, wsConnector, "/test/outputhostconnection", "/consumer", options, + conn := newOutputHostConnection(ackClient, wsConnector, "/test/outputhostconnection", "/consumer", options, deliveryCh, reconfigureCh, host, cherami.Protocol_WS, int32(100), bark.NewLoggerFromLogrus(log.StandardLogger()), metrics.NewNullReporter()) - return conn, outputHostClient, ackClient, stream, deliveryCh + return conn, ackClient, stream, deliveryCh } func wrapMessageInCommand(msg *cherami.ConsumerMessage) *cherami.OutputHostCommand {