Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9079
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Jun 30, 2023
1 parent 34f6846 commit 5288421
Show file tree
Hide file tree
Showing 45 changed files with 847 additions and 276 deletions.
8 changes: 3 additions & 5 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,8 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
if err != nil {
return nil, err
}
if !replicaCfg.EnableOldValue {
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

<<<<<<< HEAD
protocol := sinkURIParsed.Query().Get(config.ProtocolKey)
if protocol != "" {
replicaCfg.Sink.Protocol = protocol
Expand All @@ -231,6 +227,8 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
"if use force replicate, old value feature must be enabled")
}
}
=======
>>>>>>> 6537ab8fbc (config(ticdc): enable-old-value always false if using avro or csv as the encoding protocol (#9079))
f, err := filter.NewFilter(replicaCfg, "")
if err != nil {
return nil, errors.Cause(err)
Expand Down
29 changes: 28 additions & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.ReplicaConfig = GetDefaultReplicaConfig()
cfg.ReplicaConfig.ForceReplicate = true
cfg.ReplicaConfig.EnableOldValue = false
// disable old value but force replicate
cfg.SinkURI = "mysql://"
// disable old value but force replicate, and using mysql sink.
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)
require.Nil(t, cfInfo)
cfg.ReplicaConfig.ForceReplicate = false
cfg.ReplicaConfig.IgnoreIneligibleTable = true
cfg.SinkURI = "blackhole://"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Nil(t, err)
require.NotNil(t, cfInfo)
Expand Down Expand Up @@ -88,6 +90,19 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.SinkURI = string([]byte{0x7f, ' '})
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)

cfg.StartTs = 0
// use blackhole to workaround
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
cfg.ReplicaConfig.EnableOldValue = true
cfg.ReplicaConfig.ForceReplicate = false
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NoError(t, err)
require.False(t, cfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}

func TestVerifyUpdateChangefeedConfig(t *testing.T) {
Expand Down Expand Up @@ -140,4 +155,16 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)

cfg.StartTs = 0
cfg.TargetTs = 0
cfg.ReplicaConfig.EnableOldValue = true
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NoError(t, err)
require.False(t, newCfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}
33 changes: 7 additions & 26 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type Mounter interface {
type mounter struct {
schemaStorage SchemaStorage
tz *time.Location
enableOldValue bool
changefeedID model.ChangeFeedID
filter pfilter.Filter
metricTotalRows prometheus.Gauge
Expand All @@ -98,14 +97,12 @@ func NewMounter(schemaStorage SchemaStorage,
changefeedID model.ChangeFeedID,
tz *time.Location,
filter pfilter.Filter,
enableOldValue bool,
integrity *integrity.Config,
) Mounter {
return &mounter{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
filter: filter,
schemaStorage: schemaStorage,
changefeedID: changefeedID,
filter: filter,
metricTotalRows: totalRowsCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricIgnoredDMLEventCounter: ignoredDMLEventCounter.
Expand Down Expand Up @@ -336,7 +333,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
tableInfo *model.TableInfo, datums map[int64]types.Datum,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
Expand All @@ -358,11 +355,6 @@ func datum2Column(
colName := colInfo.Name.O
colID := colInfo.ID
colDatums, exist := datums[colID]
if !exist && !fillWithDefaultValue {
log.Debug("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}

var (
colValue interface{}
Expand All @@ -372,7 +364,7 @@ func datum2Column(
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else if fillWithDefaultValue {
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
Expand Down Expand Up @@ -511,7 +503,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -532,17 +524,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}
corrupted = true
}

// NOTICE: When the old Value feature is off,
// the Delete event only needs to keep the handle key column.
if row.Delete && !m.enableOldValue {
for i := range preCols {
col := preCols[i]
if col != nil && !col.Flag.IsHandleKey() {
preCols[i] = nil
}
}
}
}

var (
Expand All @@ -551,7 +532,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
current uint32
)
if row.RowExist {
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down
24 changes: 10 additions & 14 deletions cdc/entry/mounter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ type MounterGroup interface {
}

type mounterGroup struct {
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
enableOldValue bool
integrity *integrity.Config
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
integrity *integrity.Config

workerNum int

Expand All @@ -56,7 +55,6 @@ const (
func NewMounterGroup(
schemaStorage SchemaStorage,
workerNum int,
enableOldValue bool,
filter filter.Filter,
tz *time.Location,
changefeedID model.ChangeFeedID,
Expand All @@ -66,11 +64,10 @@ func NewMounterGroup(
workerNum = defaultMounterWorkerNum
}
return &mounterGroup{
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
enableOldValue: enableOldValue,
filter: filter,
tz: tz,
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
filter: filter,
tz: tz,

integrity: integrity,

Expand Down Expand Up @@ -111,8 +108,7 @@ func (m *mounterGroup) WaitForReady(_ context.Context) {}
func (m *mounterGroup) Close() {}

func (m *mounterGroup) runWorker(ctx context.Context) error {
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter,
m.enableOldValue, m.integrity)
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.integrity)
for {
select {
case <-ctx.Done():
Expand Down
Loading

0 comments on commit 5288421

Please sign in to comment.