diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 9979781d..7dda9e77 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -283,8 +283,10 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c func (t *TCPInput) Stop() error { t.cancel() - if err := t.listener.Close(); err != nil { - return err + if t.listener != nil { + if err := t.listener.Close(); err != nil { + t.Errorf("failed to close TCP connection: %s", err) + } } t.wg.Wait() diff --git a/operator/builtin/input/tcp/tcp_test.go b/operator/builtin/input/tcp/tcp_test.go index 2e2cd1bd..be62893e 100644 --- a/operator/builtin/input/tcp/tcp_test.go +++ b/operator/builtin/input/tcp/tcp_test.go @@ -16,6 +16,7 @@ package tcp import ( "crypto/tls" + "math/rand" "net" "os" "strconv" @@ -104,7 +105,10 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) { err = tcpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) - defer tcpInput.Stop() + defer func() { + err := tcpInput.Stop() + require.NoError(t, err, "expected to stop tcp input operator without error") + }() conn, err := net.Dial("tcp", tcpInput.listener.Addr().String()) require.NoError(t, err) @@ -152,7 +156,10 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T) err = tcpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) - defer tcpInput.Stop() + defer func() { + err := tcpInput.Stop() + require.NoError(t, err, "expected to stop tcp input operator without error") + }() conn, err := net.Dial("tcp", tcpInput.listener.Addr().String()) require.NoError(t, err) @@ -237,7 +244,10 @@ func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) { err = tcpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) - defer tcpInput.Stop() + defer func() { + err := tcpInput.Stop() + require.NoError(t, err, "expected to stop tcp input operator without error") + }() conn, err := tls.Dial("tcp", tcpInput.listener.Addr().String(), &tls.Config{InsecureSkipVerify: true}) require.NoError(t, err) @@ -357,6 +367,51 @@ func TestTLSTcpInput(t *testing.T) { t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"})) } +func TestFailToBind(t *testing.T) { + ip := "localhost" + port := 0 + minPort := 30000 + maxPort := 40000 + for i := 1; i < 10; i++ { + port = minPort + rand.Intn(maxPort-minPort+1) + _, err := net.DialTimeout("tcp", net.JoinHostPort(ip, strconv.Itoa(port)), time.Second*2) + if err != nil { + // a failed connection indicates that the port is available for use + break + } + } + if port == 0 { + t.Errorf("failed to find a free port between %d and %d", minPort, maxPort) + } + + var startTCP func(port int) (*TCPInput, error) = func(int) (*TCPInput, error) { + cfg := NewTCPInputConfig("test_id") + cfg.ListenAddress = net.JoinHostPort(ip, strconv.Itoa(port)) + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + mockOutput := testutil.Operator{} + tcpInput := op.(*TCPInput) + tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput} + entryChan := make(chan *entry.Entry, 1) + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + entryChan <- args.Get(1).(*entry.Entry) + }).Return(nil) + err = tcpInput.Start(testutil.NewMockPersister("test")) + return tcpInput, err + } + + first, err := startTCP(port) + require.NoError(t, err, "expected first tcp operator to start") + defer func() { + err := first.Stop() + require.NoError(t, err, "expected to stop tcp input operator without error") + require.NoError(t, first.Stop(), "expected stopping an already stopped operator to not return an error") + }() + _, err = startTCP(port) + require.Error(t, err, "expected second tcp operator to fail to start") +} + func BenchmarkTcpInput(b *testing.B) { cfg := NewTCPInputConfig("test_id") cfg.ListenAddress = ":0" @@ -376,8 +431,13 @@ func BenchmarkTcpInput(b *testing.B) { go func() { conn, err := net.Dial("tcp", tcpInput.listener.Addr().String()) require.NoError(b, err) - defer tcpInput.Stop() - defer conn.Close() + defer func() { + err := tcpInput.Stop() + require.NoError(b, err, "expected to stop tcp input operator without error") + + err = conn.Close() + require.NoError(b, err, "expected to close connection without error") + }() message := []byte("message\n") for { select { diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 020db87f..0658ca07 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -216,8 +216,10 @@ func (u *UDPInput) readMessage() ([]byte, net.Addr, error) { // Stop will stop listening for udp messages. func (u *UDPInput) Stop() error { u.cancel() - if err := u.connection.Close(); err != nil { - u.Errorf(err.Error()) + if u.connection != nil { + if err := u.connection.Close(); err != nil { + u.Errorf("failed to close UDP connection: %s", err) + } } u.wg.Wait() if u.resolver != nil { diff --git a/operator/builtin/input/udp/udp_test.go b/operator/builtin/input/udp/udp_test.go index 4746c2df..ec2db49c 100644 --- a/operator/builtin/input/udp/udp_test.go +++ b/operator/builtin/input/udp/udp_test.go @@ -15,6 +15,7 @@ package udp import ( + "math/rand" "net" "strconv" "testing" @@ -50,7 +51,10 @@ func udpInputTest(input []byte, expected []string) func(t *testing.T) { err = udpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) - defer udpInput.Stop() + defer func() { + err := udpInput.Stop() + require.NoError(t, err, "expected to stop udp input operator without error") + }() conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String()) require.NoError(t, err) @@ -100,7 +104,10 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T) err = udpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) - defer udpInput.Stop() + defer func() { + err := udpInput.Stop() + require.NoError(t, err, "expected to stop udp input operator without error") + }() conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String()) require.NoError(t, err) @@ -159,6 +166,57 @@ func TestUDPInputAttributes(t *testing.T) { t.Run("NewlineInMessage", udpInputAttributesTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"})) } +func TestFailToBind(t *testing.T) { + ip := "localhost" + port := 0 + minPort := 30000 + maxPort := 40000 + for i := 1; 1 < 10; i++ { + port = minPort + rand.Intn(maxPort-minPort+1) + _, err := net.DialTimeout("tcp", net.JoinHostPort(ip, strconv.Itoa(port)), time.Second*2) + if err != nil { + // a failed connection indicates that the port is available for use + break + } + } + if port == 0 { + t.Errorf("failed to find a free port between %d and %d", minPort, maxPort) + } + + var startUDP func(port int) (*UDPInput, error) = func(int) (*UDPInput, error) { + cfg := NewUDPInputConfig("test_input") + cfg.ListenAddress = net.JoinHostPort(ip, strconv.Itoa(port)) + + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + mockOutput := testutil.Operator{} + udpInput, ok := op.(*UDPInput) + require.True(t, ok) + + udpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput} + + entryChan := make(chan *entry.Entry, 1) + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + entryChan <- args.Get(1).(*entry.Entry) + }).Return(nil) + + err = udpInput.Start(testutil.NewMockPersister("test")) + return udpInput, err + } + + first, err := startUDP(port) + require.NoError(t, err, "expected first udp operator to start") + defer func() { + err := first.Stop() + require.NoError(t, err, "expected to stop udp input operator without error") + require.NoError(t, first.Stop(), "expected stopping an already stopped operator to not return an error") + }() + _, err = startUDP(port) + require.Error(t, err, "expected second udp operator to fail to start") +} + func BenchmarkUdpInput(b *testing.B) { cfg := NewUDPInputConfig("test_id") cfg.ListenAddress = ":0"