Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async code #25

Merged
merged 13 commits into from
Feb 18, 2019
142 changes: 142 additions & 0 deletions client/cluster_canal_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package client

import (
"errors"
"log"
"time"

pb "github.com/CanalClient/canal-go/protocol"
)

type ClusterCanalConnector struct {
conn *SimpleCanalConnector
canalNode *CanalClusterNode
username, password, destination string
soTimeOut, idleTimeOut int32
filter string

RetryTimes int32
}

func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, password string, destination string,
soTimeOut int32, idleTimeOut int32) *ClusterCanalConnector {

cc := &ClusterCanalConnector{
canalNode: canalNode, username: username, password: password,
destination: destination, soTimeOut: soTimeOut, idleTimeOut: idleTimeOut,
RetryTimes: 3,
}

return cc
}

// 重试失败后重新连接
func (cc *ClusterCanalConnector) reTryWithConn(name string, do func() error) error {
return cc.reTry(name, func() error {
if cc.conn == nil {
cc.Connect()
}
if cc.conn == nil {
return errors.New("canal connect fail")
}
if err := do(); err != nil {
cc.Connect()
return err
}
return nil
})
}

func (cc *ClusterCanalConnector) reTry(name string, do func() error) (err error) {
for times := int32(0); times < cc.RetryTimes; times++ {
if err = do(); err != nil {
log.Printf("%s err:%v, reTry", name, err)
time.Sleep(time.Second * 5)
} else {
return nil
}
}
return
}

func (cc *ClusterCanalConnector) Connect() error {
log.Printf("Connect")

return cc.reTry("Connect", func() error {
var (
addr string
port int
err error
)
cc.DisConnection()
if addr, port, err = cc.canalNode.GetNode(); err != nil {
log.Printf("canalNode.GetNode addr=%s, port=%d, err=%v\n", addr, port, err)
return err
}

conn := NewSimpleCanalConnector(addr, port, cc.username, cc.password,
cc.destination, cc.soTimeOut, cc.idleTimeOut)

if cc.filter != "" {
conn.Filter = cc.filter
}

if err = conn.Connect(); err != nil {
log.Printf("conn.Connect err:%v", err)
conn.DisConnection()
return err
}
cc.conn = conn
return nil
})
}

func (cc *ClusterCanalConnector) DisConnection() {
if cc.conn != nil {
cc.conn.DisConnection()
cc.conn = nil
}
}

func (cc *ClusterCanalConnector) Subscribe(filter string) error {
return cc.reTryWithConn("Subscribe", func() (err error) {
if err = cc.conn.Subscribe(filter); err == nil {
cc.filter = filter
}
return
})
}

func (cc *ClusterCanalConnector) UnSubscribe() error {
return cc.reTryWithConn("UnSubscribe", func() error {
return cc.conn.UnSubscribe()
})
}

func (cc *ClusterCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (msg *pb.Message, err error) {
_ = cc.reTryWithConn("GetWithOutAck", func() error {
msg, err = cc.conn.GetWithOutAck(batchSize, timeOut, units)
return err
})
return
}

func (cc *ClusterCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (msg *pb.Message, err error) {
_ = cc.reTryWithConn("Get", func() error {
msg, err = cc.conn.Get(batchSize, timeOut, units)
return err
})
return
}

func (cc *ClusterCanalConnector) Ack(batchId int64) error {
return cc.reTryWithConn("Ack", func() error {
return cc.conn.Ack(batchId)
})
}

func (cc *ClusterCanalConnector) RollBack(batchId int64) error {
return cc.reTryWithConn("RollBack", func() error {
return cc.conn.RollBack(batchId)
})
}
105 changes: 105 additions & 0 deletions client/cluster_canal_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package client

import (
"encoding/json"
"fmt"
"log"
"math/rand"
"strconv"
"strings"
"time"

"github.com/samuel/go-zookeeper/zk"
)

type ServerRunningData struct {
Cid int64
Address string
Active bool
}

type CanalClusterNode struct {
destination string
clusterAddress []string
runningServer *ServerRunningData
clusterEvent <-chan zk.Event
serverEvent <-chan zk.Event
}

const (
cluster_path = "/otter/canal/destinations/%s/cluster"
running_path = "/otter/canal/destinations/%s/running"
)

func NewCanalClusterNode(destination string, zkServer []string, timeout time.Duration) (canalNode *CanalClusterNode, err error) {
var (
zkClient *zk.Conn
b []byte
cluster []string
clusterEV <-chan zk.Event
serverEV <-chan zk.Event
serverInfo ServerRunningData
)

if zkClient, _, err = zk.Connect(zkServer, timeout); err != nil {
log.Printf("zk.Connect err:%v", err)
return
}
if cluster, _, clusterEV, err = zkClient.ChildrenW(fmt.Sprintf(cluster_path, destination)); err != nil {
log.Printf("zkClient.ChildrenW err:%v", err)
return
}

if b, _, serverEV, err = zkClient.GetW(fmt.Sprintf(running_path, destination)); err != nil {
log.Printf("zkClient.GetW err:%v", err)
return
}

if err = json.Unmarshal(b, &serverInfo); err != nil {
log.Printf("json.Unmarshal err:%v", err)
return
}

canalNode = &CanalClusterNode{
destination: destination,
runningServer: &serverInfo,
clusterEvent: clusterEV,
serverEvent: serverEV,
}

canalNode.InitClusters(cluster)
return
}

func (canalNode *CanalClusterNode) InitClusters(addressList []string) {
rand.Shuffle(len(addressList), func(a, b int) {
addressList[a], addressList[b] = addressList[b], addressList[a]
})
canalNode.clusterAddress = addressList
}

func (canalNode *CanalClusterNode) GetNode() (addr string, port int, err error) {
server := ""
if canalNode.runningServer != nil {
server = canalNode.runningServer.Address
}

if server == "" && len(canalNode.clusterAddress) > 0 {
server = canalNode.clusterAddress[0]
}

if server != "" {
s := strings.Split(server, ":")
if len(s) == 2 {
if port, err = strconv.Atoi(s[1]); err == nil {
addr = s[0]
}
}
} else {
return "", 0, fmt.Errorf("no alive canal server for %s", canalNode.destination)
}
if addr == "" {
return "", 0, fmt.Errorf("error canal cluster server %s", server)
}
return
}
Loading