Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix datarace while update a channel.lastEventID #30

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,9 @@ func newChannel(name string) *Channel {

// SendMessage broadcast a message to all clients in a channel.
func (c *Channel) SendMessage(message *Message) {
c.mu.Lock()
c.lastEventID = message.id
c.mu.Unlock()

c.mu.RLock()

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/alexandrevicenzi/go-sse
module github.com/iscander/go-sse

go 1.11
go 1.15
66 changes: 66 additions & 0 deletions sse_test.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"os"
"sync"
"testing"
"time"
)

func TestNewServerNilOptions(t *testing.T) {
@@ -87,3 +88,68 @@ func TestServer(t *testing.T) {
t.Errorf("Expected %d messages but got %d", channelCount*clientCount, messageCount)
}
}

func TestMultipleTopics(t *testing.T) {
// The usage pattern: Imagine we have a client which subsribed to multiple topics
// inside a connection. It track changes of specific items state by their ID
// for example.
sendersWg := sync.WaitGroup{}
workerWg := sync.WaitGroup{}
m := sync.Mutex{}
messageCount := 0
numMessages := 3
topics := []string{"Sunday", "Monday", "Tuesday", "Wednesday", "Thuesday", "Friday", "Saturday"}

srv := NewServer(&Options{
Logger: log.New(os.Stdout, "go-sse: ", log.Ldate|log.Ltime|log.Lshortfile),
})

defer srv.Shutdown()

name := "CH-0"
ch := srv.addChannel(name)
fmt.Printf("Channel %s registed\n", name)

// Create new client
c := newClient("", name)
// Add client to current channel
ch.addClient(c)

// receive messages
workerWg.Add(1)
go func() {
defer workerWg.Done()
// Wait for messages in the channel
for msg := range c.send {
m.Lock()
messageCount++
m.Unlock()
fmt.Printf("ID: %s - Topic: %s - Message: %s\n", msg.id, msg.event, msg.data)
}
}()

for _, day := range topics {
sendersWg.Add(1)
go func(topic string) {
defer sendersWg.Done()
for n := 0; n < numMessages; n++ {
srv.SendMessage(name, NewMessage(id(), "hello", topic))
}
}(day)
}
// Wait senders to complete
sendersWg.Wait()

srv.close()
// Wait recipient routine
workerWg.Wait()

if messageCount != len(topics)*numMessages {
t.Errorf("Expected %d messages but got %d", 3*numMessages, messageCount)
}

}

func id() string {
return fmt.Sprintf("%d", time.Now().UTC().Unix()*1000)
}