
Commit

Merge branch 'release-6.5' into cherry-pick-7981-to-release-6.5
Rustin170506 authored Jan 10, 2023
2 parents fbd7ebf + a4b5efa commit 1c41fdd
Showing 106 changed files with 2,270 additions and 1,637 deletions.
2 changes: 2 additions & 0 deletions cdc/api/v1/api.go
@@ -450,6 +450,8 @@ func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
_ = c.Error(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped"))
return
}
info.ID = changefeedID.ID
info.Namespace = changefeedID.Namespace

// can only update target-ts, sink-uri
// filter_rules, ignore_txn_start_ts, mounter_worker_num, sink_config
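The two assignments added above pin the changefeed identity to the values resolved from the request route, so the update body cannot rename or re-namespace a changefeed while it is being modified. A minimal, self-contained sketch of that pattern, using hypothetical names rather than the real TiCDC types:

package main

import "fmt"

// changefeedInfo is a stand-in for the real model.ChangeFeedInfo.
type changefeedInfo struct {
	Namespace string
	ID        string
	SinkURI   string
}

// applyUpdate copies mutable fields from the request body, then forces the
// identity fields back to the values resolved from the route, mirroring the
// info.ID / info.Namespace assignments in the diff above.
func applyUpdate(current, body changefeedInfo, routeNamespace, routeID string) changefeedInfo {
	updated := current
	if body.SinkURI != "" {
		updated.SinkURI = body.SinkURI // one of the fields an update may change
	}
	updated.Namespace = routeNamespace
	updated.ID = routeID
	return updated
}

func main() {
	current := changefeedInfo{Namespace: "default", ID: "cf-1", SinkURI: "mysql://old"}
	body := changefeedInfo{ID: "other-id", SinkURI: "mysql://new"}
	fmt.Printf("%+v\n", applyUpdate(current, body, "default", "cf-1"))
	// {Namespace:default ID:cf-1 SinkURI:mysql://new}
}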
2 changes: 2 additions & 0 deletions cdc/api/v1/validator.go
@@ -136,6 +136,8 @@ func verifyCreateChangefeedConfig(

// init ChangefeedInfo
info := &model.ChangeFeedInfo{
Namespace: model.DefaultNamespace,
ID: changefeedConfig.ID,
UpstreamID: up.ID,
SinkURI: changefeedConfig.SinkURI,
CreateTime: time.Now(),
2 changes: 2 additions & 0 deletions cdc/api/v2/changefeed.go
@@ -232,6 +232,8 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}
cfInfo.Namespace = changefeedID.Namespace
cfInfo.ID = changefeedID.ID
upInfo, err := etcdClient.GetUpstreamInfo(ctx, cfInfo.UpstreamID,
cfInfo.Namespace)
if err != nil {
5 changes: 4 additions & 1 deletion cdc/api/v2/model.go
@@ -89,6 +89,7 @@ type ChangefeedConfig struct {

// ReplicaConfig is a duplicate of config.ReplicaConfig
type ReplicaConfig struct {
MemoryQuota uint64 `json:"memory_quota"`
CaseSensitive bool `json:"case_sensitive"`
EnableOldValue bool `json:"enable_old_value"`
ForceReplicate bool `json:"force_replicate"`
@@ -107,6 +108,7 @@ type ReplicaConfig struct {
// ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig
func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res := config.GetDefaultReplicaConfig()
res.MemoryQuota = c.MemoryQuota
res.CaseSensitive = c.CaseSensitive
res.EnableOldValue = c.EnableOldValue
res.ForceReplicate = c.ForceReplicate
@@ -215,6 +217,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
cloned := c.Clone()
res := &ReplicaConfig{
MemoryQuota: cloned.MemoryQuota,
CaseSensitive: cloned.CaseSensitive,
EnableOldValue: cloned.EnableOldValue,
ForceReplicate: cloned.ForceReplicate,
@@ -338,7 +341,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
FlushIntervalInMs: 1000,
FlushIntervalInMs: config.MinFlushIntervalInMs,
Storage: "",
},
}
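The new memory_quota field only works end to end if it is copied in both conversion directions, ToInternalReplicaConfig and ToAPIReplicaConfig; missing either direction would silently drop the value. A simplified round-trip sketch of that invariant, with hypothetical types rather than the tiflow ones:

package main

import "fmt"

type apiReplicaConfig struct {
	MemoryQuota   uint64 `json:"memory_quota"`
	CaseSensitive bool   `json:"case_sensitive"`
}

type internalReplicaConfig struct {
	MemoryQuota   uint64
	CaseSensitive bool
}

// toInternal and toAPI copy every field explicitly, as the real converters do.
func toInternal(c apiReplicaConfig) internalReplicaConfig {
	return internalReplicaConfig{MemoryQuota: c.MemoryQuota, CaseSensitive: c.CaseSensitive}
}

func toAPI(c internalReplicaConfig) apiReplicaConfig {
	return apiReplicaConfig{MemoryQuota: c.MemoryQuota, CaseSensitive: c.CaseSensitive}
}

func main() {
	in := apiReplicaConfig{MemoryQuota: 1073741824, CaseSensitive: true}
	out := toAPI(toInternal(in))
	fmt.Println(out == in) // true: the new field survives the API round trip
}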
14 changes: 8 additions & 6 deletions cdc/capture/capture.go
@@ -209,6 +209,11 @@ func (c *captureImpl) reset(ctx context.Context) error {

c.captureMu.Lock()
defer c.captureMu.Unlock()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: c.config.AdvertiseAddr,
Version: version.ReleaseVersion,
}
c.EtcdClient = etcdClient
c.migrator = migrate.NewMigrator(c.EtcdClient, c.pdEndpoints, c.config)

@@ -224,12 +229,6 @@ func (c *captureImpl) reset(ctx context.Context) error {
return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
}

c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: c.config.AdvertiseAddr,
Version: version.ReleaseVersion,
}

if c.upstreamManager != nil {
c.upstreamManager.Close()
}
@@ -574,6 +573,9 @@ func (c *captureImpl) resign(ctx context.Context) error {
failpoint.Inject("capture-resign-failed", func() {
failpoint.Return(errors.New("capture resign failed"))
})
if c.election == nil {
return nil
}
return cerror.WrapError(cerror.ErrCaptureResignOwner, c.election.resign(ctx))
}

10 changes: 10 additions & 0 deletions cdc/model/changefeed.go
@@ -299,6 +299,12 @@ func (info *ChangeFeedInfo) FixIncompatible() {
info.fixMySQLSinkProtocol()
log.Info("Fix incompatibility changefeed sink uri completed", zap.String("changefeed", info.String()))
}

if info.Config.MemoryQuota == uint64(0) {
log.Info("Start fixing incompatible memory quota", zap.String("changefeed", info.String()))
info.fixMemoryQuota()
log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String()))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
@@ -428,3 +434,7 @@ func (info *ChangeFeedInfo) HasFastFailError() bool {
}
return cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

func (info *ChangeFeedInfo) fixMemoryQuota() {
info.Config.FixMemoryQuota()
}
47 changes: 47 additions & 0 deletions cdc/model/changefeed_test.go
@@ -702,6 +702,53 @@ func TestFixMQSinkProtocol(t *testing.T) {
}
}

func TestFixMemoryQuotaIncompatible(t *testing.T) {
t.Parallel()

testCases := []struct {
info *ChangeFeedInfo
expectedMemoryQuota uint64
}{
{
info: &ChangeFeedInfo{
CreatorVersion: "",
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedMemoryQuota: config.DefaultChangefeedMemoryQuota,
},
{
info: &ChangeFeedInfo{
CreatorVersion: "6.5.0",
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
MemoryQuota: 0,
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedMemoryQuota: config.DefaultChangefeedMemoryQuota,
},
{
info: &ChangeFeedInfo{
CreatorVersion: "6.5.0",
SinkURI: "mysql://root:[email protected]:3306/",
Config: &config.ReplicaConfig{
MemoryQuota: 10485760,
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedMemoryQuota: 10485760,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
require.Equal(t, tc.expectedMemoryQuota, tc.info.Config.MemoryQuota)
}
}

func TestChangeFeedInfoClone(t *testing.T) {
t.Parallel()

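FixMemoryQuota itself is defined in pkg/config and is not part of this excerpt. Judging from the zero-value guard in FixIncompatible and the expectations in TestFixMemoryQuotaIncompatible above, its presumed behavior is to backfill the default quota when the stored value is zero (a changefeed created before the field existed) and to leave explicit settings alone. A sketch of that presumed logic, with an illustrative default value:

package main

import "fmt"

// defaultChangefeedMemoryQuota stands in for config.DefaultChangefeedMemoryQuota;
// the real constant's value is not shown in this diff.
const defaultChangefeedMemoryQuota = 1024 * 1024 * 1024

type replicaConfig struct {
	MemoryQuota uint64
}

// fixMemoryQuota backfills the default for changefeeds created by versions that
// predate the memory_quota field and keeps any explicit non-zero setting.
func (c *replicaConfig) fixMemoryQuota() {
	if c.MemoryQuota == 0 {
		c.MemoryQuota = defaultChangefeedMemoryQuota
	}
}

func main() {
	oldCfg := &replicaConfig{MemoryQuota: 0}
	oldCfg.fixMemoryQuota()
	explicit := &replicaConfig{MemoryQuota: 10485760}
	explicit.fixMemoryQuota()
	fmt.Println(oldCfg.MemoryQuota, explicit.MemoryQuota) // 1073741824 10485760
}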
3 changes: 2 additions & 1 deletion cdc/owner/changefeed.go
@@ -32,6 +32,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
redoCfg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
@@ -662,7 +663,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
log.Warn("changefeed is removed, but state is not complete", zap.Any("state", c.state))
return
}
if !redo.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
if !redoCfg.IsConsistentEnabled(c.state.Info.Config.Consistent.Level) {
return
}
// when removing a paused changefeed, the redo manager is nil, create a new one
5 changes: 3 additions & 2 deletions cdc/owner/changefeed_test.go
@@ -485,8 +485,9 @@ func TestRemoveChangefeed(t *testing.T) {
info := ctx.ChangefeedVars().Info
dir := t.TempDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
FlushIntervalInMs: config.MinFlushIntervalInMs,
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
20 changes: 12 additions & 8 deletions cdc/processor/sinkmanager/manager.go
@@ -151,7 +151,8 @@ func New(
m.redoWorkers = make([]*redoWorker, 0, redoWorkerNum)
m.redoTaskChan = make(chan *redoTask)
m.redoWorkerAvailable = make(chan struct{}, 1)
// TODO: maybe should use at most 1/3 memory quota for redo event cache.
// Use 3/4 memory quota as redo event cache. A large value is helpful to cache hit ratio.
m.eventCache = newRedoEventCache(changefeedID, changefeedInfo.Config.MemoryQuota/4*3)
}

m.startWorkers(changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
@@ -168,10 +169,9 @@

// start all workers and report the error to the error channel.
func (m *SinkManager) startWorkers(splitTxn bool, enableOldValue bool) {
redoEnabled := m.redoManager != nil && m.redoManager.Enabled()
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager, m.memQuota,
m.eventCache, redoEnabled, splitTxn, enableOldValue)
m.eventCache, splitTxn, enableOldValue)
m.sinkWorkers = append(m.sinkWorkers, w)
m.wg.Add(1)
go func() {
@@ -795,28 +795,32 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,

// GetTableStats returns the state of the table.
func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
tableSink, ok := m.tableSinks.Load(tableID)
value, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when getting table stats",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
checkpointTs := tableSink.(*tableSinkWrapper).getCheckpointTs()
tableSink := value.(*tableSinkWrapper)

checkpointTs := tableSink.getCheckpointTs()
m.memQuota.release(tableID, checkpointTs)

var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoManager != nil {
resolvedTs = m.redoManager.GetResolvedTs(tableID)
} else {
resolvedTs = m.sourceManager.GetTableResolvedTs(tableID)
resolvedTs = tableSink.getReceivedSorterResolvedTs()
}

return TableStats{
CheckpointTs: checkpointTs.ResolvedMark(),
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
ReceivedMaxCommitTs: tableSink.(*tableSinkWrapper).getReceivedSorterCommitTs(),
ReceivedMaxResolvedTs: tableSink.(*tableSinkWrapper).getReceivedSorterResolvedTs(),
ReceivedMaxCommitTs: tableSink.getReceivedSorterCommitTs(),
ReceivedMaxResolvedTs: tableSink.getReceivedSorterResolvedTs(),
}
}

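For the redo event cache sizing above, MemoryQuota/4*3 reserves three quarters of the changefeed quota for the cache. A small worked example (the number is illustrative, taken from the 10485760-byte quota used in the tests):

package main

import "fmt"

// redoEventCacheSize mirrors the MemoryQuota/4*3 expression used when creating
// the redo event cache; dividing first keeps the intermediate value no larger
// than the quota itself, at the cost of rounding down by at most three bytes.
func redoEventCacheSize(memoryQuota uint64) uint64 {
	return memoryQuota / 4 * 3
}

func main() {
	fmt.Println(redoEventCacheSize(10485760)) // 7864320 bytes for the cache
}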
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager_test.go
@@ -326,7 +326,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(tableID)
require.True(t, ok)
require.NotNil(t, tableSink)
require.Equal(t, uint64(0), tableSink.(*tableSinkWrapper).getCheckpointTs().Ts)
require.Equal(t, uint64(1), tableSink.(*tableSinkWrapper).getCheckpointTs().Ts)
}

func TestClose(t *testing.T) {
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/mem_quota.go
@@ -64,6 +64,10 @@ func newMemQuota(changefeedID model.ChangeFeedID, totalBytes uint64) *memQuota {
m.metricTotal.Set(float64(totalBytes))
m.metricUsed.Set(float64(0))

log.Info("New memory quota",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeed", changefeedID.ID),
zap.Uint64("total", totalBytes))
return m
}

26 changes: 19 additions & 7 deletions cdc/processor/sinkmanager/redo_cache.go
@@ -40,12 +40,12 @@ type eventAppender struct {

mu sync.Mutex
// If an eventAppender is broken, it means it missed some events.
broken bool
events []*model.RowChangedEvent
sizes []uint64
readyCount int // Count of ready events

// Several RowChangedEvent can come from one PolymorphicEvent.
broken bool
events []*model.RowChangedEvent
sizes []uint64
// Count of ready events.
readyCount int
// Multiple RowChangedEvents can come from one PolymorphicEvent.
pushCounts []byte

// Both of them are included.
@@ -114,6 +114,7 @@ func (r *redoEventCache) maybeCreateAppender(tableID model.TableID, lowerBound e
if item.broken {
if item.readyCount == 0 {
item.broken = false
item.events = nil
item.lowerBound = lowerBound
item.upperBound = engine.Position{}
} else {
@@ -135,10 +136,17 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
defer e.mu.Unlock()

if lowerBound.Compare(e.lowerBound) < 0 {
// NOTE: the caller will fetch events [lowerBound, res.boundary) from engine.
res.success = false
res.boundary = e.lowerBound
return
}
if !e.upperBound.Valid() {
// NOTE: the caller will fetch events [lowerBound, res.boundary) from engine.
res.success = false
res.boundary = upperBound.Next()
return
}

res.success = true
if upperBound.Compare(e.upperBound) > 0 {
@@ -162,7 +170,7 @@
} else {
endIdx = sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(upperBound) > 0
return pos.Compare(res.boundary) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
Expand All @@ -176,7 +184,11 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
e.sizes = e.sizes[endIdx:]
e.pushCounts = e.pushCounts[endIdx:]
e.readyCount -= endIdx
// Update boundaries. Set upperBound to invalid if the range has been drained.
e.lowerBound = res.boundary.Next()
if e.lowerBound.Compare(e.upperBound) > 0 {
e.upperBound = engine.Position{}
}

atomic.AddUint64(&e.cache.allocated, ^(res.releaseSize - 1))
e.cache.metricRedoEventCache.Sub(float64(res.releaseSize))
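The NOTE comments added to pop spell out its contract with the caller: on a miss the result carries success=false plus a boundary, and the caller reads the half-open range [lowerBound, boundary) from the sort engine before consulting the cache again; an invalid upperBound marks a fully drained cache. A simplified illustration of that contract, using a hypothetical position type rather than the real engine.Position:

package main

import "fmt"

type position struct {
	CommitTs uint64
	StartTs  uint64
}

// compare orders positions by commit ts, then start ts (a simplifying assumption
// made only for this sketch).
func (p position) compare(o position) int {
	switch {
	case p.CommitTs != o.CommitTs:
		if p.CommitTs < o.CommitTs {
			return -1
		}
		return 1
	case p.StartTs < o.StartTs:
		return -1
	case p.StartTs > o.StartTs:
		return 1
	default:
		return 0
	}
}

// next returns the smallest position strictly greater than p, so a half-open
// interval ending at p.next() still includes p itself.
func (p position) next() position {
	return position{CommitTs: p.CommitTs, StartTs: p.StartTs + 1}
}

func main() {
	cachedLower := position{CommitTs: 100, StartTs: 5} // where the cached range starts
	request := position{CommitTs: 90, StartTs: 0}      // caller asks for earlier events
	if request.compare(cachedLower) < 0 {
		// Cache miss: read [request, cachedLower) from the sort engine, then retry
		// the cache starting at cachedLower.
		fmt.Printf("fetch [%+v, %+v) from the engine\n", request, cachedLower)
	}
	// When the appender's cached range is empty, pop returns the requested
	// upperBound.Next() as the boundary, so the caller's half-open fetch from the
	// engine still covers the requested upperBound itself.
	upperBound := position{CommitTs: 100, StartTs: 5}
	fmt.Printf("%+v\n", upperBound.next())
}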