Skip to content

Commit

Permalink
Merge pull request #38 from ernoaapa/fix-discovery-server-stop
Browse files Browse the repository at this point in the history
Fix discovery server not stopping properly
  • Loading branch information
ernoaapa authored Apr 5, 2018
2 parents 8ace815 + 34ddabd commit f3fd627
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 28 deletions.
35 changes: 11 additions & 24 deletions pkg/discovery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"context"
"sync"
"time"

node "github.com/ernoaapa/eliot/pkg/api/services/node/v1"
Expand All @@ -11,44 +12,30 @@ import (

// Nodes return list of NodeInfos synchronously with given timeout
func Nodes(timeout time.Duration) (nodes []*node.Info, err error) {
results := make(chan *node.Info)
defer close(results)

go func() {
for node := range results {
nodes = append(nodes, node)
}
}()

err = NodesAsync(results, timeout)
if err != nil {
return nil, err
}

return nodes, nil
}

// NodesAsync search for nodes in network asynchronously for given timeout
func NodesAsync(results chan<- *node.Info, timeout time.Duration) error {
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return errors.Wrapf(err, "Failed to initialize new zeroconf resolver")
return nodes, errors.Wrapf(err, "Failed to initialize new zeroconf resolver")
}

// Initialize a waitgroup variable
var wg sync.WaitGroup
wg.Add(1)
entries := make(chan *zeroconf.ServiceEntry)
go func(entries <-chan *zeroconf.ServiceEntry) {
for entry := range entries {
results <- MapToAPIModel(entry)
nodes = append(nodes, MapToAPIModel(entry))
}
wg.Done()
}(entries)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err = resolver.Browse(ctx, ZeroConfServiceName, "", entries)
if err != nil {
return errors.Wrapf(err, "Failed to browse zeroconf nodes")
return nodes, errors.Wrapf(err, "Failed to browse zeroconf nodes")
}

<-ctx.Done()
return nil
wg.Wait()

return nodes, nil
}
20 changes: 20 additions & 0 deletions pkg/discovery/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package discovery

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestClientNodes(t *testing.T) {
server := NewServer("testing", 1234, "v1.0")
go server.Serve()
defer server.Stop()

nodes, err := Nodes(1 * time.Second)
assert.NoError(t, err)
assert.Len(t, nodes, 1)
assert.Equal(t, int64(1234), nodes[0].GrpcPort)
assert.Equal(t, "v1.0", nodes[0].Version)
}
9 changes: 5 additions & 4 deletions pkg/discovery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ type Server struct {
// NewServer creates new discovery server
func NewServer(name string, port int, version string) *Server {
return &Server{
Name: name,
Domain: "local.",
Port: port,
Version: version,
Name: name,
Domain: "local.",
Port: port,
Version: version,
shutdown: make(chan bool),
}
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/discovery/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package discovery

import (
"sync"
"testing"
)

func TestServerServeStop(t *testing.T) {
var wg sync.WaitGroup
server := NewServer("testing", 1234, "v1.0")
wg.Add(1)
go func() {
defer wg.Done()
server.Serve()
}()

server.Stop()
wg.Wait()
}

0 comments on commit f3fd627

Please sign in to comment.