Skip to content

Commit

Permalink
owner(cdc): fix two metrics problems (#4703)
Browse files Browse the repository at this point in the history
close #4714
  • Loading branch information
liuzix authored Mar 1, 2022
1 parent e9a02f7 commit d141ee6
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 22 deletions.
6 changes: 5 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func (c *Capture) reset(ctx context.Context) error {
if c.pdClock != nil {
c.pdClock.Stop()
}
c.pdClock = pdtime.NewClock(c.PDClient)

c.pdClock, err = pdtime.NewClock(ctx, c.PDClient)
if err != nil {
return errors.Trace(err)
}

if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
Expand Down
33 changes: 22 additions & 11 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,25 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
if err != nil {
return errors.Trace(err)
}

pdTime, _ := ctx.GlobalVars().PDClock.CurrentTime()
currentTs := oracle.GetPhysical(pdTime)

// CheckpointCannotProceed implies that not all tables are being replicated normally,
// so in that case there is no need to advance the global watermarks.
if newCheckpointTs != schedulerv2.CheckpointCannotProceed {
pdTime, _ := ctx.GlobalVars().PDClock.CurrentTime()
currentTs := oracle.GetPhysical(pdTime)
if newResolvedTs > barrierTs {
newResolvedTs = barrierTs
}
if newCheckpointTs > barrierTs {
newCheckpointTs = barrierTs
}
c.updateStatus(currentTs, newCheckpointTs, newResolvedTs)
c.updateStatus(newCheckpointTs, newResolvedTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
} else if c.state.Status != nil {
// We should keep the metrics updated even if the scheduler cannot
// advance the watermarks for now.
c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.state.Status.ResolvedTs)
}
return nil
}
Expand Down Expand Up @@ -546,7 +553,17 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs model.Ts) {
// updateMetrics refreshes the changefeed's checkpoint-ts and resolved-ts
// gauges, together with the corresponding lag gauges.
//
// currentTs is the current physical time in milliseconds (from the PD
// clock); checkpointTs and resolvedTs are TSOs whose physical parts are
// extracted before being reported. Lags are exported in seconds, hence
// the division by 1e3.
func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) {
	phyCheckpoint := oracle.ExtractPhysical(checkpointTs)
	phyResolved := oracle.ExtractPhysical(resolvedTs)

	c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCheckpoint))
	c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCheckpoint) / 1e3)

	c.metricsChangefeedResolvedTsGauge.Set(float64(phyResolved))
	c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyResolved) / 1e3)
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) {
c.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
if status == nil {
Expand All @@ -562,13 +579,6 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
}
return status, changed, nil
})
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) Close(ctx cdcContext.Context) {
Expand All @@ -582,6 +592,7 @@ func (c *changefeed) Close(ctx cdcContext.Context) {
changefeedCloseDuration.Observe(costTime.Seconds())
}

// GetInfoProvider returns an InfoProvider if one is available.
func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
if provider, ok := c.scheduler.(schedulerv2.InfoProvider); ok {
return provider
Expand Down
33 changes: 33 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
Expand Down Expand Up @@ -328,6 +329,38 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {

ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()

conf := config.GetGlobalServerConfig()

// TODO refactor this piece of code when the new scheduler is stabilized,
// and the old scheduler is removed.
if conf.Debug != nil && conf.Debug.EnableNewScheduler {
for cfID, cf := range o.changefeeds {
if cf.state != nil && cf.state.Info != nil {
changefeedStatusGauge.WithLabelValues(cfID).Set(float64(cf.state.Info.State.ToInt()))
}

// The InfoProvider is a proxy object returning information
// from the scheduler.
infoProvider := cf.GetInfoProvider()
if infoProvider == nil {
// The scheduler has not been initialized yet.
continue
}

totalCounts := infoProvider.GetTotalTableCounts()
pendingCounts := infoProvider.GetPendingTableCounts()

for captureID, info := range o.captures {
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeTotal).Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeWip).Set(float64(pendingCounts[captureID]))
}
}
return
}

for changefeedID, changefeedState := range state.Changefeeds {
for captureID := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
Expand Down
6 changes: 6 additions & 0 deletions cdc/owner/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ func TestSchedulerNoPeer(t *testing.T) {
mockCluster.Close()
}

// TestInfoProvider checks that *schedulerV2, viewed through the
// scheduler interface, also satisfies pscheduler.InfoProvider, so the
// owner can obtain per-capture table metrics from it.
func TestInfoProvider(t *testing.T) {
	var s scheduler = new(schedulerV2)
	_, isInfoProvider := s.(pscheduler.InfoProvider)
	require.True(t, isInfoProvider)
}

func receiveToChannels(
ctx context.Context,
t *testing.T,
Expand Down
40 changes: 40 additions & 0 deletions cdc/scheduler/info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ type InfoProvider interface {

// GetTaskPositions returns the task positions.
GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error)

// GetTotalTableCounts returns the number of tables associated
// with each capture.
GetTotalTableCounts() map[model.CaptureID]int

// GetPendingTableCounts returns the number of tables in a non-ready
// status (Adding & Removing) associated with each capture.
GetPendingTableCounts() map[model.CaptureID]int
}

// GetTaskStatuses implements InfoProvider for BaseScheduleDispatcher.
// Complexity Note: This function has O(#tables) cost. USE WITH CARE.
// Functions with cost O(#tables) are NOT recommended for regular metrics
// collection.
func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -67,6 +78,7 @@ func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.T
}

// GetTaskPositions implements InfoProvider for BaseScheduleDispatcher.
// Complexity Note: This function has O(#captures) cost.
func (s *BaseScheduleDispatcher) GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -81,3 +93,31 @@ func (s *BaseScheduleDispatcher) GetTaskPositions() (map[model.CaptureID]*model.

return ret, nil
}

// GetTotalTableCounts implements InfoProvider for BaseScheduleDispatcher.
// It reports, for each known capture, how many tables are currently
// assigned to it (in any status).
// Complexity Note: This function has O(#captures) cost.
func (s *BaseScheduleDispatcher) GetTotalTableCounts() map[model.CaptureID]int {
	s.mu.Lock()
	defer s.mu.Unlock()

	counts := make(map[model.CaptureID]int, len(s.captureStatus))
	for captureID := range s.captureStatus {
		counts[captureID] = s.tables.CountTableByCaptureID(captureID)
	}
	return counts
}

// GetPendingTableCounts implements InfoProvider for BaseScheduleDispatcher.
// It reports, for each known capture, the number of tables that are not
// yet in a steady state, i.e. those currently being added or removed.
// Complexity Note: This function has O(#captures) cost.
func (s *BaseScheduleDispatcher) GetPendingTableCounts() map[model.CaptureID]int {
	s.mu.Lock()
	defer s.mu.Unlock()

	counts := make(map[model.CaptureID]int, len(s.captureStatus))
	for captureID := range s.captureStatus {
		adding := s.tables.CountTableByCaptureIDAndStatus(captureID, util.AddingTable)
		removing := s.tables.CountTableByCaptureIDAndStatus(captureID, util.RemovingTable)
		counts[captureID] = adding + removing
	}
	return counts
}
15 changes: 15 additions & 0 deletions cdc/scheduler/info_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,18 @@ func TestInfoProviderTaskPosition(t *testing.T) {
},
}, taskPosition)
}

// TestInfoProviderTableCounts verifies the per-capture total and pending
// table counts reported by the dispatcher's InfoProvider methods, using
// the shared fixture state injected by
// injectSchedulerStateForInfoProviderTest.
func TestInfoProviderTableCounts(t *testing.T) {
	dispatcher := NewBaseScheduleDispatcher("cf-1", nil, 1300)
	injectSchedulerStateForInfoProviderTest(dispatcher)

	totals := dispatcher.GetTotalTableCounts()
	require.Equal(t, map[model.CaptureID]int{
		"capture-1": 3,
		"capture-2": 2,
	}, totals)

	pending := dispatcher.GetPendingTableCounts()
	require.Equal(t, map[model.CaptureID]int{
		"capture-1": 1,
		"capture-2": 1,
	}, pending)
}
43 changes: 38 additions & 5 deletions cdc/scheduler/util/table_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type TableSet struct {
// a non-unique index to facilitate looking up tables
// assigned to a given capture.
captureIndex map[model.CaptureID]map[model.TableID]*TableRecord

// caches the number of tables in each status associated with
// the given capture.
// This is used to accelerate scheduler decision and metrics
// collection when the number of tables is very high.
statusCounts map[model.CaptureID]map[TableStatus]int
}

// TableRecord is a record to be inserted into TableSet.
Expand Down Expand Up @@ -65,6 +71,7 @@ func NewTableSet() *TableSet {
return &TableSet{
tableIDMap: map[model.TableID]*TableRecord{},
captureIndex: map[model.CaptureID]map[model.TableID]*TableRecord{},
statusCounts: map[model.CaptureID]map[TableStatus]int{},
}
}

Expand All @@ -83,8 +90,15 @@ func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) {
captureIndexEntry = make(map[model.TableID]*TableRecord)
s.captureIndex[record.CaptureID] = captureIndexEntry
}

captureIndexEntry[record.TableID] = recordCloned

statusCountEntry := s.statusCounts[record.CaptureID]
if statusCountEntry == nil {
statusCountEntry = make(map[TableStatus]int)
s.statusCounts[record.CaptureID] = statusCountEntry
}
statusCountEntry[record.Status]++

return true
}

Expand All @@ -103,6 +117,9 @@ func (s *TableSet) UpdateTableRecord(record *TableRecord) (successful bool) {
recordCloned := record.Clone()
s.tableIDMap[record.TableID] = recordCloned
s.captureIndex[record.CaptureID][record.TableID] = recordCloned

s.statusCounts[record.CaptureID][oldRecord.Status]--
s.statusCounts[record.CaptureID][record.Status]++
return true
}

Expand Down Expand Up @@ -145,6 +162,13 @@ func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool {
if len(captureIndexEntry) == 0 {
delete(s.captureIndex, record.CaptureID)
}

statusCountEntry, ok := s.statusCounts[record.CaptureID]
if !ok {
log.Panic("unreachable", zap.Int64("tableID", tableID))
}
statusCountEntry[record.Status]--

return true
}

Expand All @@ -164,6 +188,7 @@ func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) []*Ta
ret = append(ret, record)
}
delete(s.captureIndex, captureID)
delete(s.statusCounts, captureID)
return ret
}

Expand All @@ -172,6 +197,16 @@ func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int {
return len(s.captureIndex[captureID])
}

// CountTableByCaptureIDAndStatus counts the number of tables associated with the given captureID
// with the specified status. It reads the cached per-capture status
// counters, so the lookup is O(1).
func (s *TableSet) CountTableByCaptureIDAndStatus(captureID model.CaptureID, status TableStatus) int {
	if entry, ok := s.statusCounts[captureID]; ok {
		return entry[status]
	}
	// No tables have ever been recorded for this capture.
	return 0
}

// GetDistinctCaptures counts distinct captures with tables.
func (s *TableSet) GetDistinctCaptures() []model.CaptureID {
var ret []model.CaptureID
Expand Down Expand Up @@ -205,10 +240,8 @@ func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model

// CountTableByStatus counts the number of tables with the given status.
func (s *TableSet) CountTableByStatus(status TableStatus) (count int) {
for _, record := range s.tableIDMap {
if record.Status == status {
count++
}
for _, statusEntryCount := range s.statusCounts {
count += statusEntryCount[status]
}
return
}
15 changes: 15 additions & 0 deletions cdc/scheduler/util/table_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func TestTableSetBasics(t *testing.T) {
CaptureID: "capture-1",
Status: AddingTable,
}, record)
require.Equal(t, 1, ts.CountTableByStatus(AddingTable))
require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-1", AddingTable))
require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-2", AddingTable))

ok = ts.RemoveTableRecord(1)
require.True(t, ok)
Expand Down Expand Up @@ -93,13 +96,18 @@ func TestTableSetCaptures(t *testing.T) {
require.Equal(t, 2, ts.CountTableByCaptureID("capture-2"))
require.Equal(t, 1, ts.CountTableByCaptureID("capture-3"))

require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-1", AddingTable))
require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-2", AddingTable))
require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-3", AddingTable))

ok = ts.AddTableRecord(&TableRecord{
TableID: 6,
CaptureID: "capture-3",
Status: AddingTable,
})
require.True(t, ok)
require.Equal(t, 2, ts.CountTableByCaptureID("capture-3"))
require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-3", AddingTable))

captures := ts.GetDistinctCaptures()
require.Len(t, captures, 3)
Expand All @@ -112,6 +120,8 @@ func TestTableSetCaptures(t *testing.T) {
ok = ts.RemoveTableRecord(4)
require.True(t, ok)

require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-2", AddingTable))

captures = ts.GetDistinctCaptures()
require.Len(t, captures, 2)
require.Contains(t, captures, "capture-1")
Expand Down Expand Up @@ -226,6 +236,7 @@ func TestCountTableByStatus(t *testing.T) {
require.Equal(t, 2, ts.CountTableByStatus(AddingTable))
require.Equal(t, 2, ts.CountTableByStatus(RunningTable))
require.Equal(t, 1, ts.CountTableByStatus(RemovingTable))
require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))
}

func TestUpdateTableRecord(t *testing.T) {
Expand All @@ -243,6 +254,7 @@ func TestUpdateTableRecord(t *testing.T) {
Status: AddingTable,
})
require.True(t, ok)
require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))

ok = ts.UpdateTableRecord(&TableRecord{
TableID: 5,
Expand All @@ -255,6 +267,7 @@ func TestUpdateTableRecord(t *testing.T) {
require.True(t, ok)
require.Equal(t, RunningTable, rec.Status)
require.Equal(t, RunningTable, ts.GetAllTablesGroupedByCaptures()["capture-3"][5].Status)
require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))

ok = ts.UpdateTableRecord(&TableRecord{
TableID: 4,
Expand All @@ -267,4 +280,6 @@ func TestUpdateTableRecord(t *testing.T) {
require.Equal(t, RunningTable, rec.Status)
require.Equal(t, "capture-3", rec.CaptureID)
require.Equal(t, RunningTable, ts.GetAllTablesGroupedByCaptures()["capture-3"][4].Status)
require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))
require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-3", AddingTable))
}
Loading

0 comments on commit d141ee6

Please sign in to comment.