Skip to content

Commit

Permalink
changefeed(ticdc): fix memory quota incompatible from 6.5.0 (#8008)
Browse files Browse the repository at this point in the history
close #8007
  • Loading branch information
amyangfei authored Jan 3, 2023
1 parent 73bb333 commit f8711bd
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ func (info *ChangeFeedInfo) FixIncompatible() {
info.fixMySQLSinkProtocol()
log.Info("Fix incompatibility changefeed sink uri completed", zap.String("changefeed", info.String()))
}

if info.Config.MemoryQuota == uint64(0) {
log.Info("Start fixing incompatible memory quota", zap.String("changefeed", info.String()))
info.fixMemoryQuota()
log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String()))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
Expand Down Expand Up @@ -428,3 +434,7 @@ func (info *ChangeFeedInfo) HasFastFailError() bool {
}
return cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

func (info *ChangeFeedInfo) fixMemoryQuota() {
info.Config.FixMemoryQuota()
}
47 changes: 47 additions & 0 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,53 @@ func TestFixMQSinkProtocol(t *testing.T) {
}
}

func TestFixMemoryQuotaIncompatible(t *testing.T) {
t.Parallel()

testCases := []struct {
info *ChangeFeedInfo
expectedMemoryQuota uint64
}{
{
info: &ChangeFeedInfo{
CreatorVersion: "",
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedMemoryQuota: config.DefaultChangefeedMemoryQuota,
},
{
info: &ChangeFeedInfo{
CreatorVersion: "6.5.0",
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
MemoryQuota: 0,
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedMemoryQuota: config.DefaultChangefeedMemoryQuota,
},
{
info: &ChangeFeedInfo{
CreatorVersion: "6.5.0",
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
MemoryQuota: 10485760,
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedMemoryQuota: 10485760,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
require.Equal(t, tc.expectedMemoryQuota, tc.info.Config.MemoryQuota)
}
}

func TestChangeFeedInfoClone(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 8 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,18 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error {
minSyncPointRetention.String()))
}
}
if c.MemoryQuota == uint64(0) {
c.FixMemoryQuota()
}

return nil
}

// FixMemoryQuota adjusts memory quota to default value
func (c *ReplicaConfig) FixMemoryQuota() {
c.MemoryQuota = DefaultChangefeedMemoryQuota
}

// GetSinkURIAndAdjustConfigWithSinkURI parses sinkURI as a URI and adjust config with sinkURI.
func GetSinkURIAndAdjustConfigWithSinkURI(
sinkURIStr string,
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ func TestReplicaConfigValidate(t *testing.T) {
require.Equal(t, "d1", rules[0].PartitionRule)
require.Equal(t, "p1", rules[1].PartitionRule)
require.Equal(t, "", rules[2].PartitionRule)

// Test memory quota can be adjusted
conf = GetDefaultReplicaConfig()
conf.MemoryQuota = 0
err = conf.ValidateAndAdjust(nil)
require.NoError(t, err)
require.Equal(t, uint64(DefaultChangefeedMemoryQuota), conf.MemoryQuota)

conf.MemoryQuota = uint64(1024)
err = conf.ValidateAndAdjust(nil)
require.NoError(t, err)
require.Equal(t, uint64(1024), conf.MemoryQuota)
}

func TestValidateAndAdjust(t *testing.T) {
Expand Down

0 comments on commit f8711bd

Please sign in to comment.