Skip to content

Commit

Permalink
add goroutine to update taskinfo
Browse files Browse the repository at this point in the history
  • Loading branch information
veeding committed Jun 14, 2022
1 parent dae1782 commit 5406700
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 22 deletions.
55 changes: 33 additions & 22 deletions server-v2/api/studio/internal/service/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) {
}

task, _ := GetTaskMgr().GetTask(taskID)
signal := make(chan struct{}, 1)
go func() {
for {
select {
case <-time.After(100 * time.Millisecond):
err := GetTaskMgr().UpdateTaskInfo(taskID)
if err != nil {
zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err))
}
case <-signal:
return
}
}
}()
go func() {
result := ImportResult{}
now := time.Now()
Expand Down Expand Up @@ -139,6 +153,8 @@ func Import(taskID string, conf *importconfig.YAMLConfig) (err error) {
}
zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result))
}

signal <- struct{}{}
}()
return nil
}
Expand Down Expand Up @@ -166,23 +182,17 @@ func GetImportTask(tasksDir, taskID, address, username string) (*types.GetImport
result := &types.GetImportTaskData{}

if id, err := strconv.Atoi(taskID); err != nil {
zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err))
return nil, errors.New("task not existed")
} else {
_, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username)
if err != nil {
zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err))
return nil, errors.New("task not existed")
}
}

err := GetTaskMgr().UpdateTaskInfo(taskID)
if err != nil {
zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err))
}
if t, ok := GetTaskMgr().GetTask(taskID); ok {
task = *t
result.Id = fmt.Sprintf("%d", task.TaskInfo.ID)
result.Id = strconv.Itoa(t.TaskInfo.ID)
result.Status = task.TaskInfo.TaskStatus
result.Message = task.TaskInfo.TaskMessage
result.CreateTime = task.TaskInfo.CreatedTime
Expand All @@ -197,32 +207,33 @@ func GetImportTask(tasksDir, taskID, address, username string) (*types.GetImport
return result, nil
}

func GetManyImportTask(tasksDir, address, username string, page, pageSize int) (*types.GetManyImportTaskData, error) {
func GetManyImportTask(tasksDir, address, username string, pageIndex, pageSize int) (*types.GetManyImportTaskData, error) {
result := &types.GetManyImportTaskData{
Total: 0,
List: []types.GetImportTaskData{},
}

taskIDs, err := GetTaskMgr().GetAllTaskIDs(address, username)
tasks, err := taskmgr.db.FindTaskInfoByAddressAndUser(address, username, pageIndex, pageSize)
if err != nil {
return nil, err
}

start := (page - 1) * pageSize
stop := page * pageSize
if len(taskIDs) <= start {
return nil, errors.New("invalid parameter")
} else {
if stop >= len(taskIDs) {
stop = len(taskIDs)
}
result.Total = int64(stop - start)

for i := start; i < stop; i++ {
data, _ := GetImportTask(tasksDir, taskIDs[i], address, username)
result.List = append(result.List, *data)
for _, t := range tasks {
data := types.GetImportTaskData{
Id: strconv.Itoa(t.ID),
Status: t.TaskStatus,
Message: t.TaskMessage,
CreateTime: t.CreatedTime,
UpdateTime: t.UpdatedTime,
Address: t.NebulaAddress,
User: t.User,
Name: t.Name,
Space: t.Space,
Stats: types.ImportTaskStats(t.Stats),
}
result.List = append(result.List, data)
}
result.Total = int64(len(result.List))

return result, nil
}
Expand Down
9 changes: 9 additions & 0 deletions server-v2/api/studio/internal/service/importer/taskdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func (t *TaskDb) FindTaskInfoByIdAndAddresssAndUser(id int, nebulaAddress, user
return taskInfo, nil
}

func (t *TaskDb) FindTaskInfoByAddressAndUser(nebulaAddress, user string, pageIndex, pageSize int) ([]*TaskInfo, error) {
tasks := make([]*TaskInfo, 0)
tx := t.Model(&TaskInfo{}).Where("nebula_address = ? And user = ?", nebulaAddress, user)
if err := tx.Offset((pageIndex - 1) * pageSize).Limit(pageSize).Find(&tasks).Error; err != nil {
return nil, err
}
return tasks, nil
}

func (t *TaskDb) InsertTaskInfo(info *TaskInfo) error {
return t.Create(info).Error
}
Expand Down

0 comments on commit 5406700

Please sign in to comment.