From 3a7ee2395d3ac92e24e59f633e3d57bcb92ad583 Mon Sep 17 00:00:00 2001 From: jingyugao <1121087373@qq.com> Date: Thu, 10 Oct 2019 17:30:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=B0=81receiveMessages?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/simple_canal_connector.go | 49 +-------------------------- protocol/Message.go | 57 ++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 48 deletions(-) diff --git a/client/simple_canal_connector.go b/client/simple_canal_connector.go index c2071fa..93a22db 100644 --- a/client/simple_canal_connector.go +++ b/client/simple_canal_connector.go @@ -306,54 +306,7 @@ func (c *SimpleCanalConnector) receiveMessages() (*pb.Message, error) { if err != nil { return nil, err } - p := new(pb.Packet) - err = proto.Unmarshal(data, p) - if err != nil { - return nil, err - } - messages := new(pb.Messages) - message := new(pb.Message) - - length := len(messages.Messages) - message.Entries = make([]pb.Entry, length) - ack := new(pb.Ack) - var items []pb.Entry - var entry pb.Entry - switch p.Type { - case pb.PacketType_MESSAGES: - if !(p.GetCompression() == pb.Compression_NONE) { - panic("compression is not supported in this connector") - } - err := proto.Unmarshal(p.Body, messages) - if err != nil { - return nil, err - } - if c.LazyParseEntry { - message.RawEntries = messages.Messages - } else { - - for _, value := range messages.Messages { - err := proto.Unmarshal(value, &entry) - if err != nil { - return nil, err - } - items = append(items, entry) - } - } - message.Entries = items - message.Id = messages.GetBatchId() - return message, nil - - case pb.PacketType_ACK: - err := proto.Unmarshal(p.Body, ack) - if err != nil { - return nil, err - } - panic(errors.New(fmt.Sprintf("something goes wrong with reason:%s", ack.GetErrorMessage()))) - default: - panic(errors.New(fmt.Sprintf("unexpected packet type:%s", p.Type))) - - } + return pb.Decode(data, c.LazyParseEntry) } //Ack Ack Canal-server的数据(就是昨晚某些逻辑操作后删除canal-server端的数据) diff --git a/protocol/Message.go b/protocol/Message.go index a9530e4..44b8ad3 100644 --- a/protocol/Message.go +++ b/protocol/Message.go @@ -16,6 +16,13 @@ package com_alibaba_otter_canal_protocol +import ( + "errors" + "fmt" + + "github.com/gogo/protobuf/proto" +) + type Message struct { Id int64 Entries []Entry @@ -27,3 +34,53 @@ func NewMessage(id int64) *Message { message := &Message{Id: id, Entries: nil, Raw: false, RawEntries: nil} return message } + +func Decode(data []byte, lazyParseEntry bool) (*Message, error) { + p := new(Packet) + err := proto.Unmarshal(data, p) + if err != nil { + return nil, err + } + messages := new(Messages) + message := new(Message) + + length := len(messages.Messages) + message.Entries = make([]Entry, length) + ack := new(Ack) + var items []Entry + var entry Entry + switch p.Type { + case PacketType_MESSAGES: + if !(p.GetCompression() == Compression_NONE) { + panic("compression is not supported in this connector") + } + err := proto.Unmarshal(p.Body, messages) + if err != nil { + return nil, err + } + if lazyParseEntry { + message.RawEntries = messages.Messages + } else { + + for _, value := range messages.Messages { + err := proto.Unmarshal(value, &entry) + if err != nil { + return nil, err + } + items = append(items, entry) + } + } + message.Entries = items + message.Id = messages.GetBatchId() + return message, nil + + case PacketType_ACK: + err := proto.Unmarshal(p.Body, ack) + if err != nil { + return nil, err + } + panic(errors.New(fmt.Sprintf("something goes wrong with reason:%s", ack.GetErrorMessage()))) + default: + panic(errors.New(fmt.Sprintf("unexpected packet type:%s", p.Type))) + } +}