Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of the project, give more coherence to the Registration struct #36

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ API
* The registeration will stop when the context will be canceled.
* It will return the service uuid and a channel which will send back any modifications made to the service by the other host of the same service. This is usefull for credential synchronisation.
*/
ctx, cancel := context.WithCancel(context.Background())
registration := service.Register(
ctx,
registration, err := service.Register(
"my-service",
&service.Host{
Hostname: "public-domain.dev",
Expand All @@ -40,7 +38,20 @@ registration := service.Register(
"http": "8080",
"https": "80443",
},
})
},
)
if err != nil {
// Register return an error if it fails to initialize
// Then, it reconnects automatically to etcd etc. if required
}

// ...

// To release properly resources
err = registration.Stop()
if err != nil {
// Do something with error
}
```

This will create two different etcd keys:
Expand Down
24 changes: 13 additions & 11 deletions service/get_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"context"
"errors"
"testing"

Expand All @@ -19,21 +18,24 @@ func TestGetNoHost(t *testing.T) {
}

func TestGet(t *testing.T) {
Convey("With registred services", t, func() {
ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())
Convey("With registered services", t, func() {
host1, host2 := genHost("host1"), genHost("host2")
host1.Name = "test_service_get"
host2.Name = "test_service_get"
w1 := Register(ctx1, "test_service_get", host1)
w2 := Register(ctx2, "test_service_get", host2)
w1.WaitRegistration()
w2.WaitRegistration()

r1, err := Register("test_service_get", host1)
So(err, ShouldBeNil)
r2, err := Register("test_service_get", host2)
So(err, ShouldBeNil)

r1.WaitRegistration()
r2.WaitRegistration()

Convey("We should have 2 hosts", func() {
hosts, err := Get("test_service_get").All()
So(err, ShouldBeNil)
So(len(hosts), ShouldEqual, 2)
if hosts[0].UUID == w1.UUID() {
if hosts[0].UUID == r1.UUID() {
host1.UUID = hosts[0].UUID
host2.UUID = hosts[1].UUID
So(hosts[0], ShouldResemble, &host1)
Expand All @@ -46,8 +48,8 @@ func TestGet(t *testing.T) {
}
So(err, ShouldBeNil)
})
cancel1()
cancel2()
So(r1.Stop(), ShouldBeNil)
So(r2.Stop(), ShouldBeNil)
})
}

Expand Down
152 changes: 11 additions & 141 deletions service/register.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
package service

import (
"encoding/json"
"fmt"
"time"

errgo "gopkg.in/errgo.v1"

etcd "github.com/coreos/etcd/client"
uuid "github.com/nu7hatch/gouuid"
"golang.org/x/net/context"
)

const (
Expand All @@ -25,15 +19,15 @@ const (
// This service will launch two go routines. The first one will maintain the
// registration every 5 seconds and the second one will check if the service
// credentials don't change and notify otherwise
func Register(ctx context.Context, service string, host Host) *Registration {
func Register(serviceName string, host Host) (*Registration, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you keep the context here to forward it to the NewRegistration function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is just no need of context in here, as I wanted to use the context to stop the registration, but it's not possible.

When a context is canceled, there is no way to way for the registration to remove itself cleanly from etcd synchronously, the entity which canceled the context will keep its execution, like shutting down the application.

Now the Registration can be Stop(), which is synchronous and do it in a clean manner, it's a better way to handle the lifecycle of the Registration. The context cancellation not being used anymore, there was no more requirement of the context.

if !host.Public && len(host.PrivateHostname) == 0 {
host.PrivateHostname = host.Hostname
}

if len(host.PrivateHostname) == 0 {
host.PrivateHostname = hostname
}
host.Name = service
host.Name = serviceName

if len(host.PrivateHostname) != 0 && len(host.PrivatePorts) == 0 {
host.PrivatePorts = host.Ports
Expand All @@ -44,147 +38,23 @@ func Register(ctx context.Context, service string, host Host) *Registration {
hostUuid := fmt.Sprintf("%s-%s", uuid.String(), host.PrivateHostname)
host.UUID = hostUuid

serviceInfos := &Service{
Name: service,
service := Service{
Name: serviceName,
Critical: host.Critical,
Public: host.Public,
}

if host.Public {
serviceInfos.Hostname = host.Hostname
serviceInfos.Ports = host.Ports
serviceInfos.Password = host.Password
serviceInfos.User = host.User
service.Hostname = host.Hostname
service.Ports = host.Ports
service.Password = host.Password
service.User = host.User
}

publicCredentialsChan := make(chan Credentials, 1) // Communication between register and the client
privateCredentialsChan := make(chan Credentials, 1) // Communication between watcher and register

hostKey := fmt.Sprintf("/services/%s/%s", service, hostUuid)
hostJson, _ := json.Marshal(&host)
hostValue := string(hostJson)

serviceKey := fmt.Sprintf("/services_infos/%s", service)
serviceJson, _ := json.Marshal(serviceInfos)
serviceValue := string(serviceJson)

go func() {
ticker := time.NewTicker((HEARTBEAT_DURATION - 1) * time.Second)

// id is the current modification index of the service key.
// this is used for the watcher.
id, err := serviceRegistration(serviceKey, serviceValue)
for err != nil {
id, err = serviceRegistration(serviceKey, serviceValue)
}

err = hostRegistration(hostKey, hostValue)
for err != nil {
err = hostRegistration(hostKey, hostValue)
}

publicCredentialsChan <- Credentials{
User: serviceInfos.User,
Password: serviceInfos.Password,
}

if host.Public {
go watch(ctx, serviceKey, id, privateCredentialsChan)
}

for {
select {
case <-ctx.Done():
_, err := KAPI().Delete(context.Background(), hostKey, &etcd.DeleteOptions{Recursive: false})
if err != nil {
logger.Println("fail to remove key", hostKey)
}
ticker.Stop()
return
case credentials := <-privateCredentialsChan: // If the credentials has benn changed
// We update our cache
host.User = credentials.User
host.Password = credentials.Password
serviceInfos.User = credentials.User
serviceInfos.Password = credentials.Password

// Re-marshal the host
hostJson, _ = json.Marshal(&host)
hostValue = string(hostJson)

// synchro the host informations
hostRegistration(hostKey, hostValue)
// and transmit them to the client
publicCredentialsChan <- credentials
case <-ticker.C:
err := hostRegistration(hostKey, hostValue)
// If for any random reason, there is an error,
// we retry every second until it's ok.
for err != nil {
logger.Printf("lost registration of '%v': %v (%v)", service, err, Client().Endpoints())
time.Sleep(1 * time.Second)

err = hostRegistration(hostKey, hostValue)
if err == nil {
logger.Printf("recover registration of '%v'", service)
}
}
}
}
}()

return NewRegistration(ctx, hostUuid, publicCredentialsChan)
}

func watch(ctx context.Context, serviceKey string, id uint64, credentialsChan chan Credentials) {
// id is the index of the last modification made to the key. The watcher will
// start watching for modifications done after this index. This will prevent
// packet or modification lost.

for {
watcher := KAPI().Watcher(serviceKey, &etcd.WatcherOptions{
AfterIndex: id,
})
resp, err := watcher.Next(ctx)
if err == context.Canceled {
return
}

if err != nil {
// We've lost the connexion to etcd. Speel 1s and retry
logger.Printf("lost watcher of '%v': '%v' (%v)", serviceKey, err, Client().Endpoints())
id = 0
time.Sleep(1 * time.Second)
}
var serviceInfos Service
err = json.Unmarshal([]byte(resp.Node.Value), &serviceInfos)
if err != nil {
logger.Printf("error while getting service key '%v': '%v' (%v)", serviceKey, err, Client().Endpoints())
time.Sleep(1 * time.Second)
}
// We've got the modification, send it to the register agent
id = resp.Node.ModifiedIndex
credentialsChan <- Credentials{
User: serviceInfos.User,
Password: serviceInfos.Password,
}
}
}

func hostRegistration(hostKey, hostJson string) error {
_, err := KAPI().Set(context.Background(), hostKey, hostJson, &etcd.SetOptions{TTL: HEARTBEAT_DURATION * time.Second})
if err != nil {
return errgo.Notef(err, "Unable to register host")
}
return nil

}

func serviceRegistration(serviceKey, serviceJson string) (uint64, error) {
key, err := KAPI().Set(context.Background(), serviceKey, serviceJson, nil)
registration, err := NewRegistration(host, service)
if err != nil {
return 0, errgo.Notef(err, "Unable to register service")
return nil, fmt.Errorf("fail to create new registration: %v", err)
}

return key.Node.ModifiedIndex, nil
return registration, nil
}
Loading