Skip to content

Commit

Permalink
simulator(dm): implement some core components (#4838)
Browse files Browse the repository at this point in the history
ref #4835
  • Loading branch information
dsdashun authored Apr 25, 2022
1 parent eb637b1 commit 06f54d0
Show file tree
Hide file tree
Showing 10 changed files with 1,422 additions and 0 deletions.
31 changes: 31 additions & 0 deletions dm/simulator/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Package config is the configuration definitions used by the simulator.
package config

// TableConfig is the sub config for describing a simulating table in the data source.
type TableConfig struct {
TableID string `yaml:"id"`
DatabaseName string `yaml:"db"`
TableName string `yaml:"table"`
Columns []*ColumnDefinition `yaml:"columns"`
UniqueKeyColumnNames []string `yaml:"unique_keys"`
}

// ColumnDefinition is the sub config for describing a column in a simulating table.
type ColumnDefinition struct {
ColumnName string `yaml:"name"`
DataType string `yaml:"type"`
DataLen int `yaml:"length"`
}
28 changes: 28 additions & 0 deletions dm/simulator/internal/mcp/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mcp

import (
"github.com/pingcap/errors"
)

var (
// ErrMCPCapacityFull means the capacity of the modification candidate pool (MCP) is full.
ErrMCPCapacityFull = errors.New("the capacity of the modification candidate pool is full")
// ErrInvalidRowID means the row ID of the unique key is invalid.
// For example, when the row ID is greater than the current MCP size, this error will be triggered.
ErrInvalidRowID = errors.New("invalid row ID")
// ErrDeleteUKNotFound means the unique key to be deleted is not found in the MCP.
ErrDeleteUKNotFound = errors.New("delete UK not found")
)
117 changes: 117 additions & 0 deletions dm/simulator/internal/mcp/mcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Package mcp defines the Modification Candidate Pool (MCP).
package mcp

import (
"math/rand"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
)

// ModificationCandidatePool is the core container storing all the current unique keys for a table.
type ModificationCandidatePool struct {
sync.RWMutex
keyPool []*UniqueKey
theRand *rand.Rand
randLock sync.Mutex
}

// NewModificationCandidatePool create a new MCP.
func NewModificationCandidatePool(capcity int) *ModificationCandidatePool {
theKeyPool := make([]*UniqueKey, 0, capcity)
theRand := rand.New(rand.NewSource(time.Now().Unix()))
return &ModificationCandidatePool{
keyPool: theKeyPool,
theRand: theRand,
}
}

// NextUK randomly picks a unique key in the MCP.
func (mcp *ModificationCandidatePool) NextUK() *UniqueKey {
mcp.RLock()
defer mcp.RUnlock()
if len(mcp.keyPool) == 0 {
return nil
}
mcp.randLock.Lock()
idx := mcp.theRand.Intn(len(mcp.keyPool))
mcp.randLock.Unlock()
return mcp.keyPool[idx] // pass by reference
}

// Len gets the current length of the MCP.
func (mcp *ModificationCandidatePool) Len() int {
mcp.RLock()
defer mcp.RUnlock()
return len(mcp.keyPool)
}

// AddUK adds the unique key into the MCP.
// It has side effect: the input UK's row ID will be changed.
func (mcp *ModificationCandidatePool) AddUK(uk *UniqueKey) error {
mcp.Lock()
defer mcp.Unlock()
if len(mcp.keyPool)+1 > cap(mcp.keyPool) {
return errors.Trace(ErrMCPCapacityFull)
}
currentLen := len(mcp.keyPool)
uk.SetRowID(currentLen)
mcp.keyPool = append(mcp.keyPool, uk)
return nil
}

// DeleteUK deletes the unique key from the MCP.
// It will get the row ID of the UK and delete the UK on that position.
// If the actual value is different from the input UK, the element will still be deleted.
// It has side effect: after the deletion, the input UK's row ID will be set to -1,
// to prevent deleting a dangling UK multiple times.
func (mcp *ModificationCandidatePool) DeleteUK(uk *UniqueKey) error {
var (
deletedUK *UniqueKey
deleteIdx int
)
if uk == nil {
return nil
}
mcp.Lock()
defer mcp.Unlock()
deleteIdx = uk.GetRowID()
if deleteIdx < 0 {
return errors.Trace(ErrDeleteUKNotFound)
}
if deleteIdx >= len(mcp.keyPool) {
log.L().Error("the delete UK row ID > MCP's total length", zap.Int("delete row ID", deleteIdx), zap.Int("current key pool length", len(mcp.keyPool)))
return errors.Trace(ErrInvalidRowID)
}
deletedUK = mcp.keyPool[deleteIdx]
curLen := len(mcp.keyPool)
lastUK := mcp.keyPool[curLen-1]
lastUK.SetRowID(deleteIdx)
mcp.keyPool[deleteIdx] = lastUK
mcp.keyPool = mcp.keyPool[:curLen-1]
deletedUK.SetRowID(-1)
return nil
}

// Reset cleans up all the items in the MCP.
func (mcp *ModificationCandidatePool) Reset() {
mcp.Lock()
defer mcp.Unlock()
mcp.keyPool = mcp.keyPool[:0]
}
Loading

0 comments on commit 06f54d0

Please sign in to comment.