Skip to content

Commit

Permalink
UDP / TCP input: check for nil interface before calling Close() (#273)
Browse files Browse the repository at this point in the history
* check for nil interface before calling Close()

* test fail to bind to port

* test stopping an already stopped operator
  • Loading branch information
Joseph Sirianni authored Sep 27, 2021
1 parent 943c03a commit efb458e
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 11 deletions.
6 changes: 4 additions & 2 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,10 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
func (t *TCPInput) Stop() error {
t.cancel()

if err := t.listener.Close(); err != nil {
return err
if t.listener != nil {
if err := t.listener.Close(); err != nil {
t.Errorf("failed to close TCP connection: %s", err)
}
}

t.wg.Wait()
Expand Down
70 changes: 65 additions & 5 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tcp

import (
"crypto/tls"
"math/rand"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -104,7 +105,10 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {

err = tcpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer tcpInput.Stop()
defer func() {
err := tcpInput.Stop()
require.NoError(t, err, "expected to stop tcp input operator without error")
}()

conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
require.NoError(t, err)
Expand Down Expand Up @@ -152,7 +156,10 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T)

err = tcpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer tcpInput.Stop()
defer func() {
err := tcpInput.Stop()
require.NoError(t, err, "expected to stop tcp input operator without error")
}()

conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
require.NoError(t, err)
Expand Down Expand Up @@ -237,7 +244,10 @@ func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {

err = tcpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer tcpInput.Stop()
defer func() {
err := tcpInput.Stop()
require.NoError(t, err, "expected to stop tcp input operator without error")
}()

conn, err := tls.Dial("tcp", tcpInput.listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
require.NoError(t, err)
Expand Down Expand Up @@ -357,6 +367,51 @@ func TestTLSTcpInput(t *testing.T) {
t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"}))
}

func TestFailToBind(t *testing.T) {
ip := "localhost"
port := 0
minPort := 30000
maxPort := 40000
for i := 1; i < 10; i++ {
port = minPort + rand.Intn(maxPort-minPort+1)
_, err := net.DialTimeout("tcp", net.JoinHostPort(ip, strconv.Itoa(port)), time.Second*2)
if err != nil {
// a failed connection indicates that the port is available for use
break
}
}
if port == 0 {
t.Errorf("failed to find a free port between %d and %d", minPort, maxPort)
}

var startTCP func(port int) (*TCPInput, error) = func(int) (*TCPInput, error) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = net.JoinHostPort(ip, strconv.Itoa(port))
ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]
mockOutput := testutil.Operator{}
tcpInput := op.(*TCPInput)
tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}
entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)
err = tcpInput.Start(testutil.NewMockPersister("test"))
return tcpInput, err
}

first, err := startTCP(port)
require.NoError(t, err, "expected first tcp operator to start")
defer func() {
err := first.Stop()
require.NoError(t, err, "expected to stop tcp input operator without error")
require.NoError(t, first.Stop(), "expected stopping an already stopped operator to not return an error")
}()
_, err = startTCP(port)
require.Error(t, err, "expected second tcp operator to fail to start")
}

func BenchmarkTcpInput(b *testing.B) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
Expand All @@ -376,8 +431,13 @@ func BenchmarkTcpInput(b *testing.B) {
go func() {
conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
require.NoError(b, err)
defer tcpInput.Stop()
defer conn.Close()
defer func() {
err := tcpInput.Stop()
require.NoError(b, err, "expected to stop tcp input operator without error")

err = conn.Close()
require.NoError(b, err, "expected to close connection without error")
}()
message := []byte("message\n")
for {
select {
Expand Down
6 changes: 4 additions & 2 deletions operator/builtin/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ func (u *UDPInput) readMessage() ([]byte, net.Addr, error) {
// Stop will stop listening for udp messages.
func (u *UDPInput) Stop() error {
u.cancel()
if err := u.connection.Close(); err != nil {
u.Errorf(err.Error())
if u.connection != nil {
if err := u.connection.Close(); err != nil {
u.Errorf("failed to close UDP connection: %s", err)
}
}
u.wg.Wait()
if u.resolver != nil {
Expand Down
62 changes: 60 additions & 2 deletions operator/builtin/input/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package udp

import (
"math/rand"
"net"
"strconv"
"testing"
Expand Down Expand Up @@ -50,7 +51,10 @@ func udpInputTest(input []byte, expected []string) func(t *testing.T) {

err = udpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer udpInput.Stop()
defer func() {
err := udpInput.Stop()
require.NoError(t, err, "expected to stop udp input operator without error")
}()

conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String())
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +104,10 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T)

err = udpInput.Start(testutil.NewMockPersister("test"))
require.NoError(t, err)
defer udpInput.Stop()
defer func() {
err := udpInput.Stop()
require.NoError(t, err, "expected to stop udp input operator without error")
}()

conn, err := net.Dial("udp", udpInput.connection.LocalAddr().String())
require.NoError(t, err)
Expand Down Expand Up @@ -159,6 +166,57 @@ func TestUDPInputAttributes(t *testing.T) {
t.Run("NewlineInMessage", udpInputAttributesTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}))
}

func TestFailToBind(t *testing.T) {
ip := "localhost"
port := 0
minPort := 30000
maxPort := 40000
for i := 1; 1 < 10; i++ {
port = minPort + rand.Intn(maxPort-minPort+1)
_, err := net.DialTimeout("tcp", net.JoinHostPort(ip, strconv.Itoa(port)), time.Second*2)
if err != nil {
// a failed connection indicates that the port is available for use
break
}
}
if port == 0 {
t.Errorf("failed to find a free port between %d and %d", minPort, maxPort)
}

var startUDP func(port int) (*UDPInput, error) = func(int) (*UDPInput, error) {
cfg := NewUDPInputConfig("test_input")
cfg.ListenAddress = net.JoinHostPort(ip, strconv.Itoa(port))

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
udpInput, ok := op.(*UDPInput)
require.True(t, ok)

udpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = udpInput.Start(testutil.NewMockPersister("test"))
return udpInput, err
}

first, err := startUDP(port)
require.NoError(t, err, "expected first udp operator to start")
defer func() {
err := first.Stop()
require.NoError(t, err, "expected to stop udp input operator without error")
require.NoError(t, first.Stop(), "expected stopping an already stopped operator to not return an error")
}()
_, err = startUDP(port)
require.Error(t, err, "expected second udp operator to fail to start")
}

func BenchmarkUdpInput(b *testing.B) {
cfg := NewUDPInputConfig("test_id")
cfg.ListenAddress = ":0"
Expand Down

0 comments on commit efb458e

Please sign in to comment.