Skip to content

Commit

Permalink
Merge pull request #3 from Haraj-backend/fix/data-race-in-send-msg
Browse files Browse the repository at this point in the history
fix: data race in send message
  • Loading branch information
riandyrn authored Nov 22, 2022
2 parents f5a4e31 + a05be36 commit d67a98f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
2 changes: 2 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ func newChannel(name string) *Channel {

// SendMessage broadcast a message to all clients in a channel.
func (c *Channel) SendMessage(message *Message) {
c.mu.Lock()
c.lastEventID = message.id
c.mu.Unlock()

timer := time.NewTimer(sendMessageToClientTimeout)
defer timer.Stop()
Expand Down
65 changes: 65 additions & 0 deletions sse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,68 @@ func TestServerDontStartServer(t *testing.T) {
// check for number of message received
assert.Equal(t, numChannels*numSubscribersEachChannel, countMessageReceived)
}

func TestMultipleTopics(t *testing.T) {
// The usage pattern: Imagine we have a client which subsribed to multiple topics
// inside a connection. It track changes of specific items state by their ID
// for example.
sendersWg := sync.WaitGroup{}
workerWg := sync.WaitGroup{}
m := sync.Mutex{}
messageCount := 0
numMessages := 3
topics := []string{"Sunday", "Monday", "Tuesday", "Wednesday", "Thuesday", "Friday", "Saturday"}

srv := NewServer(&Options{
Logger: log.New(os.Stdout, "go-sse: ", log.Ldate|log.Ltime|log.Lshortfile),
})

defer srv.Shutdown()

name := "CH-0"
ch := srv.addChannel(name)
fmt.Printf("Channel %s registed\n", name)

// Create new client
c := newClient("", name)
// Add client to current channel
ch.addClient(c)

// receive messages
workerWg.Add(1)
go func() {
defer workerWg.Done()
// Wait for messages in the channel
for msg := range c.send {
m.Lock()
messageCount++
m.Unlock()
fmt.Printf("ID: %s - Topic: %s - Message: %s\n", msg.id, msg.event, msg.data)
}
}()

for _, day := range topics {
sendersWg.Add(1)
go func(topic string) {
defer sendersWg.Done()
for n := 0; n < numMessages; n++ {
srv.SendMessage(name, NewMessage(id(), "hello", topic))
}
}(day)
}
// Wait senders to complete
sendersWg.Wait()

srv.close()
// Wait recipient routine
workerWg.Wait()

if messageCount != len(topics)*numMessages {
t.Errorf("Expected %d messages but got %d", 3*numMessages, messageCount)
}

}

func id() string {
return fmt.Sprintf("%d", time.Now().UTC().Unix()*1000)
}

0 comments on commit d67a98f

Please sign in to comment.