Skip to content

Commit

Permalink
fix: timeout and shutdown (#121)
Browse files Browse the repository at this point in the history
Fix waitForPruneCondition timeout handling which was not resetting the
connection timeout when new connections came in resulting in the reaper
shutting down incorrectly.

Don't log EOF errors.

Add buffer to connection channels so we don't block the accepting
goroutine.

Fixes testcontainers/testcontainers-go#2348

Co-authored-by: Manuel de la Peña <[email protected]>
  • Loading branch information
stevenh and mdelapenya authored Apr 8, 2024
1 parent 6e00e6e commit 103d99f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 44 deletions.
56 changes: 22 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"net"
"net/url"
Expand Down Expand Up @@ -135,8 +137,9 @@ func main() {

deathNote := sync.Map{}

connectionAccepted := make(chan net.Addr)
connectionLost := make(chan net.Addr)
// Buffered so we don't block the main process.
connectionAccepted := make(chan net.Addr, 10)
connectionLost := make(chan net.Addr, 10)

go processRequests(&deathNote, connectionAccepted, connectionLost)

Expand Down Expand Up @@ -206,8 +209,10 @@ func processRequests(deathNote *sync.Map, connectionAccepted chan<- net.Addr, co
}

if err != nil {
log.Println(err)
break
if !errors.Is(err, io.EOF) {
log.Println(err)
}
return
}
}
}(conn)
Expand All @@ -216,45 +221,28 @@ func processRequests(deathNote *sync.Map, connectionAccepted chan<- net.Addr, co

func waitForPruneCondition(ctx context.Context, connectionAccepted <-chan net.Addr, connectionLost <-chan net.Addr) {
connectionCount := 0
never := make(chan time.Time, 1)
defer close(never)

handleConnectionAccepted := func(addr net.Addr) {
log.Printf("New client connected: %s", addr)
connectionCount++
}

select {
case <-time.After(connectionTimeout):
panic("Timed out waiting for the first connection")
case addr := <-connectionAccepted:
handleConnectionAccepted(addr)
case <-ctx.Done():
log.Println("Signal received")
return
}

timer := time.NewTimer(connectionTimeout)
for {
var noConnectionTimeout <-chan time.Time
if connectionCount == 0 {
noConnectionTimeout = time.After(reconnectionTimeout)
} else {
noConnectionTimeout = never
}

select {
case addr := <-connectionAccepted:
handleConnectionAccepted(addr)
break
log.Printf("New client connected: %s", addr)
connectionCount++
if connectionCount == 1 {
if !timer.Stop() {
<-timer.C
}
}
case addr := <-connectionLost:
log.Printf("Client disconnected: %s", addr.String())
connectionCount--
break
if connectionCount == 0 {
timer.Reset(reconnectionTimeout)
}
case <-ctx.Done():
log.Println("Signal received")
return
case <-noConnectionTimeout:
log.Println("Timed out waiting for re-connection")
case <-timer.C:
log.Println("Timeout waiting for connection")
return
}
}
Expand Down
21 changes: 11 additions & 10 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"context"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -64,26 +64,27 @@ func TestInitialTimeout(t *testing.T) {
// reset connectionTimeout
connectionTimeout = testConnectionTimeout

origWriter := log.Default().Writer()
defer func() {
log.SetOutput(origWriter)
}()
var buf bytes.Buffer
log.SetOutput(&buf)

acc := make(chan net.Addr)
lost := make(chan net.Addr)

done := make(chan string)

go func() {
defer func() {
err := recover().(string)
done <- err
}()
waitForPruneCondition(context.Background(), acc, lost)
done <- buf.String()
}()

select {
case p := <-done:
if !strings.Contains(p, "first connection") {
t.Fail()
}
require.Contains(t, p, "Timeout waiting for connection")
case <-time.After(7 * time.Second):
t.Fail()
t.Fatal("Timeout waiting prune condition")
}
}

Expand Down

0 comments on commit 103d99f

Please sign in to comment.