Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config(ticdc): enable-old-value always false if using avro or csv as the encoding protocol (#9079) #9318

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,32 +205,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
if err != nil {
return nil, err
}
if !replicaCfg.EnableOldValue {
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

protocol := sinkURIParsed.Query().Get(config.ProtocolKey)
if protocol != "" {
replicaCfg.Sink.Protocol = protocol
}
for _, fp := range config.ForceEnableOldValueProtocols {
if replicaCfg.Sink.Protocol == fp {
log.Warn(
"Attempting to replicate without old value enabled. "+
"CDC will enable old value and continue.",
zap.String("protocol", replicaCfg.Sink.Protocol))
replicaCfg.EnableOldValue = true
break
}
}

if replicaCfg.ForceReplicate {
return nil, cerror.ErrOldValueNotEnabled.GenWithStackByArgs(
"if use force replicate, old value feature must be enabled")
}
}
f, err := filter.NewFilter(replicaCfg, "")
if err != nil {
return nil, errors.Cause(err)
Expand Down
29 changes: 28 additions & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.ReplicaConfig = GetDefaultReplicaConfig()
cfg.ReplicaConfig.ForceReplicate = true
cfg.ReplicaConfig.EnableOldValue = false
// disable old value but force replicate
cfg.SinkURI = "mysql://"
// disable old value but force replicate, and using mysql sink.
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)
require.Nil(t, cfInfo)
cfg.ReplicaConfig.ForceReplicate = false
cfg.ReplicaConfig.IgnoreIneligibleTable = true
cfg.SinkURI = "blackhole://"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Nil(t, err)
require.NotNil(t, cfInfo)
Expand Down Expand Up @@ -88,6 +90,19 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.SinkURI = string([]byte{0x7f, ' '})
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)

cfg.StartTs = 0
// use blackhole to workaround
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
cfg.ReplicaConfig.EnableOldValue = true
cfg.ReplicaConfig.ForceReplicate = false
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NoError(t, err)
require.False(t, cfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}

func TestVerifyUpdateChangefeedConfig(t *testing.T) {
Expand Down Expand Up @@ -140,4 +155,16 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)

cfg.StartTs = 0
cfg.TargetTs = 0
cfg.ReplicaConfig.EnableOldValue = true
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NoError(t, err)
require.False(t, newCfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}
33 changes: 7 additions & 26 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type Mounter interface {
type mounter struct {
schemaStorage SchemaStorage
tz *time.Location
enableOldValue bool
changefeedID model.ChangeFeedID
filter pfilter.Filter
metricTotalRows prometheus.Gauge
Expand All @@ -98,14 +97,12 @@ func NewMounter(schemaStorage SchemaStorage,
changefeedID model.ChangeFeedID,
tz *time.Location,
filter pfilter.Filter,
enableOldValue bool,
integrity *integrity.Config,
) Mounter {
return &mounter{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
filter: filter,
schemaStorage: schemaStorage,
changefeedID: changefeedID,
filter: filter,
metricTotalRows: totalRowsCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricIgnoredDMLEventCounter: ignoredDMLEventCounter.
Expand Down Expand Up @@ -336,7 +333,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
tableInfo *model.TableInfo, datums map[int64]types.Datum,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
Expand All @@ -358,11 +355,6 @@ func datum2Column(
colName := colInfo.Name.O
colID := colInfo.ID
colDatums, exist := datums[colID]
if !exist && !fillWithDefaultValue {
log.Debug("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}

var (
colValue interface{}
Expand All @@ -372,7 +364,7 @@ func datum2Column(
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else if fillWithDefaultValue {
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
Expand Down Expand Up @@ -511,7 +503,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -532,17 +524,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}
corrupted = true
}

// NOTICE: When the old Value feature is off,
// the Delete event only needs to keep the handle key column.
if row.Delete && !m.enableOldValue {
for i := range preCols {
col := preCols[i]
if col != nil && !col.Flag.IsHandleKey() {
preCols[i] = nil
}
}
}
}

var (
Expand All @@ -551,7 +532,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
current uint32
)
if row.RowExist {
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down
24 changes: 10 additions & 14 deletions cdc/entry/mounter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ type MounterGroup interface {
}

type mounterGroup struct {
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
enableOldValue bool
integrity *integrity.Config
schemaStorage SchemaStorage
inputCh chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
integrity *integrity.Config

workerNum int

Expand All @@ -56,7 +55,6 @@ const (
func NewMounterGroup(
schemaStorage SchemaStorage,
workerNum int,
enableOldValue bool,
filter filter.Filter,
tz *time.Location,
changefeedID model.ChangeFeedID,
Expand All @@ -66,11 +64,10 @@ func NewMounterGroup(
workerNum = defaultMounterWorkerNum
}
return &mounterGroup{
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
enableOldValue: enableOldValue,
filter: filter,
tz: tz,
schemaStorage: schemaStorage,
inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize),
filter: filter,
tz: tz,

integrity: integrity,

Expand Down Expand Up @@ -111,8 +108,7 @@ func (m *mounterGroup) WaitForReady(_ context.Context) {}
func (m *mounterGroup) Close() {}

func (m *mounterGroup) runWorker(ctx context.Context) error {
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter,
m.enableOldValue, m.integrity)
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.integrity)
for {
select {
case <-ctx.Done():
Expand Down
11 changes: 4 additions & 7 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
filter, err := filter.NewFilter(config, "")
require.Nil(t, err)
mounter := NewMounter(scheamStorage,
model.DefaultChangeFeedID("c1"),
time.UTC, filter, false,
config.Integrity).(*mounter)
model.DefaultChangeFeedID("c1"), time.UTC, filter, config.Integrity).(*mounter)
mounter.tz = time.Local
ctx := context.Background()

Expand Down Expand Up @@ -1024,8 +1022,7 @@ func TestDecodeRow(t *testing.T) {

schemaStorage.AdvanceResolvedTs(ver.Ver)

mounter := NewMounter(
schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, cfg.Integrity).(*mounter)

helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`)
helper.Tk().MustExec(`update student set age = 27 where id = 1`)
Expand Down Expand Up @@ -1105,7 +1102,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {

ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, true, cfg.Integrity).(*mounter)
mounter := NewMounter(schemaStorage, cfID, time.Local, f, cfg.Integrity).(*mounter)

type testCase struct {
schema string
Expand Down Expand Up @@ -1282,7 +1279,7 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
Expand Down
25 changes: 22 additions & 3 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"net/url"
"regexp"
"strings"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -261,7 +262,7 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
func (info *ChangeFeedInfo) VerifyAndComplete() {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -285,8 +286,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
if info.Config.Integrity == nil {
info.Config.Integrity = defaultConfig.Integrity
}

return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
Expand Down Expand Up @@ -320,6 +319,14 @@ func (info *ChangeFeedInfo) FixIncompatible() {
inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66()
info.fixScheduler(inheritV66)
log.Info("Fix incompatible scheduler completed", zap.String("changefeed", info.String()))

if creatorVersionGate.ChangefeedAdjustEnableOldValueByProtocol() {
log.Info("Start fixing incompatible enable old value", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
info.fixEnableOldValue()
log.Info("Fix incompatible enable old value completed", zap.String("changefeed", info.String()),
zap.Bool("enableOldValue", info.Config.EnableOldValue))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
Expand Down Expand Up @@ -390,6 +397,18 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
}
}

func (info *ChangeFeedInfo) fixEnableOldValue() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
// this is impossible to happen, since the changefeed registered successfully.
log.Warn("parse sink URI failed", zap.Error(err))
return
}
scheme := strings.ToLower(uri.Scheme)
protocol := uri.Query().Get(config.ProtocolKey)
info.Config.AdjustEnableOldValue(scheme, protocol)
}

func (info *ChangeFeedInfo) fixMQSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ func TestVerifyAndComplete(t *testing.T) {
},
}

err := info.VerifyAndComplete()
require.Nil(t, err)
info.VerifyAndComplete()
require.Equal(t, SortUnified, info.Engine)

marshalConfig1, err := info.Config.Marshal()
Expand Down
1 change: 0 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,6 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {

p.mg.r = entry.NewMounterGroup(p.ddlHandler.r.schemaStorage,
p.changefeed.Info.Config.Mounter.WorkerNum,
p.changefeed.Info.Config.EnableOldValue,
p.filter, tz, p.changefeedID, p.changefeed.Info.Config.Integrity)
p.mg.name = "MounterGroup"
p.mg.spawn(stdCtx)
Expand Down
9 changes: 1 addition & 8 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func convertRowChangedEvents(
// This indicates that it is an update event,
// and after enable old value internally by default(but disable in the configuration).
// We need to handle the update event to be compatible with the old format.
if !enableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen {
if e.Row.IsUpdate() && !enableOldValue {
if shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if err != nil {
Expand Down Expand Up @@ -507,13 +507,6 @@ func splitUpdateEvent(
deleteEvent.RawKV = &deleteEventRowKV

deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if deleteEvent.Row.PreColumns[i] != nil &&
!deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}

insertEvent := *updateEvent
insertEventRow := *updateEvent.Row
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ error = '''
illegal parameter for sorter: %s
'''

["CDC:ErrIncompatibleConfig"]
error = '''
incompatible configuration
'''

["CDC:ErrIncompatibleSinkConfig"]
error = '''
incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri
Expand Down
Loading