diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 8e41d090790..8958fc2e163 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -299,6 +299,12 @@ func (info *ChangeFeedInfo) FixIncompatible() { info.fixMySQLSinkProtocol() log.Info("Fix incompatibility changefeed sink uri completed", zap.String("changefeed", info.String())) } + + if info.Config.MemoryQuota == uint64(0) { + log.Info("Start fixing incompatible memory quota", zap.String("changefeed", info.String())) + info.fixMemoryQuota() + log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String())) + } } // fixState attempts to fix state loss from upgrading the old owner to the new owner. @@ -428,3 +434,7 @@ func (info *ChangeFeedInfo) HasFastFailError() bool { } return cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) } + +func (info *ChangeFeedInfo) fixMemoryQuota() { + info.Config.FixMemoryQuota() +} diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 5c1a5dae443..9164d979daa 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -702,6 +702,53 @@ func TestFixMQSinkProtocol(t *testing.T) { } } +func TestFixMemoryQuotaIncompatible(t *testing.T) { + t.Parallel() + + testCases := []struct { + info *ChangeFeedInfo + expectedMemoryQuota uint64 + }{ + { + info: &ChangeFeedInfo{ + CreatorVersion: "", + SinkURI: "mysql://root:test@127.0.0.1:3306/", + Config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedMemoryQuota: config.DefaultChangefeedMemoryQuota, + }, + { + info: &ChangeFeedInfo{ + CreatorVersion: "6.5.0", + SinkURI: "mysql://root:test@127.0.0.1:3306/", + Config: &config.ReplicaConfig{ + MemoryQuota: 0, + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedMemoryQuota: config.DefaultChangefeedMemoryQuota, + }, + { + info: &ChangeFeedInfo{ + CreatorVersion: "6.5.0", + SinkURI: "mysql://root:test@127.0.0.1:3306/", + Config: &config.ReplicaConfig{ + MemoryQuota: 10485760, + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + }, + }, + expectedMemoryQuota: 10485760, + }, + } + + for _, tc := range testCases { + tc.info.FixIncompatible() + require.Equal(t, tc.expectedMemoryQuota, tc.info.Config.MemoryQuota) + } +} + func TestChangeFeedInfoClone(t *testing.T) { t.Parallel() diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2f3bfccc494..91d042e01a1 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -187,10 +187,18 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { minSyncPointRetention.String())) } } + if c.MemoryQuota == uint64(0) { + c.FixMemoryQuota() + } return nil } +// FixMemoryQuota adjusts memory quota to default value +func (c *ReplicaConfig) FixMemoryQuota() { + c.MemoryQuota = DefaultChangefeedMemoryQuota +} + // GetSinkURIAndAdjustConfigWithSinkURI parses sinkURI as a URI and adjust config with sinkURI. func GetSinkURIAndAdjustConfigWithSinkURI( sinkURIStr string, diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 506f1b528ea..d5d584e4727 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -130,6 +130,18 @@ func TestReplicaConfigValidate(t *testing.T) { require.Equal(t, "d1", rules[0].PartitionRule) require.Equal(t, "p1", rules[1].PartitionRule) require.Equal(t, "", rules[2].PartitionRule) + + // Test memory quota can be adjusted + conf = GetDefaultReplicaConfig() + conf.MemoryQuota = 0 + err = conf.ValidateAndAdjust(nil) + require.NoError(t, err) + require.Equal(t, uint64(DefaultChangefeedMemoryQuota), conf.MemoryQuota) + + conf.MemoryQuota = uint64(1024) + err = conf.ValidateAndAdjust(nil) + require.NoError(t, err) + require.Equal(t, uint64(1024), conf.MemoryQuota) } func TestValidateAndAdjust(t *testing.T) {