Skip to content

Commit

Permalink
Merge pull request PaddlePaddle#6 from reyoung/feature/refactorize_fr…
Browse files Browse the repository at this point in the history
…amework_proto

Feature/refactorize framework proto
  • Loading branch information
reyoung authored Aug 10, 2017
2 parents 36709d0 + 7202f42 commit c7e8c1a
Show file tree
Hide file tree
Showing 47 changed files with 1,188 additions and 420 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN apt-get update && \
wget unzip unrar tar xz-utils bzip2 gzip coreutils ntp \
curl sed grep graphviz libjpeg-dev zlib1g-dev \
python-matplotlib gcc-4.8 g++-4.8 \
automake locales clang-format-3.8 swig doxygen cmake \
automake locales clang-format swig doxygen cmake \
liblapack-dev liblapacke-dev libboost-dev \
clang-3.8 llvm-3.8 libclang-3.8-dev \
net-tools && \
Expand Down
10 changes: 3 additions & 7 deletions go/glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 14 additions & 10 deletions go/master/service_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
package master_test

import (
"io/ioutil"
"net/url"
"os"
"strings"
"testing"
"time"

"github.com/PaddlePaddle/Paddle/go/master"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/docker/docker/pkg/ioutils"
"github.com/stretchr/testify/assert"
)

func TestNewServiceWithEtcd(t *testing.T) {
// setup an embed etcd server
etcdDir, err := ioutils.TempDir("", "")
etcdDir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
cfg := embed.NewConfig()
lpurl, _ := url.Parse("http://localhost:0")
lcurl, _ := url.Parse("http://localhost:0")
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = etcdDir
e, err := embed.StartEtcd(cfg)
if err != nil {
Expand All @@ -30,15 +36,13 @@ func TestNewServiceWithEtcd(t *testing.T) {
t.Fatal(err)
}
}()
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-time.After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Fatal("Server took too long to start!")
}

ep := []string{"127.0.0.1:2379"}
<-e.Server.ReadyNotify()

port := strings.Split(e.Clients[0].Addr().String(), ":")[1]
endpoint := "127.0.0.1:" + port

ep := []string{endpoint}
masterAddr := "127.0.0.1:3306"
store, err := master.NewEtcdClient(ep, masterAddr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, 30)
if err != nil {
Expand Down
20 changes: 14 additions & 6 deletions go/pserver/client/c/cclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ func cArrayToSlice(p unsafe.Pointer, len int) []byte {

type selector bool

func (s selector) Select() bool {
return bool(s)
func (s selector) Select() (bool, error) {
return bool(s), nil
}

func (s selector) Done() error {
return nil
}

type lister []client.Server
Expand All @@ -114,11 +118,10 @@ func paddle_new_pserver_client(addrs *C.char, selected int) C.paddle_pserver_cli
}

//export paddle_new_etcd_pserver_client
func paddle_new_etcd_pserver_client(etcdEndpoints *C.char, selected int) C.paddle_pserver_client {
// TODO(Longfei: use etcd lock to decide which trainer to initialize the parameters)
func paddle_new_etcd_pserver_client(etcdEndpoints *C.char) C.paddle_pserver_client {
addr := C.GoString(etcdEndpoints)
etcdClient := client.NewEtcd(addr)
c := client.NewClient(etcdClient, etcdClient.Desired(), selector(selected != 0))
c := client.NewClient(etcdClient, etcdClient.Desired(), etcdClient)
return add(c)
}

Expand All @@ -136,7 +139,12 @@ func paddle_pserver_client_release(client C.paddle_pserver_client) {
//export paddle_begin_init_params
func paddle_begin_init_params(client C.paddle_pserver_client) C.int {
c := get(client)
if selected := c.BeginInitParams(); selected {
selected, err := c.BeginInitParams()
if err != nil {
panic(err)
}

if selected {
return 1
}
return 0
Expand Down
10 changes: 7 additions & 3 deletions go/pserver/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ import (

// TODO(helin): add RPC call retry logic

// Selector selects if the client should initialize parameter servers.
// Selector selects if the client should initialize parameters and
// reports the initialization process done.
type Selector interface {
Select() bool
// Select selects if the client should initialize parameter servers.
Select() (bool, error)
// Done indicates the initialization process is done.
Done() error
}

// Server is the identification of a parameter Server.
Expand Down Expand Up @@ -115,7 +119,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
// servers. Other trainers will be blocked until the initialization is
// done, and they need to get the initialized parameters from
// parameter servers using GetParams.
func (c *Client) BeginInitParams() bool {
func (c *Client) BeginInitParams() (bool, error) {
return c.sel.Select()
}

Expand Down
14 changes: 11 additions & 3 deletions go/pserver/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,12 @@ func initEtcdClient() {

type selector bool

func (s selector) Select() bool {
return bool(s)
func (s selector) Select() (bool, error) {
return bool(s), nil
}

func (s selector) Done() error {
return nil
}

type lister []client.Server
Expand All @@ -135,7 +139,11 @@ func (l lister) List() []client.Server {
}

func testClient(t *testing.T, c *client.Client) {
selected := c.BeginInitParams()
selected, err := c.BeginInitParams()
if err != nil {
t.Fatal(err)
}

if !selected {
t.Fatal("should be selected.")
}
Expand Down
Loading

0 comments on commit c7e8c1a

Please sign in to comment.