Skip to content

Commit

Permalink
try fix conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 13, 2022
1 parent 877a689 commit a39f2df
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 14 deletions.
4 changes: 0 additions & 4 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,8 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if err != nil {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
<<<<<<< HEAD
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
=======
ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(rowCount)
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
<<<<<<< HEAD
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
=======
m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MqMessageTypeRow, msg.getSchema(), msg.getTable())
m.IncRowsCount()
ret[i] = m
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,8 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
return nil
}

<<<<<<< HEAD
ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
=======
ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(d.batchSize)
>>>>>>> fc70dbde8 (metrics(cdc): fix mq sink write row count metrics. (#4192))
d.Reset()
return []*MQMessage{ret}
}
Expand Down

0 comments on commit a39f2df

Please sign in to comment.