Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-flowcontrol
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 25, 2022
2 parents 49ed662 + 83ecda9 commit b837acb
Show file tree
Hide file tree
Showing 15 changed files with 1,462 additions and 8 deletions.
7 changes: 5 additions & 2 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,11 +790,14 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req))

w := s.getSourceWorker(true)
w.RLock()
sourceID := w.cfg.SourceID
w.RUnlock()
if w == nil {
log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
} else if req.Source != w.cfg.SourceID {
log.L().Error("fail to call OperateSchema, because source mismatch", zap.String("request", req.Source), zap.String("current", w.cfg.SourceID))
} else if req.Source != sourceID {
log.L().Error("fail to call OperateSchema, because source mismatch", zap.String("request", req.Source), zap.String("current", sourceID))
return makeCommonWorkerResponse(terror.ErrWorkerSourceNotMatch.Generate()), nil
}

Expand Down
15 changes: 11 additions & 4 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ import (

// SourceWorker manages a source(upstream) which is mainly related to subtasks and relay.
type SourceWorker struct {
// ensure no other operation can be done when closing (we can use `WatGroup`/`Context` to archive this)
// ensure no other operation can be done when closing (we can use `WaitGroup`/`Context` to archive this)
// TODO: check what does it guards. Now it's used to guard relayHolder and relayPurger (maybe subTaskHolder?) since
// query-status maybe access them when closing/disable functionalities
// This lock is used to guards source worker's source config and subtask holder(subtask configs)
sync.RWMutex

wg sync.WaitGroup
Expand Down Expand Up @@ -249,9 +250,12 @@ func (w *SourceWorker) Stop(graceful bool) {
// updateSourceStatus updates w.sourceStatus.
func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
w.sourceDBMu.Lock()
w.RLock()
cfg := w.cfg
w.RUnlock()
if w.sourceDB == nil {
var err error
w.sourceDB, err = conn.DefaultDBProvider.Apply(&w.cfg.DecryptPassword().From)
w.sourceDB, err = conn.DefaultDBProvider.Apply(&cfg.DecryptPassword().From)
if err != nil {
w.sourceDBMu.Unlock()
return err
Expand All @@ -262,7 +266,7 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
var status binlog.SourceStatus
ctx, cancel := context.WithTimeout(ctx, utils.DefaultDBTimeout)
defer cancel()
pos, gtidSet, err := utils.GetPosAndGs(ctx, w.sourceDB.DB, w.cfg.Flavor)
pos, gtidSet, err := utils.GetPosAndGs(ctx, w.sourceDB.DB, cfg.Flavor)
if err != nil {
return err
}
Expand Down Expand Up @@ -1160,7 +1164,10 @@ func (w *SourceWorker) observeValidatorStage(ctx context.Context, lastUsedRev in
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
startRevision, err = w.getCurrentValidatorRevision(w.cfg.SourceID)
w.RLock()
sourceID := w.cfg.SourceID
w.RUnlock()
startRevision, err = w.getCurrentValidatorRevision(sourceID)
if err != nil {
log.L().Error("reset validator stage failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
}
Expand Down
5 changes: 4 additions & 1 deletion dm/dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func (w *SourceWorker) GetValidateStatus(stName string, filterStatus pb.Stage) [
if st == nil {
return res
}
sourceIP := w.cfg.From.Host + ":" + strconv.Itoa(w.cfg.From.Port)
w.RLock()
cfg := w.cfg
w.RUnlock()
sourceIP := cfg.From.Host + ":" + strconv.Itoa(cfg.From.Port)
tblStats := st.GetValidatorStatus()
for _, stat := range tblStats {
if filterStatus == pb.Stage_InvalidStage || stat.ValidationStatus == filterStatus.String() {
Expand Down
31 changes: 31 additions & 0 deletions dm/simulator/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Package config is the configuration definitions used by the simulator.
package config

// TableConfig is the sub config for describing a simulating table in the data source.
type TableConfig struct {
TableID string `yaml:"id"`
DatabaseName string `yaml:"db"`
TableName string `yaml:"table"`
Columns []*ColumnDefinition `yaml:"columns"`
UniqueKeyColumnNames []string `yaml:"unique_keys"`
}

// ColumnDefinition is the sub config for describing a column in a simulating table.
type ColumnDefinition struct {
ColumnName string `yaml:"name"`
DataType string `yaml:"type"`
DataLen int `yaml:"length"`
}
28 changes: 28 additions & 0 deletions dm/simulator/internal/mcp/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mcp

import (
"github.com/pingcap/errors"
)

var (
// ErrMCPCapacityFull means the capacity of the modification candidate pool (MCP) is full.
ErrMCPCapacityFull = errors.New("the capacity of the modification candidate pool is full")
// ErrInvalidRowID means the row ID of the unique key is invalid.
// For example, when the row ID is greater than the current MCP size, this error will be triggered.
ErrInvalidRowID = errors.New("invalid row ID")
// ErrDeleteUKNotFound means the unique key to be deleted is not found in the MCP.
ErrDeleteUKNotFound = errors.New("delete UK not found")
)
117 changes: 117 additions & 0 deletions dm/simulator/internal/mcp/mcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Package mcp defines the Modification Candidate Pool (MCP).
package mcp

import (
"math/rand"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
)

// ModificationCandidatePool is the core container storing all the current unique keys for a table.
type ModificationCandidatePool struct {
sync.RWMutex
keyPool []*UniqueKey
theRand *rand.Rand
randLock sync.Mutex
}

// NewModificationCandidatePool create a new MCP.
func NewModificationCandidatePool(capcity int) *ModificationCandidatePool {
theKeyPool := make([]*UniqueKey, 0, capcity)
theRand := rand.New(rand.NewSource(time.Now().Unix()))
return &ModificationCandidatePool{
keyPool: theKeyPool,
theRand: theRand,
}
}

// NextUK randomly picks a unique key in the MCP.
func (mcp *ModificationCandidatePool) NextUK() *UniqueKey {
mcp.RLock()
defer mcp.RUnlock()
if len(mcp.keyPool) == 0 {
return nil
}
mcp.randLock.Lock()
idx := mcp.theRand.Intn(len(mcp.keyPool))
mcp.randLock.Unlock()
return mcp.keyPool[idx] // pass by reference
}

// Len gets the current length of the MCP.
func (mcp *ModificationCandidatePool) Len() int {
mcp.RLock()
defer mcp.RUnlock()
return len(mcp.keyPool)
}

// AddUK adds the unique key into the MCP.
// It has side effect: the input UK's row ID will be changed.
func (mcp *ModificationCandidatePool) AddUK(uk *UniqueKey) error {
mcp.Lock()
defer mcp.Unlock()
if len(mcp.keyPool)+1 > cap(mcp.keyPool) {
return errors.Trace(ErrMCPCapacityFull)
}
currentLen := len(mcp.keyPool)
uk.SetRowID(currentLen)
mcp.keyPool = append(mcp.keyPool, uk)
return nil
}

// DeleteUK deletes the unique key from the MCP.
// It will get the row ID of the UK and delete the UK on that position.
// If the actual value is different from the input UK, the element will still be deleted.
// It has side effect: after the deletion, the input UK's row ID will be set to -1,
// to prevent deleting a dangling UK multiple times.
func (mcp *ModificationCandidatePool) DeleteUK(uk *UniqueKey) error {
var (
deletedUK *UniqueKey
deleteIdx int
)
if uk == nil {
return nil
}
mcp.Lock()
defer mcp.Unlock()
deleteIdx = uk.GetRowID()
if deleteIdx < 0 {
return errors.Trace(ErrDeleteUKNotFound)
}
if deleteIdx >= len(mcp.keyPool) {
log.L().Error("the delete UK row ID > MCP's total length", zap.Int("delete row ID", deleteIdx), zap.Int("current key pool length", len(mcp.keyPool)))
return errors.Trace(ErrInvalidRowID)
}
deletedUK = mcp.keyPool[deleteIdx]
curLen := len(mcp.keyPool)
lastUK := mcp.keyPool[curLen-1]
lastUK.SetRowID(deleteIdx)
mcp.keyPool[deleteIdx] = lastUK
mcp.keyPool = mcp.keyPool[:curLen-1]
deletedUK.SetRowID(-1)
return nil
}

// Reset cleans up all the items in the MCP.
func (mcp *ModificationCandidatePool) Reset() {
mcp.Lock()
defer mcp.Unlock()
mcp.keyPool = mcp.keyPool[:0]
}
Loading

0 comments on commit b837acb

Please sign in to comment.