Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader: add lock for memory cached checkpoint (#1145) #1223

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 91 additions & 24 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ type RemoteCheckPoint struct {
id string
schema string
tableName string // tableName contains schema name
<<<<<<< HEAD
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expected '}', found '<<' (and 7 more errors)

restoringFiles map[string]map[string]FilePosSet
=======
restoringFiles struct {
sync.RWMutex
pos map[string]map[string]FilePosSet // schema -> table -> FilePosSet(filename -> [cur, end])
}
>>>>>>> 109750b7... loader: add lock for memory cached checkpoint (#1145)
finishedTables map[string]struct{}
logCtx *tcontext.Context
}
Expand All @@ -95,12 +102,12 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
db: db,
conn: dbConns[0],
id: id,
restoringFiles: make(map[string]map[string]FilePosSet),
finishedTables: make(map[string]struct{}),
schema: dbutil.ColumnName(cfg.MetaSchema),
tableName: dbutil.TableName(cfg.MetaSchema, fmt.Sprintf("%s_loader_checkpoint", cfg.Name)),
logCtx: tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))),
}
cp.restoringFiles.pos = make(map[string]map[string]FilePosSet)

err = cp.prepare(tctx)
if err != nil {
Expand Down Expand Up @@ -174,17 +181,19 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
endPos int64
)

cp.restoringFiles = make(map[string]map[string]FilePosSet) // reset to empty
cp.restoringFiles.Lock()
defer cp.restoringFiles.Unlock()
cp.restoringFiles.pos = make(map[string]map[string]FilePosSet) // reset to empty
for rows.Next() {
err := rows.Scan(&filename, &schema, &table, &offset, &endPos)
if err != nil {
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}

if _, ok := cp.restoringFiles[schema]; !ok {
cp.restoringFiles[schema] = make(map[string]FilePosSet)
if _, ok := cp.restoringFiles.pos[schema]; !ok {
cp.restoringFiles.pos[schema] = make(map[string]FilePosSet)
}
tables := cp.restoringFiles[schema]
tables := cp.restoringFiles.pos[schema]
if _, ok := tables[table]; !ok {
tables[table] = make(map[string][]int64)
}
Expand All @@ -197,27 +206,56 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {

// GetRestoringFileInfo implements CheckPoint.GetRestoringFileInfo
func (cp *RemoteCheckPoint) GetRestoringFileInfo(db, table string) map[string][]int64 {
if tables, ok := cp.restoringFiles[db]; ok {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
results := make(map[string][]int64)
if tables, ok := cp.restoringFiles.pos[db]; ok {
if restoringFiles, ok := tables[table]; ok {
return restoringFiles
// make a copy of restoringFiles, and its slice value
for k, v := range restoringFiles {
results[k] = make([]int64, len(v))
copy(results[k], v)
}
return results
}
}
return make(map[string][]int64)
return results
}

// GetAllRestoringFileInfo implements CheckPoint.GetAllRestoringFileInfo
func (cp *RemoteCheckPoint) GetAllRestoringFileInfo() map[string][]int64 {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
results := make(map[string][]int64)
for _, tables := range cp.restoringFiles {
for _, tables := range cp.restoringFiles.pos {
for _, files := range tables {
for file, pos := range files {
results[file] = pos
results[file] = make([]int64, len(pos))
copy(results[file], pos)
}
}
}
return results
}

<<<<<<< HEAD
=======
// IsTableCreated implements CheckPoint.IsTableCreated
func (cp *RemoteCheckPoint) IsTableCreated(db, table string) bool {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
tables, ok := cp.restoringFiles.pos[db]
if !ok {
return false
}
if table == "" {
return true
}
_, ok = tables[table]
return ok
}

>>>>>>> 109750b7... loader: add lock for memory cached checkpoint (#1145)
// IsTableFinished implements CheckPoint.IsTableFinished
func (cp *RemoteCheckPoint) IsTableFinished(db, table string) bool {
key := strings.Join([]string{db, table}, ".")
Expand All @@ -229,8 +267,10 @@ func (cp *RemoteCheckPoint) IsTableFinished(db, table string) bool {

// CalcProgress implements CheckPoint.CalcProgress
func (cp *RemoteCheckPoint) CalcProgress(allFiles map[string]Tables2DataFiles) error {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
cp.finishedTables = make(map[string]struct{}) // reset to empty
for db, tables := range cp.restoringFiles {
for db, tables := range cp.restoringFiles.pos {
dbTables, ok := allFiles[db]
if !ok {
return terror.ErrCheckpointDBNotExistInFile.Generate(db)
Expand Down Expand Up @@ -274,6 +314,23 @@ func (cp *RemoteCheckPoint) allFilesFinished(files map[string][]int64) bool {
return true
}

<<<<<<< HEAD
=======
// AllFinished implements CheckPoint.AllFinished
func (cp *RemoteCheckPoint) AllFinished() bool {
cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
for _, tables := range cp.restoringFiles.pos {
for _, restoringFiles := range tables {
if !cp.allFilesFinished(restoringFiles) {
return false
}
}
}
return true
}

>>>>>>> 109750b7... loader: add lock for memory cached checkpoint (#1145)
// Init implements CheckPoint.Init
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos int64) error {
idx := strings.Index(filename, ".sql")
Expand Down Expand Up @@ -309,10 +366,12 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
}
// checkpoint not exists and no error, cache endPos in memory
if _, ok := cp.restoringFiles[schema]; !ok {
cp.restoringFiles[schema] = make(map[string]FilePosSet)
cp.restoringFiles.Lock()
defer cp.restoringFiles.Unlock()
if _, ok := cp.restoringFiles.pos[schema]; !ok {
cp.restoringFiles.pos[schema] = make(map[string]FilePosSet)
}
tables := cp.restoringFiles[schema]
tables := cp.restoringFiles.pos[schema]
if _, ok := tables[table]; !ok {
tables[table] = make(map[string][]int64)
}
Expand Down Expand Up @@ -345,6 +404,21 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
return sql
}

<<<<<<< HEAD
=======
// UpdateOffset implements CheckPoint.UpdateOffset
func (cp *RemoteCheckPoint) UpdateOffset(filename string, offset int64) {
cp.restoringFiles.Lock()
defer cp.restoringFiles.Unlock()
db, table, err := getDBAndTableFromFilename(filename)
if err != nil {
cp.logCtx.L().Error("error in checkpoint UpdateOffset", zap.Error(err))
return
}
cp.restoringFiles.pos[db][table][filename][0] = offset
}

>>>>>>> 109750b7... loader: add lock for memory cached checkpoint (#1145)
// Clear implements CheckPoint.Clear
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
sql2 := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s'", cp.tableName, cp.id)
Expand Down Expand Up @@ -379,17 +453,10 @@ func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) {
}

func (cp *RemoteCheckPoint) String() string {
// `String` is often used to log something, it's not a big problem even fail,
// so 1min should be enough.
tctx2, cancel := cp.logCtx.WithTimeout(time.Minute)
defer cancel()

if err := cp.Load(tctx2); err != nil {
return err.Error()
}

cp.restoringFiles.RLock()
defer cp.restoringFiles.RUnlock()
result := make(map[string][]int64)
for _, tables := range cp.restoringFiles {
for _, tables := range cp.restoringFiles.pos {
for _, files := range tables {
for file, set := range files {
result[file] = set
Expand Down
18 changes: 18 additions & 0 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,21 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(err, IsNil)
c.Assert(count, Equals, 0)
}

func (t *testCheckPointSuite) TestDeepCopy(c *C) {
cp := RemoteCheckPoint{}
cp.restoringFiles.pos = make(map[string]map[string]FilePosSet)
cp.restoringFiles.pos["db"] = make(map[string]FilePosSet)
cp.restoringFiles.pos["db"]["table"] = make(map[string][]int64)
cp.restoringFiles.pos["db"]["table"]["file"] = []int64{0, 100}

ret := cp.GetRestoringFileInfo("db", "table")
cp.restoringFiles.pos["db"]["table"]["file"][0] = 10
cp.restoringFiles.pos["db"]["table"]["file2"] = []int64{0, 100}
c.Assert(ret, DeepEquals, map[string][]int64{"file": {0, 100}})

ret = cp.GetAllRestoringFileInfo()
cp.restoringFiles.pos["db"]["table"]["file"][0] = 20
cp.restoringFiles.pos["db"]["table"]["file3"] = []int64{0, 100}
c.Assert(ret, DeepEquals, map[string][]int64{"file": {10, 100}, "file2": {0, 100}})
}