From 8af746ef6d920684b30b18fcaeb8630e4682f8ab Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 10 Jul 2013 19:02:58 -0700 Subject: [PATCH 1/2] accept machine list to join cluster --- etcd.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/etcd.go b/etcd.go index 9b4d23f3200..66b8e1fd4f8 100644 --- a/etcd.go +++ b/etcd.go @@ -27,7 +27,8 @@ import ( var verbose bool -var cluster string +var machines string +var cluster []string var address string var clientPort int @@ -51,7 +52,7 @@ var maxSize int func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") - flag.StringVar(&cluster, "C", "", "the ip address and port of a existing cluster") + flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in cluster, sepearate by comma") flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine") flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients") @@ -135,6 +136,8 @@ var info *Info func main() { flag.Parse() + cluster = strings.Split(machines, ",") + // Setup commands. registerCommands() @@ -203,7 +206,7 @@ func startRaft(securityType int) { if raftServer.IsLogEmpty() { // start as a leader in a new cluster - if cluster == "" { + if len(cluster) == 0 { raftServer.StartLeader() time.Sleep(time.Millisecond * 20) @@ -223,9 +226,17 @@ func startRaft(securityType int) { } else { raftServer.StartFollower() - err := joinCluster(raftServer, cluster) + for _, machine := range cluster { + + err := joinCluster(raftServer, machine) + if err != nil { + debug("cannot join to cluster via machine %s", machine) + } else { + break + } + } if err != nil { - fatal(fmt.Sprintln(err)) + fatal("cannot join to cluster via all given machines!") } debug("%s success join to the cluster", raftServer.Name()) } From 45af72c941be41f259b82b2762118b733d88014c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 10 Jul 2013 20:00:05 -0700 Subject: [PATCH 2/2] redirect join to server port --- client_handlers.go | 16 +++++++++++----- etcd.go | 22 ++++++++++++++++------ raft_handlers.go | 2 +- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/client_handlers.go b/client_handlers.go index 934e84adc05..fd447be0ded 100644 --- a/client_handlers.go +++ b/client_handlers.go @@ -51,7 +51,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { (*w).WriteHeader(http.StatusInternalServerError) } - dispatch(command, w, req) + dispatch(command, w, req, true) } @@ -77,7 +77,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) } - dispatch(command, &w, req) + dispatch(command, &w, req, true) } @@ -90,11 +90,11 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { command := &DeleteCommand{} command.Key = key - dispatch(command, w, req) + dispatch(command, w, req, true) } // Dispatch the command to leader -func dispatch(c Command, w *http.ResponseWriter, req *http.Request) { +func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) { if raftServer.State() == "leader" { if body, err := raftServer.Do(c); err != nil { warn("Commit failed %v", err) @@ -132,7 +132,13 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request) { scheme = "http://" } - url := scheme + raftTransporter.GetLeaderClientAddress() + path + var url string + + if client { + url = scheme + raftTransporter.GetLeaderClientAddress() + path + } else { + url = scheme + raftServer.Leader() + path + } debug("Redirect to %s", url) diff --git a/etcd.go b/etcd.go index 66b8e1fd4f8..b30a03339b4 100644 --- a/etcd.go +++ b/etcd.go @@ -425,6 +425,7 @@ func getInfo(path string) *Info { // Delete the old configuration if exist if ignore { + logPath := fmt.Sprintf("%s/log", path) snapshotPath := fmt.Sprintf("%s/snapshotPath", path) os.Remove(infoPath) @@ -507,11 +508,20 @@ func joinCluster(s *raft.Server, serverName string) error { json.NewEncoder(&b).Encode(command) // t must be ok - t, _ := raftServer.Transporter().(transporter) + t, ok := raftServer.Transporter().(transporter) + + if !ok { + panic("wrong type") + } + debug("Send Join Request to %s", serverName) + resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b) + debug("Finish Join Request to %s", serverName) + for { + fmt.Println(err, resp) if err != nil { return fmt.Errorf("Unable to join: %v", err) } @@ -520,17 +530,17 @@ func joinCluster(s *raft.Server, serverName string) error { if resp.StatusCode == http.StatusOK { return nil } - if resp.StatusCode == http.StatusServiceUnavailable { - address, err := ioutil.ReadAll(resp.Body) - if err != nil { - warn("Cannot Read Leader info: %v", err) - } + + if resp.StatusCode == http.StatusTemporaryRedirect { + fmt.Println("redirect") + address = resp.Header.Get("Location") debug("Leader is %s", address) debug("Send Join Request to %s", address) json.NewEncoder(&b).Encode(command) resp, err = t.Post(fmt.Sprintf("%s/join", address), &b) } } + } return fmt.Errorf("Unable to join: %v", err) } diff --git a/raft_handlers.go b/raft_handlers.go index 48b6d10e0ac..523c0193ad8 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -86,7 +86,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { if err := decodeJsonRequest(req, command); err == nil { debug("Receive Join Request from %s", command.Name) - dispatch(command, &w, req) + dispatch(command, &w, req, false) } else { w.WriteHeader(http.StatusInternalServerError) return