Skip to content

Commit

Permalink
fix: subscribe filter not work expected
Browse files Browse the repository at this point in the history
the filter argument of subscribe not work as expected.
this commit change the subscribe logic, use the filter to subscribe via tcp protocol.
if subscribe success, update the connector's field Filter value

and also update the panic behavior of act failure action.
  • Loading branch information
yinheli committed Nov 8, 2019
1 parent 2db66ad commit e1c5383
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions client/simple_canal_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error {
if !c.Running {
return nil
}
body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: c.Filter})
body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
pack := new(pb.Packet)
pack.Type = pb.PacketType_SUBSCRIPTION
pack.Body = body
Expand All @@ -445,13 +445,12 @@ func (c *SimpleCanalConnector) Subscribe(filter string) error {
}

if ack.GetErrorCode() > 0 {

panic(errors.New(fmt.Sprintf("failed to subscribe with reason::%s", ack.GetErrorMessage())))
return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
}

c.Filter = filter
return nil

return nil
}

//waitClientRunning 等待客户端跑
Expand Down

0 comments on commit e1c5383

Please sign in to comment.