Skip to content

Commit

Permalink
fix: some issues (#541)
Browse files Browse the repository at this point in the history
* fix: some issues
* mod: code review
  • Loading branch information
hetao92 authored Apr 14, 2023
1 parent 6c8eab1 commit 73501a1
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 74 deletions.
1 change: 0 additions & 1 deletion app/pages/Import/TaskList/TemplateModal/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ const TemplateModal = (props: IProps) => {

const validateAddress = useCallback((client) => {
const msg = validateEmpty('address', client.address);
console.log('msg', msg);
if(msg) return msg;
const address = client.address.split(',');
if(address.some(i => i.startsWith('http'))) {
Expand Down
6 changes: 2 additions & 4 deletions app/pages/Import/TaskList/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ const TaskList = (props: IProps) => {
const handleTaskStop = useCallback(async (id: number) => {
clearTimeout(timer.current);
const { code } = await stopTask(id);
if(code === 0) {
message.success(intl.get('import.stopImportingSuccess'));
getTaskList();
}
code === 0 && message.success(intl.get('import.stopImportingSuccess'));
getTaskList();
}, []);
const handleTaskDelete = useCallback(async (id: number) => {
clearTimeout(timer.current);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
}
.tip {
max-width: 600px;
min-width: 300px;
background: #DBEFFF;
border-radius: 3px;
padding: 13px;
Expand Down
8 changes: 3 additions & 5 deletions app/utils/function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ export const handleVidStringName = (name: string, spaceVidType?: string) => {
if (spaceVidType && spaceVidType === 'INT64') {
return convertBigNumberToString(name);
}
if (name.indexOf(`"`) > -1 && name.indexOf(`'`) === -1) {
return `'${name}'`;
} else {
return `"${name}"`;
}
// Add quotes to a string
// If there is '\n' in the string, it needs to be escaped
return JSON.stringify(name);
};

export const convertBigNumberToString = (value: any) => {
Expand Down
18 changes: 14 additions & 4 deletions server/api/studio/internal/service/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ func (d *datasourceService) Update(request types.DatasourceUpdateRequest) error
}
cfgStr, crypto, err := validate(typ, platform, cfg)
if err != nil {
return err
return ecode.WithErrorMessage(ecode.ErrBadRequest, err)
}
err = d.update(datasourceId, request.Type, request.Platform, request.Name, cfgStr, crypto)
if err != nil {
return err
return ecode.WithErrorMessage(ecode.ErrInternalServer, err)
}
return nil
}
Expand Down Expand Up @@ -184,14 +184,24 @@ func (d *datasourceService) Remove(request types.DatasourceRemoveRequest) error

func (d *datasourceService) BatchRemove(request types.DatasourceBatchRemoveRequest) error {
user := d.ctx.Value(auth.CtxKeyUserInfo{}).(*auth.AuthData)
var existingIDs []int
db.CtxDB.Model(&db.Datasource{}).Where("id in (?)", request.IDs).Pluck("id", &existingIDs)
if len(existingIDs) != len(request.IDs) {
var missingIDs []int
for _, id := range request.IDs {
if !utils.Contains(existingIDs, id) {
missingIDs = append(missingIDs, id)
}
}
return ecode.WithErrorMessage(ecode.ErrBadRequest, fmt.Errorf("some data are not found: %v", missingIDs))
}
result := db.CtxDB.Where("id IN (?) AND username = ?", request.IDs, user.Username).Delete(&db.Datasource{})

if result.Error != nil {
return d.gormErrorWrapper(result.Error)
}

if result.RowsAffected == 0 {
return ecode.WithErrorMessage(ecode.ErrBadRequest, fmt.Errorf("test"), "there is available item to delete")
return ecode.WithErrorMessage(ecode.ErrBadRequest, fmt.Errorf("no data found"))
}

return nil
Expand Down
1 change: 1 addition & 0 deletions server/api/studio/internal/service/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (i *importService) CreateImportTask(req *types.CreateImportTaskRequest) (*t
// start import
if err = importer.StartImport(taskID); err != nil {
task.TaskInfo.TaskStatus = importer.StatusAborted.String()
task.TaskInfo.TaskMessage = err.Error()
importer.GetTaskMgr().AbortTask(taskID)
return nil, ecode.WithErrorMessage(ecode.ErrInternalServer, err)
}
Expand Down
1 change: 1 addition & 0 deletions server/api/studio/internal/service/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func StartImport(taskID int) (err error) {
abort()
return
}
task.Client.HasStarted = true
err = mgr.Wait()
if err != nil {
task.TaskInfo.TaskStatus = StatusAborted.String()
Expand Down
7 changes: 4 additions & 3 deletions server/api/studio/internal/service/importer/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
)

type Client struct {
Cfg config.Configurator `json:"cfg,omitempty"`
Logger logger.Logger `json:"logger,omitempty"`
Manager manager.Manager `json:"manager,omitempty"`
Cfg config.Configurator `json:"cfg,omitempty"`
Logger logger.Logger `json:"logger,omitempty"`
Manager manager.Manager `json:"manager,omitempty"`
HasStarted bool `json:"has_started,omitempty"`
}
type Task struct {
Client *Client `json:"client,omitempty"`
Expand Down
25 changes: 19 additions & 6 deletions server/api/studio/internal/service/importer/taskmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package importer

import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -102,9 +103,10 @@ func (mgr *TaskMgr) NewTask(host string, user string, taskName string, cfg impor

task := &Task{
Client: &Client{
Cfg: cfg,
Manager: nil,
Logger: nil,
Cfg: cfg,
Manager: nil,
Logger: nil,
HasStarted: false,
},
TaskInfo: taskInfo,
}
Expand Down Expand Up @@ -214,12 +216,23 @@ and then call FinishTask
*/
func (mgr *TaskMgr) StopTask(taskID int) error {
if task, ok := mgr.getTaskFromMap(taskID); ok {
var err error
manager := task.Client.Manager
task.TaskInfo.TaskStatus = StatusStoped.String()
err := manager.Stop()
if manager != nil {
if task.Client.HasStarted {
err = manager.Stop()
} else {
// hack import not support stop before start()
err = errors.New("task has not started, please try later")
}
} else {
err = errors.New("manager is nil, please try later")
}

if err != nil {
return errors.New("stop task fail")
return fmt.Errorf("stop task failed: %w", err)
}
task.TaskInfo.TaskStatus = StatusStoped.String()
if err := mgr.FinishTask(taskID); err != nil {
return ecode.WithErrorMessage(ecode.ErrInternalServer, err)
}
Expand Down
52 changes: 52 additions & 0 deletions server/api/studio/pkg/utils/mutexMap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package utils

import "sync"

type mutexMap[T any] struct {
mu sync.RWMutex
data map[string]T
}

func NewMutexMap[T any]() *mutexMap[T] {
return &mutexMap[T]{
data: make(map[string]T),
}
}

func (m *mutexMap[T]) Get(key string) (T, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.data[key]
return val, ok
}

func (m *mutexMap[T]) Set(key string, val T) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = val
}

func (m *mutexMap[T]) Delete(key string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.data, key)
}
func (m *mutexMap[T]) Size() int {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.data)
}

func (m *mutexMap[T]) ForEach(f func(key string, val T)) {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.data {
f(k, v)
}
}

func (m *mutexMap[T]) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.data = make(map[string]T)
}
54 changes: 6 additions & 48 deletions server/api/studio/pkg/utils/tools.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,10 @@
package utils

import "sync"

type mutexMap[T any] struct {
mu sync.RWMutex
data map[string]T
}

func NewMutexMap[T any]() *mutexMap[T] {
return &mutexMap[T]{
data: make(map[string]T),
func Contains[T comparable](s []T, e T) bool {
for _, a := range s {
if a == e {
return true
}
}
}

func (m *mutexMap[T]) Get(key string) (T, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.data[key]
return val, ok
}

func (m *mutexMap[T]) Set(key string, val T) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = val
}

func (m *mutexMap[T]) Delete(key string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.data, key)
}
func (m *mutexMap[T]) Size() int {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.data)
}

func (m *mutexMap[T]) ForEach(f func(key string, val T)) {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.data {
f(k, v)
}
}

func (m *mutexMap[T]) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.data = make(map[string]T)
return false
}
4 changes: 1 addition & 3 deletions server/api/studio/studio.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func main() {
var c config.Config
conf.MustLoad(*configFile, &c, conf.UseEnv())

var lc logx.LogConf
conf.MustLoad(*configFile, &lc)
logx.MustSetup(lc)
logx.MustSetup(c.Log)
defer logx.Close()

// init logger
Expand Down

0 comments on commit 73501a1

Please sign in to comment.