Skip to content

Commit

Permalink
Merge branch '2.0-dev' into quit_cdc_task_when_pause
Browse files Browse the repository at this point in the history
  • Loading branch information
sukki37 authored Nov 29, 2024
2 parents c696d03 + a41cc75 commit 2079bda
Show file tree
Hide file tree
Showing 54 changed files with 2,074 additions and 771 deletions.
24 changes: 19 additions & 5 deletions pkg/cdc/watermark_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (

getWatermarkFormat = "select watermark from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s' and table_id = '%s'"

getWatermarkCountFormat = "select count(1) from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s'"
getAllWatermarkFormat = "select table_id, watermark from mo_catalog.mo_cdc_watermark where account_id = %d and task_id = '%s'"

updateWatermarkFormat = "update mo_catalog.mo_cdc_watermark set watermark='%s' where account_id = %d and task_id = '%s' and table_id = '%s'"

Expand Down Expand Up @@ -128,14 +128,28 @@ func (u *WatermarkUpdater) GetFromDb(tableIdStr string) (watermark types.TS, err
return types.StringToTS(watermarkStr), nil
}

func (u *WatermarkUpdater) GetCountFromDb() (uint64, error) {
sql := fmt.Sprintf(getWatermarkCountFormat, u.accountId, u.taskId)
func (u *WatermarkUpdater) GetAllFromDb() (mp map[string]types.TS, err error) {
sql := fmt.Sprintf(getAllWatermarkFormat, u.accountId, u.taskId)
ctx := defines.AttachAccountId(context.Background(), catalog.System_Account)
res := u.ie.Query(ctx, sql, ie.SessionOverrideOptions{})
if res.Error() != nil {
return 0, res.Error()
err = res.Error()
return
}

var tableIdStr string
var watermarkStr string
mp = make(map[string]types.TS)
for i := uint64(0); i < res.RowCount(); i++ {
if tableIdStr, err = res.GetString(ctx, i, 0); err != nil {
return
}
if watermarkStr, err = res.GetString(ctx, i, 1); err != nil {
return
}
mp[tableIdStr] = types.StringToTS(watermarkStr)
}
return res.GetUint64(ctx, 0, 0)
return
}

func (u *WatermarkUpdater) UpdateMem(tableIdStr string, watermark types.TS) {
Expand Down
81 changes: 51 additions & 30 deletions pkg/cdc/watermark_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,35 @@ package cdc
import (
"context"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type wmMockSQLExecutor struct {
mp map[string]string
insertRe *regexp.Regexp
updateRe *regexp.Regexp
selectRe *regexp.Regexp
mp map[string]string
insertRe *regexp.Regexp
updateRe *regexp.Regexp
selectRe *regexp.Regexp
selectAllRe *regexp.Regexp
}

func newWmMockSQLExecutor() *wmMockSQLExecutor {
return &wmMockSQLExecutor{
mp: make(map[string]string),
insertRe: regexp.MustCompile(`^insert .* values \(.*\, .*\, \'(.*)\'\, .*\, .*\, \'(.*)\'\, \'\'\)$`),
updateRe: regexp.MustCompile(`^update .* set watermark\=\'(.*)\' where .* and table_id \= '(.*)'$`),
selectRe: regexp.MustCompile(`^select .* and table_id \= '(.*)'$`),
mp: make(map[string]string),
insertRe: regexp.MustCompile(`^insert .* values \(.*\, .*\, \'(.*)\'\, .*\, .*\, \'(.*)\'\, \'\'\)$`),
updateRe: regexp.MustCompile(`^update .* set watermark\=\'(.*)\' where .* and table_id \= '(.*)'$`),
selectRe: regexp.MustCompile(`^select .* and table_id \= '(.*)'$`),
selectAllRe: regexp.MustCompile(`^select .* where account_id \= (.*) and task_id .*`),
}
}

Expand Down Expand Up @@ -97,7 +100,7 @@ func (res *internalExecResult) Column(ctx context.Context, i uint64) (name strin
}

func (res *internalExecResult) RowCount() uint64 {
return 1
return uint64(len(res.resultSet.Data))
}

func (res *internalExecResult) Row(ctx context.Context, i uint64) ([]interface{}, error) {
Expand All @@ -117,17 +120,25 @@ func (res *internalExecResult) GetString(ctx context.Context, i uint64, j uint64
}

func (m *wmMockSQLExecutor) Query(ctx context.Context, sql string, pts ie.SessionOverrideOptions) ie.InternalExecResult {
if strings.HasPrefix(sql, "select count") {
return &internalExecResult{
affectedRows: 1,
resultSet: &MysqlResultSet{
Columns: nil,
Name2Index: nil,
Data: [][]interface{}{
{uint64(len(m.mp))},
if strings.HasPrefix(sql, "select table_id") {
matches := m.selectAllRe.FindStringSubmatch(sql)
accountId, _ := strconv.Atoi(matches[1])
if accountId == 1 { // normal path
var data [][]interface{}
for k, v := range m.mp {
data = append(data, []interface{}{k, v})
}
return &internalExecResult{
affectedRows: 1,
resultSet: &MysqlResultSet{
Columns: nil,
Name2Index: nil,
Data: data,
},
},
err: nil,
err: nil,
}
} else { // error path
return &internalExecResult{err: moerr.NewInternalErrorNoCtx("error")}
}
} else if strings.HasPrefix(sql, "select") {
matches := m.selectRe.FindStringSubmatch(sql)
Expand Down Expand Up @@ -212,9 +223,9 @@ func TestWatermarkUpdater_DbOps(t *testing.T) {
}

// ---------- init count is 0
count, err := u.GetCountFromDb()
mp, err := u.GetAllFromDb()
assert.NoError(t, err)
assert.Equal(t, uint64(0), count)
assert.Equal(t, 0, len(mp))

// ---------- insert into a record
t1 := types.BuildTS(1, 1)
Expand All @@ -226,9 +237,9 @@ func TestWatermarkUpdater_DbOps(t *testing.T) {
err = u.InsertIntoDb(info1, t1)
assert.NoError(t, err)
// count is 1
count, err = u.GetCountFromDb()
mp, err = u.GetAllFromDb()
assert.NoError(t, err)
assert.Equal(t, uint64(1), count)
assert.Equal(t, 1, len(mp))
// get value of tableId 1
actual, err := u.GetFromDb("1_0")
assert.NoError(t, err)
Expand Down Expand Up @@ -263,17 +274,17 @@ func TestWatermarkUpdater_DbOps(t *testing.T) {
err = u.DeleteFromDb("1_0")
assert.NoError(t, err)
// count is 2
count, err = u.GetCountFromDb()
mp, err = u.GetAllFromDb()
assert.NoError(t, err)
assert.Equal(t, uint64(2), count)
assert.Equal(t, 2, len(mp))

// ---------- delete all
err = u.DeleteAllFromDb()
assert.NoError(t, err)
// count is 0
count, err = u.GetCountFromDb()
mp, err = u.GetAllFromDb()
assert.NoError(t, err)
assert.Equal(t, uint64(0), count)
assert.Equal(t, 0, len(mp))
}

func TestWatermarkUpdater_Run(t *testing.T) {
Expand Down Expand Up @@ -337,3 +348,13 @@ func TestWatermarkUpdater_flushAll(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, t2, actual)
}

func TestWatermarkUpdater_GetAllFromDb(t *testing.T) {
u := &WatermarkUpdater{
accountId: 2,
taskId: uuid.New(),
ie: newWmMockSQLExecutor(),
}
_, err := u.GetAllFromDb()
assert.Error(t, err)
}
Loading

0 comments on commit 2079bda

Please sign in to comment.