Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/ticdc into fix-strict-col…
Browse files Browse the repository at this point in the history
…lation
  • Loading branch information
lance6716 committed Aug 22, 2022
2 parents aa4bcbd + 10f967d commit cb687db
Show file tree
Hide file tree
Showing 94 changed files with 2,182 additions and 1,462 deletions.
4 changes: 2 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestFixSinkProtocolIncompatible(t *testing.T) {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), "ErrMQSinkUnknownProtocol")
require.Contains(t, err.Error(), "ErrSinkUnknownProtocol")
}
}
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestFixSinkProtocol(t *testing.T) {
require.Equal(t, tc.expectedProtocol, protocol)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), "ErrMQSinkUnknownProtocol")
require.Contains(t, err.Error(), "ErrSinkUnknownProtocol")
}
}

Expand Down
16 changes: 0 additions & 16 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,22 +360,6 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
n.sorter.AddEntry(ctx, event)
}

func (n *sorterNode) TryHandleDataMessage(
ctx context.Context, msg pmessage.Message,
) (bool, error) {
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
n.handleRawEvent(ctx, msg.PolymorphicEvent)
return true, nil
case pmessage.MessageTypeBarrier:
n.updateBarrierTs(msg.BarrierTs)
fallthrough
default:
ctx.(pipeline.NodeContext).SendToNextNode(msg)
return true, nil
}
}

func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) {
if barrierTs > n.BarrierTs() {
atomic.StoreUint64(&n.barrierTs, barrierTs)
Expand Down
52 changes: 15 additions & 37 deletions cdc/processor/pipeline/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,8 @@ func TestSorterResolvedTs(t *testing.T) {
require.Equal(t, model.Ts(1), sn.ResolvedTs())
require.Equal(t, TableStatePreparing, sn.State())

msg := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 2))
ok, err := sn.TryHandleDataMessage(context.Background(), msg)
require.True(t, ok)
require.Nil(t, err)
msg := model.NewResolvedPolymorphicEvent(0, 2)
sn.handleRawEvent(context.Background(), msg)
require.EqualValues(t, model.Ts(2), sn.ResolvedTs())
require.Equal(t, TableStatePrepared, sn.State())
}
Expand Down Expand Up @@ -203,51 +201,31 @@ func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
sn := newSorterNode("tableName", 1, 1, nil, nil, &state,
model.DefaultChangeFeedID("changefeed-id-test"), false, &mockPD{})
sn.sorter = s

ch := make(chan pmessage.Message, 1)
require.Equal(t, model.Ts(1), sn.ResolvedTs())

// Resolved ts must not regress even if there is no barrier ts message.
resolvedTs1 := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 1))
ok, err := sn.TryHandleDataMessage(context.Background(), resolvedTs1)
require.True(t, ok)
require.Nil(t, err)
resolvedTs1 := model.NewResolvedPolymorphicEvent(0, 1)
sn.handleRawEvent(context.Background(), resolvedTs1)
require.EqualValues(t, model.NewResolvedPolymorphicEvent(0, 1), <-sch)
require.Equal(t, TableStatePrepared, sn.State())

// Advance barrier ts.
nctx := pipeline.NewNodeContext(
cdcContext.NewContext(context.Background(), nil),
pmessage.BarrierMessage(2),
ch,
)
msg := nctx.Message()
ok, err = sn.TryHandleDataMessage(nctx, msg)
require.True(t, ok)
require.Nil(t, err)
sn.updateBarrierTs(2)
require.EqualValues(t, 2, sn.BarrierTs())
// Barrier message must be passed to the next node.
require.EqualValues(t, pmessage.BarrierMessage(2), <-ch)

resolvedTs2 := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 2))
ok, err = sn.TryHandleDataMessage(context.Background(), resolvedTs2)
require.True(t, ok)
require.Nil(t, err)
require.EqualValues(t, resolvedTs2.PolymorphicEvent, <-s.Output())
resolvedTs2 := model.NewResolvedPolymorphicEvent(0, 2)
sn.handleRawEvent(context.Background(), resolvedTs2)
require.EqualValues(t, resolvedTs2, <-s.Output())

resolvedTs3 := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 3))
ok, err = sn.TryHandleDataMessage(context.Background(), resolvedTs3)
require.True(t, ok)
require.Nil(t, err)
require.EqualValues(t, resolvedTs2.PolymorphicEvent, <-s.Output())
resolvedTs3 := model.NewResolvedPolymorphicEvent(0, 3)
sn.handleRawEvent(context.Background(), resolvedTs3)
require.EqualValues(t, resolvedTs2, <-s.Output())

resolvedTs4 := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 4))
resolvedTs4 := model.NewResolvedPolymorphicEvent(0, 4)
sn.redoLogEnabled = true
ok, err = sn.TryHandleDataMessage(context.Background(), resolvedTs4)
require.True(t, ok)
require.Nil(t, err)
resolvedTs4 = pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 4))
require.EqualValues(t, resolvedTs4.PolymorphicEvent, <-s.Output())
sn.handleRawEvent(context.Background(), resolvedTs4)
resolvedTs4 = model.NewResolvedPolymorphicEvent(0, 4)
require.EqualValues(t, resolvedTs4, <-s.Output())
}

func TestSorterUpdateBarrierTs(t *testing.T) {
Expand Down
87 changes: 45 additions & 42 deletions cdc/sink/mq/codec/avro.go → cdc/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package codec
package avro

import (
"bytes"
Expand All @@ -30,18 +30,20 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

// AvroEventBatchEncoder converts the events to binary Avro data
type AvroEventBatchEncoder struct {
// BatchEncoder converts the events to binary Avro data
type BatchEncoder struct {
namespace string
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
resultBuf []*MQMessage
keySchemaManager *schemaManager
valueSchemaManager *schemaManager
resultBuf []*common.Message
maxMessageBytes int

enableTiDBExtension bool
Expand All @@ -56,14 +58,14 @@ type avroEncodeResult struct {

// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
func (a *BatchEncoder) AppendRowChangedEvent(
ctx context.Context,
topic string,
e *model.RowChangedEvent,
_ func(),
) error {
log.Debug("AppendRowChangedEvent", zap.Any("rowChangedEvent", e))
mqMessage := newMsg(
message := common.NewMsg(
config.ProtocolAvro,
nil,
nil,
Expand All @@ -87,9 +89,9 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
return errors.Trace(err)
}

mqMessage.Value = evlp
message.Value = evlp
} else {
mqMessage.Value = nil
message.Value = nil
}

res, err := a.avroEncode(ctx, e, topic, true)
Expand All @@ -104,42 +106,42 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
log.Error("AppendRowChangedEvent: could not construct Avro envelope", zap.Error(err))
return errors.Trace(err)
}
mqMessage.Key = evlp
message.Key = evlp
} else {
mqMessage.Key = nil
message.Key = nil
}
mqMessage.IncRowsCount()
message.IncRowsCount()

if mqMessage.Length() > a.maxMessageBytes {
if message.Length() > a.maxMessageBytes {
log.Error(
"Single message too large",
zap.Int(
"maxMessageBytes",
a.maxMessageBytes,
),
zap.Int("length", mqMessage.Length()),
zap.Int("length", message.Length()),
zap.Any("table", e.Table),
)
return cerror.ErrAvroEncodeFailed.GenWithStackByArgs()
}

a.resultBuf = append(a.resultBuf, mqMessage)
a.resultBuf = append(a.resultBuf, message)

return nil
}

// EncodeCheckpointEvent is no-op for now
func (a *AvroEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
func (a *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
return nil, nil
}

// EncodeDDLEvent is no-op now
func (a *AvroEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) {
func (a *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
return nil, nil
}

// Build MQ Messages
func (a *AvroEventBatchEncoder) Build() (mqMessages []*MQMessage) {
// Build Messages
func (a *BatchEncoder) Build() (messages []*common.Message) {
old := a.resultBuf
a.resultBuf = nil
return old
Expand All @@ -150,7 +152,7 @@ const (
updateOperation = "u"
)

func (a *AvroEventBatchEncoder) avroEncode(
func (a *BatchEncoder) avroEncode(
ctx context.Context,
e *model.RowChangedEvent,
topic string,
Expand All @@ -160,7 +162,7 @@ func (a *AvroEventBatchEncoder) avroEncode(
cols []*model.Column
colInfos []rowcodec.ColInfo
enableTiDBExtension bool
schemaManager *AvroSchemaManager
schemaManager *schemaManager
operation string
)
if isKey {
Expand Down Expand Up @@ -526,7 +528,7 @@ func columnToAvroSchema(
Parameters: map[string]string{tidbType: tt},
}, nil
case mysql.TypeLonglong: // BIGINT
if col.Flag.IsUnsigned() && bigintUnsignedHandlingMode == bigintUnsignedHandlingModeString {
if col.Flag.IsUnsigned() && bigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString {
return avroSchema{
Type: "string",
Parameters: map[string]string{tidbType: tt},
Expand Down Expand Up @@ -554,7 +556,7 @@ func columnToAvroSchema(
},
}, nil
case mysql.TypeNewDecimal:
if decimalHandlingMode == decimalHandlingModePrecise {
if decimalHandlingMode == common.DecimalHandlingModePrecise {
defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType())
displayFlen, displayDecimal := ft.GetFlen(), ft.GetDecimal()
// length not specified, set it to system type default
Expand Down Expand Up @@ -673,7 +675,7 @@ func columnToAvroData(
case mysql.TypeLonglong:
if v, ok := col.Value.(string); ok {
if col.Flag.IsUnsigned() {
if bigintUnsignedHandlingMode == bigintUnsignedHandlingModeString {
if bigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString {
return v, "string", nil
}
n, err := strconv.ParseUint(v, 10, 64)
Expand All @@ -689,7 +691,7 @@ func columnToAvroData(
return n, "long", nil
}
if col.Flag.IsUnsigned() {
if bigintUnsignedHandlingMode == bigintUnsignedHandlingModeLong {
if bigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeLong {
return int64(col.Value.(uint64)), "long", nil
}
// bigintUnsignedHandlingMode == "string"
Expand All @@ -711,7 +713,7 @@ func columnToAvroData(
}
return []byte(types.NewBinaryLiteralFromUint(col.Value.(uint64), -1)), "bytes", nil
case mysql.TypeNewDecimal:
if decimalHandlingMode == decimalHandlingModePrecise {
if decimalHandlingMode == common.DecimalHandlingModePrecise {
v, succ := new(big.Rat).SetString(col.Value.(string))
if !succ {
return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack(
Expand Down Expand Up @@ -793,23 +795,24 @@ func (r *avroEncodeResult) toEnvelope() ([]byte, error) {
return buf.Bytes(), nil
}

type avroEventBatchEncoderBuilder struct {
type batchEncoderBuilder struct {
namespace string
config *Config
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
config *common.Config
keySchemaManager *schemaManager
valueSchemaManager *schemaManager
}

const (
keySchemaSuffix = "-key"
valueSchemaSuffix = "-value"
)

func newAvroEventBatchEncoderBuilder(ctx context.Context, config *Config) (EncoderBuilder, error) {
// NewBatchEncoderBuilder creates a avro batchEncoderBuilder.
func NewBatchEncoderBuilder(ctx context.Context, config *common.Config) (codec.EncoderBuilder, error) {
keySchemaManager, err := NewAvroSchemaManager(
ctx,
nil,
config.avroSchemaRegistry,
config.AvroSchemaRegistry,
keySchemaSuffix,
)
if err != nil {
Expand All @@ -819,14 +822,14 @@ func newAvroEventBatchEncoderBuilder(ctx context.Context, config *Config) (Encod
valueSchemaManager, err := NewAvroSchemaManager(
ctx,
nil,
config.avroSchemaRegistry,
config.AvroSchemaRegistry,
valueSchemaSuffix,
)
if err != nil {
return nil, errors.Trace(err)
}

return &avroEventBatchEncoderBuilder{
return &batchEncoderBuilder{
namespace: contextutil.ChangefeedIDFromCtx(ctx).Namespace,
config: config,
keySchemaManager: keySchemaManager,
Expand All @@ -835,16 +838,16 @@ func newAvroEventBatchEncoderBuilder(ctx context.Context, config *Config) (Encod
}

// Build an AvroEventBatchEncoder.
func (b *avroEventBatchEncoderBuilder) Build() EventBatchEncoder {
encoder := &AvroEventBatchEncoder{}
func (b *batchEncoderBuilder) Build() codec.EventBatchEncoder {
encoder := &BatchEncoder{}
encoder.namespace = b.namespace
encoder.keySchemaManager = b.keySchemaManager
encoder.valueSchemaManager = b.valueSchemaManager
encoder.resultBuf = make([]*MQMessage, 0, 4096)
encoder.maxMessageBytes = b.config.maxMessageBytes
encoder.enableTiDBExtension = b.config.enableTiDBExtension
encoder.decimalHandlingMode = b.config.avroDecimalHandlingMode
encoder.bigintUnsignedHandlingMode = b.config.avroBigintUnsignedHandlingMode
encoder.resultBuf = make([]*common.Message, 0, 4096)
encoder.maxMessageBytes = b.config.MaxMessageBytes
encoder.enableTiDBExtension = b.config.EnableTiDBExtension
encoder.decimalHandlingMode = b.config.AvroDecimalHandlingMode
encoder.bigintUnsignedHandlingMode = b.config.AvroBigintUnsignedHandlingMode

return encoder
}
Loading

0 comments on commit cb687db

Please sign in to comment.