Skip to content

Commit

Permalink
Merge pull request #38 from jingyugao/split_receiveMessages
Browse files Browse the repository at this point in the history
拆封receiveMessages
  • Loading branch information
withlin authored Oct 11, 2019
2 parents 4a3c5a9 + 3a7ee23 commit 2db66ad
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 48 deletions.
49 changes: 1 addition & 48 deletions client/simple_canal_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,54 +306,7 @@ func (c *SimpleCanalConnector) receiveMessages() (*pb.Message, error) {
if err != nil {
return nil, err
}
p := new(pb.Packet)
err = proto.Unmarshal(data, p)
if err != nil {
return nil, err
}
messages := new(pb.Messages)
message := new(pb.Message)

length := len(messages.Messages)
message.Entries = make([]pb.Entry, length)
ack := new(pb.Ack)
var items []pb.Entry
var entry pb.Entry
switch p.Type {
case pb.PacketType_MESSAGES:
if !(p.GetCompression() == pb.Compression_NONE) {
panic("compression is not supported in this connector")
}
err := proto.Unmarshal(p.Body, messages)
if err != nil {
return nil, err
}
if c.LazyParseEntry {
message.RawEntries = messages.Messages
} else {

for _, value := range messages.Messages {
err := proto.Unmarshal(value, &entry)
if err != nil {
return nil, err
}
items = append(items, entry)
}
}
message.Entries = items
message.Id = messages.GetBatchId()
return message, nil

case pb.PacketType_ACK:
err := proto.Unmarshal(p.Body, ack)
if err != nil {
return nil, err
}
panic(errors.New(fmt.Sprintf("something goes wrong with reason:%s", ack.GetErrorMessage())))
default:
panic(errors.New(fmt.Sprintf("unexpected packet type:%s", p.Type)))

}
return pb.Decode(data, c.LazyParseEntry)
}

//Ack Ack Canal-server的数据(就是昨晚某些逻辑操作后删除canal-server端的数据)
Expand Down
57 changes: 57 additions & 0 deletions protocol/Message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package com_alibaba_otter_canal_protocol

import (
"errors"
"fmt"

"github.com/gogo/protobuf/proto"
)

type Message struct {
Id int64
Entries []Entry
Expand All @@ -27,3 +34,53 @@ func NewMessage(id int64) *Message {
message := &Message{Id: id, Entries: nil, Raw: false, RawEntries: nil}
return message
}

func Decode(data []byte, lazyParseEntry bool) (*Message, error) {
p := new(Packet)
err := proto.Unmarshal(data, p)
if err != nil {
return nil, err
}
messages := new(Messages)
message := new(Message)

length := len(messages.Messages)
message.Entries = make([]Entry, length)
ack := new(Ack)
var items []Entry
var entry Entry
switch p.Type {
case PacketType_MESSAGES:
if !(p.GetCompression() == Compression_NONE) {
panic("compression is not supported in this connector")
}
err := proto.Unmarshal(p.Body, messages)
if err != nil {
return nil, err
}
if lazyParseEntry {
message.RawEntries = messages.Messages
} else {

for _, value := range messages.Messages {
err := proto.Unmarshal(value, &entry)
if err != nil {
return nil, err
}
items = append(items, entry)
}
}
message.Entries = items
message.Id = messages.GetBatchId()
return message, nil

case PacketType_ACK:
err := proto.Unmarshal(p.Body, ack)
if err != nil {
return nil, err
}
panic(errors.New(fmt.Sprintf("something goes wrong with reason:%s", ack.GetErrorMessage())))
default:
panic(errors.New(fmt.Sprintf("unexpected packet type:%s", p.Type)))
}
}

0 comments on commit 2db66ad

Please sign in to comment.