diff --git a/client/cluster_canal_connector.go b/client/cluster_canal_connector.go index d26e42a..23d50f9 100644 --- a/client/cluster_canal_connector.go +++ b/client/cluster_canal_connector.go @@ -22,6 +22,8 @@ type ClusterCanalConnector struct { RetryTimes int32 currentSequence string zkVersion int32 + + Path string } const ( @@ -32,12 +34,14 @@ const ( func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) (*ClusterCanalConnector,error) { - err := checkRootPath(canalNode.zkClient) + destinationPath := fmt.Sprintf("%s/%s", path, destination) + + err := checkRootPath(canalNode.zkClient, destinationPath) if err != nil { return nil, err } - currentSequence, err := createEphemeralSequence(canalNode.zkClient) + currentSequence, err := createEphemeralSequence(canalNode.zkClient, destinationPath) if err != nil { return nil,err } @@ -52,6 +56,7 @@ func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, pass RetryTimes: 0, currentSequence:currentSequence, zkVersion:0, + Path: destinationPath, } return cluster, nil @@ -89,7 +94,7 @@ func (cc *ClusterCanalConnector) doConnect() error { return fmt.Errorf("error wait become first zk node %s", err.Error()) } - _, err = cc.canalNode.zkClient.Set(path+"/"+cc.currentSequence, []byte{runningFlag}, cc.zkVersion) + _, err = cc.canalNode.zkClient.Set(cc.Path+"/"+cc.currentSequence, []byte{runningFlag}, cc.zkVersion) if err != nil { return fmt.Errorf("error set running flag %s", err.Error()) } @@ -117,8 +122,8 @@ func (cc *ClusterCanalConnector) doConnect() error { func (cc *ClusterCanalConnector) DisConnection() error { if cc.conn != nil { cc.conn.DisConnection() - _, stat, _ := cc.canalNode.zkClient.Get(path + "/" + cc.currentSequence) - err := cc.canalNode.zkClient.Delete(path+"/"+cc.currentSequence, stat.Version) + _, stat, _ := cc.canalNode.zkClient.Get(cc.Path + "/" + cc.currentSequence) + err := cc.canalNode.zkClient.Delete(cc.Path+"/"+cc.currentSequence, stat.Version) if err != nil { return fmt.Errorf("error delete temp consumer path %s", err.Error()) } @@ -206,8 +211,9 @@ func (cc *ClusterCanalConnector) RollBack(batchId int64) error { return nil } -func createEphemeralSequence(zkClient *zk.Conn) (string, error) { - node, err := zkClient.Create(path+"/", []byte{notRunningFlag}, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll)) +func createEphemeralSequence(zkClient *zk.Conn, destinationPath string) (string, error) { + node, err := zkClient.Create(destinationPath+"/", []byte{notRunningFlag}, zk.FlagEphemeral|zk.FlagSequence, + zk.WorldACL(zk.PermAll)) if err != nil { return "", err } @@ -216,23 +222,33 @@ func createEphemeralSequence(zkClient *zk.Conn) (string, error) { return currentSequence, nil } -func checkRootPath(zkClient *zk.Conn) error { - exists, _, err := zkClient.Exists(path) +func checkRootPath(zkClient *zk.Conn, destinationPath string) error { + rootExists, _, err := zkClient.Exists(path) if err != nil { return err } - if !exists { + if !rootExists { _, err := zkClient.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { return err } } + exists, _, err := zkClient.Exists(destinationPath) + if err != nil { + return err + } + if !exists { + _, err := zkClient.Create(destinationPath, []byte{}, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + return err + } + } return nil } func (cc *ClusterCanalConnector) waitBecomeFirst() error { zkClient := cc.canalNode.zkClient - children, _, err := zkClient.Children(path) + children, _, err := zkClient.Children(cc.Path) if err != nil { return err } @@ -249,7 +265,7 @@ func (cc *ClusterCanalConnector) waitBecomeFirst() error { for i, child := range children { if cc.currentSequence == child { noSelf = false - previousPath := path + "/" + children[i-1] + previousPath := cc.Path + "/" + children[i-1] //阻塞等待上一个比他小的节点删除 log.Println("waiting") err := waitDelete(zkClient, previousPath) @@ -264,7 +280,7 @@ func (cc *ClusterCanalConnector) waitBecomeFirst() error { if noSelf { //以防万一 - cc.currentSequence, err = createEphemeralSequence(zkClient) + cc.currentSequence, err = createEphemeralSequence(zkClient, cc.Path) if err != nil { return err }