From 68cb5b11875223745893b819dc2da21f13421c10 Mon Sep 17 00:00:00 2001 From: veeding <104374143+veeding@users.noreply.github.com> Date: Tue, 14 Jun 2022 08:53:34 +0800 Subject: [PATCH] add goroutine to update taskinfo --- .../internal/service/importer/importer.go | 55 +++++++++++-------- .../internal/service/importer/taskdb.go | 9 +++ 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/server-v2/api/studio/internal/service/importer/importer.go b/server-v2/api/studio/internal/service/importer/importer.go index 6f8862ec..0478ef22 100644 --- a/server-v2/api/studio/internal/service/importer/importer.go +++ b/server-v2/api/studio/internal/service/importer/importer.go @@ -93,6 +93,20 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { } task, _ := GetTaskMgr().GetTask(taskID) + signal := make(chan struct{}, 1) + go func() { + for { + select { + case <-time.After(100 * time.Millisecond): + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + } + case <-signal: + return + } + } + }() go func() { result := ImportResult{} now := time.Now() @@ -139,6 +153,8 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { } zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) } + + signal <- struct{}{} }() return nil } @@ -166,23 +182,17 @@ func GetImportTask(tasksDir, taskID, address, username string) (*types.GetImport result := &types.GetImportTaskData{} if id, err := strconv.Atoi(taskID); err != nil { - zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) return nil, errors.New("task not existed") } else { _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) if err != nil { - zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) return nil, errors.New("task not existed") } } - err := GetTaskMgr().UpdateTaskInfo(taskID) - if err != nil { - zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) - } if t, ok := GetTaskMgr().GetTask(taskID); ok { task = *t - result.Id = fmt.Sprintf("%d", task.TaskInfo.ID) + result.Id = strconv.Itoa(t.TaskInfo.ID) result.Status = task.TaskInfo.TaskStatus result.Message = task.TaskInfo.TaskMessage result.CreateTime = task.TaskInfo.CreatedTime @@ -197,32 +207,33 @@ func GetImportTask(tasksDir, taskID, address, username string) (*types.GetImport return result, nil } -func GetManyImportTask(tasksDir, address, username string, page, pageSize int) (*types.GetManyImportTaskData, error) { +func GetManyImportTask(tasksDir, address, username string, pageIndex, pageSize int) (*types.GetManyImportTaskData, error) { result := &types.GetManyImportTaskData{ Total: 0, List: []types.GetImportTaskData{}, } - taskIDs, err := GetTaskMgr().GetAllTaskIDs(address, username) + tasks, err := taskmgr.db.FindTaskInfoByAddressAndUser(address, username, pageIndex, pageSize) if err != nil { return nil, err } - start := (page - 1) * pageSize - stop := page * pageSize - if len(taskIDs) <= start { - return nil, errors.New("invalid parameter") - } else { - if stop >= len(taskIDs) { - stop = len(taskIDs) - } - result.Total = int64(stop - start) - - for i := start; i < stop; i++ { - data, _ := GetImportTask(tasksDir, taskIDs[i], address, username) - result.List = append(result.List, *data) + for _, t := range tasks { + data := types.GetImportTaskData{ + Id: strconv.Itoa(t.ID), + Status: t.TaskStatus, + Message: t.TaskMessage, + CreateTime: t.CreatedTime, + UpdateTime: t.UpdatedTime, + Address: t.NebulaAddress, + User: t.User, + Name: t.Name, + Space: t.Space, + Stats: types.ImportTaskStats(t.Stats), } + result.List = append(result.List, data) } + result.Total = int64(len(result.List)) return result, nil } diff --git a/server-v2/api/studio/internal/service/importer/taskdb.go b/server-v2/api/studio/internal/service/importer/taskdb.go index 6619b2f8..d50abc89 100644 --- a/server-v2/api/studio/internal/service/importer/taskdb.go +++ b/server-v2/api/studio/internal/service/importer/taskdb.go @@ -44,6 +44,15 @@ func (t *TaskDb) FindTaskInfoByIdAndAddresssAndUser(id int, nebulaAddress, user return taskInfo, nil } +func (t *TaskDb) FindTaskInfoByAddressAndUser(nebulaAddress, user string, pageIndex, pageSize int) ([]*TaskInfo, error) { + tasks := make([]*TaskInfo, 0) + tx := t.Model(&TaskInfo{}).Where("nebula_address = ? And user = ?", nebulaAddress, user) + if err := tx.Offset((pageIndex - 1) * pageSize).Limit(pageSize).Find(&tasks).Error; err != nil { + return nil, err + } + return tasks, nil +} + func (t *TaskDb) InsertTaskInfo(info *TaskInfo) error { return t.Create(info).Error }