Skip to content

Commit

Permalink
Fix Shutdown()
Browse files Browse the repository at this point in the history
* Add test in sse_test.go for Shutdown() panic and goroutine leak
* Avoid panic: don't close `removeClient` (it's useless and it would require a sort of wait group)
* Avoid leak: don't block on `removeClient<-` when an HTTP Handler ends because of the client being removed already

Shutdown should probably take a Context and return on completion.
  • Loading branch information
tc-hib committed Dec 4, 2020
1 parent 6e69e3d commit 3f4c5ef
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 3 deletions.
1 change: 1 addition & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ func (c *Channel) removeClient(client *Client) {
delete(c.clients, client)
c.mu.Unlock()
close(client.send)
client.removed <- struct{}{}
}
4 changes: 3 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ package sse
type Client struct {
lastEventID,
channel string
send chan *Message
send chan *Message
removed chan struct{}
}

func newClient(lastEventID, channel string) *Client {
return &Client{
lastEventID,
channel,
make(chan *Message),
make(chan struct{}, 1),
}
}

Expand Down
6 changes: 4 additions & 2 deletions sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func (s *Server) ServeHTTP(response http.ResponseWriter, request *http.Request)

go func() {
<-closeNotify
s.removeClient <- c
select {
case s.removeClient <- c:
case <-c.removed:
}
}()

response.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -253,7 +256,6 @@ func (s *Server) dispatch() {
case <-s.shutdown:
s.close()
close(s.addClient)
close(s.removeClient)
close(s.closeChannel)
close(s.shutdown)

Expand Down
40 changes: 40 additions & 0 deletions sse_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package sse

import (
"context"
"fmt"
"go.uber.org/goleak"
"io/ioutil"
"log"
"net/http"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -87,3 +91,39 @@ func TestServer(t *testing.T) {
t.Errorf("Expected %d messages but got %d", channelCount*clientCount, messageCount)
}
}

func TestShutdown(t *testing.T) {
defer goleak.VerifyNone(t)

srv := NewServer(nil)

http.Handle("/events/", srv)

httpServer := &http.Server{Addr: ":3000", Handler: nil}

go func() { _ = httpServer.ListenAndServe() }()

stop := make(chan struct{})

go func() {
r, err := http.Get("http://localhost:3000/events/chan")
if err != nil {
log.Fatalln(err)
return
}
// Stop while client is reading the response
stop <- struct{}{}
_, _ = ioutil.ReadAll(r.Body)
}()

<-stop

srv.Shutdown()

ctx, done := context.WithTimeout(context.Background(), 600*time.Millisecond)
err := httpServer.Shutdown(ctx)
if err != nil {
log.Println(err)
}
done()
}

0 comments on commit 3f4c5ef

Please sign in to comment.