From e1c53833f051e069b2d4d5c6d49fa74ba5c2ee5e Mon Sep 17 00:00:00 2001 From: yinheli Date: Fri, 8 Nov 2019 19:47:47 +0800 Subject: [PATCH] fix: subscribe filter not work expected the filter argument of subscribe not work as expected. this commit change the subscribe logic, use the filter to subscribe via tcp protocol. if subscribe success, update the connector's field Filter value and also update the panic behavior of act failure action. --- client/simple_canal_connector.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/simple_canal_connector.go b/client/simple_canal_connector.go index 93a22db..04c9192 100644 --- a/client/simple_canal_connector.go +++ b/client/simple_canal_connector.go @@ -420,7 +420,7 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error { if !c.Running { return nil } - body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: c.Filter}) + body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter}) pack := new(pb.Packet) pack.Type = pb.PacketType_SUBSCRIPTION pack.Body = body @@ -445,13 +445,12 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error { } if ack.GetErrorCode() > 0 { - - panic(errors.New(fmt.Sprintf("failed to subscribe with reason::%s", ack.GetErrorMessage()))) + return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage()) } c.Filter = filter - return nil + return nil } //waitClientRunning 等待客户端跑