diff --git a/server-v2/api/studio/etc/studio-api.yaml b/server-v2/api/studio/etc/studio-api.yaml index 0d12cbb3..b20e9e38 100644 --- a/server-v2/api/studio/etc/studio-api.yaml +++ b/server-v2/api/studio/etc/studio-api.yaml @@ -1,6 +1,6 @@ Name: studio-api Host: 0.0.0.0 -Port: 9000 +Port: 7002 MaxBytes: 1073741824 Debug: Enable: false @@ -8,4 +8,4 @@ Auth: AccessSecret: "login_secret" AccessExpire: 1800 File: - UploadDir: "./upload/" \ No newline at end of file + UploadDir: "./upload/" diff --git a/server-v2/api/studio/internal/config/config.go b/server-v2/api/studio/internal/config/config.go index 3efe2fcf..00a52800 100644 --- a/server-v2/api/studio/internal/config/config.go +++ b/server-v2/api/studio/internal/config/config.go @@ -1,6 +1,8 @@ package config -import "github.com/zeromicro/go-zero/rest" +import ( + "github.com/zeromicro/go-zero/rest" +) type Config struct { rest.RestConf diff --git a/server-v2/api/studio/internal/handler/file/fileuploadhandler.go b/server-v2/api/studio/internal/handler/file/fileuploadhandler.go index e379481b..93cc8169 100644 --- a/server-v2/api/studio/internal/handler/file/fileuploadhandler.go +++ b/server-v2/api/studio/internal/handler/file/fileuploadhandler.go @@ -10,7 +10,7 @@ import ( func FileUploadHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - l := file.NewFileUploadLogic(r, svcCtx) + l := file.NewFileUploadLogic(r.Context(), svcCtx) err := l.FileUpload() svcCtx.ResponseHandler.Handle(w, r, nil, err) } diff --git a/server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go new file mode 100644 index 00000000..e043a6f5 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func CreateImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.CreateImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewCreateImportTaskLogic(r.Context(), svcCtx) + data, err := l.CreateImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go new file mode 100644 index 00000000..c6a12618 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func DeleteImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.DeleteImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewDeleteImportTaskLogic(r.Context(), svcCtx) + err := l.DeleteImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go b/server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go new file mode 100644 index 00000000..0400faaf --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func DownloadConfigHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.DownloadConfigsRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewDownloadConfigLogic(r.Context(), svcCtx) + err := l.DownloadConfig(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go b/server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go new file mode 100644 index 00000000..cb4d4b34 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func DownloadLogsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.DownloadLogsRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewDownloadLogsLogic(r.Context(), svcCtx) + err := l.DownloadLogs(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go new file mode 100644 index 00000000..cb53642c --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetImportTaskLogic(r.Context(), svcCtx) + data, err := l.GetImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go b/server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go new file mode 100644 index 00000000..84aa1e01 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetImportTaskLogNamesHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetImportTaskLogNamesRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetImportTaskLogNamesLogic(r.Context(), svcCtx) + data, err := l.GetImportTaskLogNames(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go new file mode 100644 index 00000000..0937d48c --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetManyImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetManyImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetManyImportTaskLogic(r.Context(), svcCtx) + data, err := l.GetManyImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go new file mode 100644 index 00000000..b030ea08 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetManyImportTaskLogHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetManyImportTaskLogRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetManyImportTaskLogLogic(r.Context(), svcCtx) + data, err := l.GetManyImportTaskLog(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go new file mode 100644 index 00000000..3c5d4928 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func StopImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.StopImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewStopImportTaskLogic(r.Context(), svcCtx) + err := l.StopImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/routes.go b/server-v2/api/studio/internal/handler/routes.go index c6b5e76d..907b082b 100644 --- a/server-v2/api/studio/internal/handler/routes.go +++ b/server-v2/api/studio/internal/handler/routes.go @@ -7,6 +7,7 @@ import ( file "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/file" gateway "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/gateway" health "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/health" + importtask "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/importtask" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/zeromicro/go-zero/rest" @@ -59,7 +60,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { []rest.Route{ { Method: http.MethodPost, - Path: "/api/file", + Path: "/api/file/upload", Handler: file.FileUploadHandler(serverCtx), }, { @@ -74,4 +75,54 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { }, }, ) + + server.AddRoutes( + []rest.Route{ + { + Method: http.MethodPost, + Path: "/api/import-tasks", + Handler: importtask.CreateImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id", + Handler: importtask.GetImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks", + Handler: importtask.GetManyImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/logs", + Handler: importtask.GetManyImportTaskLogHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/task-log-names", + Handler: importtask.GetImportTaskLogNamesHandler(serverCtx), + }, + { + Method: http.MethodDelete, + Path: "/api/import-tasks/:id", + Handler: importtask.DeleteImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/stop/:id", + Handler: importtask.StopImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/download/logs/:id", + Handler: importtask.DownloadLogsHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/download/config/:id", + Handler: importtask.DownloadConfigHandler(serverCtx), + }, + }, + ) } diff --git a/server-v2/api/studio/internal/logic/file/filedestroylogic.go b/server-v2/api/studio/internal/logic/file/filedestroylogic.go index b70b97f3..4c75f53f 100644 --- a/server-v2/api/studio/internal/logic/file/filedestroylogic.go +++ b/server-v2/api/studio/internal/logic/file/filedestroylogic.go @@ -2,6 +2,7 @@ package file import ( "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" diff --git a/server-v2/api/studio/internal/logic/file/filesindexlogic.go b/server-v2/api/studio/internal/logic/file/filesindexlogic.go index 21c5d3d5..50595cf4 100644 --- a/server-v2/api/studio/internal/logic/file/filesindexlogic.go +++ b/server-v2/api/studio/internal/logic/file/filesindexlogic.go @@ -2,6 +2,7 @@ package file import ( "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" diff --git a/server-v2/api/studio/internal/logic/file/fileuploadlogic.go b/server-v2/api/studio/internal/logic/file/fileuploadlogic.go index 81f5eaf6..9776a01c 100644 --- a/server-v2/api/studio/internal/logic/file/fileuploadlogic.go +++ b/server-v2/api/studio/internal/logic/file/fileuploadlogic.go @@ -1,9 +1,9 @@ package file import ( - "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "net/http" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/zeromicro/go-zero/core/logx" ) diff --git a/server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go new file mode 100644 index 00000000..1c7a1a65 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go @@ -0,0 +1,28 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type CreateImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewCreateImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateImportTaskLogic { + return &CreateImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *CreateImportTaskLogic) CreateImportTask(req types.CreateImportTaskRequest) (resp *types.CreateImportTaskData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).CreateImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go new file mode 100644 index 00000000..2e277e6b --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go @@ -0,0 +1,28 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DeleteImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewDeleteImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteImportTaskLogic { + return &DeleteImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *DeleteImportTaskLogic) DeleteImportTask(req types.DeleteImportTaskRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).DeleteImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go b/server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go new file mode 100644 index 00000000..4b2bee45 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DownloadConfigLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewDownloadConfigLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DownloadConfigLogic { + return &DownloadConfigLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *DownloadConfigLogic) DownloadConfig(req types.DownloadConfigsRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).DownloadConfig(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go b/server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go new file mode 100644 index 00000000..ba4f093e --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DownloadLogsLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewDownloadLogsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DownloadLogsLogic { + return &DownloadLogsLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *DownloadLogsLogic) DownloadLogs(req types.DownloadLogsRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).DownloadLogs(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go new file mode 100644 index 00000000..356aa478 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go @@ -0,0 +1,28 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetImportTaskLogic { + return &GetImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetImportTaskLogic) GetImportTask(req types.GetImportTaskRequest) (resp *types.GetImportTaskData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go b/server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go new file mode 100644 index 00000000..c9c3f882 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go @@ -0,0 +1,28 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetImportTaskLogNamesLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetImportTaskLogNamesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetImportTaskLogNamesLogic { + return &GetImportTaskLogNamesLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetImportTaskLogNamesLogic) GetImportTaskLogNames(req types.GetImportTaskLogNamesRequest) (resp *types.GetImportTaskLogNamesData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetImportTaskLogNames(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go new file mode 100644 index 00000000..9429da34 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go @@ -0,0 +1,28 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetManyImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetManyImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetManyImportTaskLogic { + return &GetManyImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetManyImportTaskLogic) GetManyImportTask(req types.GetManyImportTaskRequest) (resp *types.GetManyImportTaskData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetManyImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go b/server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go new file mode 100644 index 00000000..d052d4eb --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetManyImportTaskLogLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetManyImportTaskLogLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetManyImportTaskLogLogic { + return &GetManyImportTaskLogLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetManyImportTaskLogLogic) GetManyImportTaskLog(req types.GetManyImportTaskLogRequest) (resp *types.GetManyImportTaskLogData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetManyImportTaskLog(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go new file mode 100644 index 00000000..9a208b9c --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type StopImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewStopImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StopImportTaskLogic { + return &StopImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *StopImportTaskLogic) StopImportTask(req types.StopImportTaskRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).StopImportTask(&req) +} diff --git a/server-v2/api/studio/internal/service/file.go b/server-v2/api/studio/internal/service/file.go index 9fd1a8a2..72812261 100644 --- a/server-v2/api/studio/internal/service/file.go +++ b/server-v2/api/studio/internal/service/file.go @@ -6,12 +6,6 @@ import ( "encoding/csv" "errors" "fmt" - "github.com/axgle/mahonia" - "github.com/saintfish/chardet" - "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" - "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" - "github.com/zeromicro/go-zero/core/logx" - "go.uber.org/zap" "io" "io/ioutil" "mime/multipart" @@ -19,6 +13,15 @@ import ( "os" "path/filepath" "strings" + + "github.com/axgle/mahonia" + "github.com/saintfish/chardet" + "github.com/vesoft-inc/go-pkg/middleware" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + "github.com/zeromicro/go-zero/core/logx" + "go.uber.org/zap" ) const ( @@ -61,6 +64,14 @@ func NewFileService(r *http.Request, ctx context.Context, svcCtx *svc.ServiceCon } } +func NewFileService(ctx context.Context, svcCtx *svc.ServiceContext) FileService { + return &fileService{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + func (f *fileService) FileDestroy(name string) error { dir := f.svcCtx.Config.File.UploadDir target := filepath.Join(dir, name) @@ -132,8 +143,13 @@ func (f *fileService) FileUpload() error { } } + httpReq, ok := middleware.GetRequest(f.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepRequest")) + } + logx.Infof("dir:", dir) - files, _, err := f.UploadFormFiles(dir) + files, _, err := UploadFormFiles(httpReq, dir) if err != nil { logx.Infof("upload file error:%v", err) return err @@ -157,20 +173,20 @@ func (f *fileService) FileUpload() error { return nil } -func (f *fileService) UploadFormFiles(destDirectory string) (uploaded []*multipart.FileHeader, n int64, err error) { - err = f.r.ParseMultipartForm(defaultMulipartMemory) +func UploadFormFiles(r *http.Request, destDirectory string) (uploaded []*multipart.FileHeader, n int64, err error) { + err = r.ParseMultipartForm(defaultMulipartMemory) if err != nil { return nil, 0, err } - if f.r.MultipartForm != nil { - if fhs := f.r.MultipartForm.File; fhs != nil { + if r.MultipartForm != nil { + if fhs := r.MultipartForm.File; fhs != nil { for _, files := range fhs { for _, file := range files { file.Filename = strings.ReplaceAll(file.Filename, "../", "") file.Filename = strings.ReplaceAll(file.Filename, "..\\", "") - n0, err0 := f.SaveFormFile(file, filepath.Join(destDirectory, file.Filename)) + n0, err0 := SaveFormFile(file, filepath.Join(destDirectory, file.Filename)) if err0 != nil { return nil, 0, err0 } @@ -185,7 +201,7 @@ func (f *fileService) UploadFormFiles(destDirectory string) (uploaded []*multipa return nil, 0, http.ErrMissingFile } -func (f *fileService) SaveFormFile(fh *multipart.FileHeader, dest string) (int64, error) { +func SaveFormFile(fh *multipart.FileHeader, dest string) (int64, error) { src, err := fh.Open() if err != nil { return 0, err diff --git a/server-v2/api/studio/internal/service/import.go b/server-v2/api/studio/internal/service/import.go new file mode 100644 index 00000000..660df854 --- /dev/null +++ b/server-v2/api/studio/internal/service/import.go @@ -0,0 +1,334 @@ +package service + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/vesoft-inc/go-pkg/middleware" + importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" + importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service/importer" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + Config "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" + "github.com/zeromicro/go-zero/core/logx" + "go.uber.org/zap" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "sync" +) + +var ( + _ ImportService = (*importService)(nil) + muTaskId sync.RWMutex +) + +const ( + importLogName = "import.log" + errContentDir = "err" +) + +type ( + ImportService interface { + CreateImportTask(*types.CreateImportTaskRequest) (*types.CreateImportTaskData, error) + StopImportTask(request *types.StopImportTaskRequest) error + DownloadConfig(*types.DownloadConfigsRequest) error + DownloadLogs(request *types.DownloadLogsRequest) error + DeleteImportTask(*types.DeleteImportTaskRequest) error + GetImportTask(*types.GetImportTaskRequest) (*types.GetImportTaskData, error) + GetManyImportTask(request *types.GetManyImportTaskRequest) (*types.GetManyImportTaskData, error) + GetImportTaskLogNames(request *types.GetImportTaskLogNamesRequest) (*types.GetImportTaskLogNamesData, error) + GetManyImportTaskLog(request *types.GetManyImportTaskLogRequest) (*types.GetManyImportTaskLogData, error) + } + + importService struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext + } +) + +func NewImportService(ctx context.Context, svcCtx *svc.ServiceContext) ImportService { + return &importService{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (i *importService) CreateImportTask(req *types.CreateImportTaskRequest) (*types.CreateImportTaskData, error) { + jsons, err := json.Marshal(req.Config) + if err != nil { + return nil, errors.New("importDataParams get fail") + } + + conf := importconfig.YAMLConfig{} + err = json.Unmarshal(jsons, &conf) + if err != nil { + return nil, err + } + + if err = validClientParams(&conf); err != nil { + err = importerErrors.Wrap(importerErrors.InvalidConfigPathOrFormat, err) + zap.L().Warn("client params is wrong", zap.Error(err)) + return nil, err + } + + taskDir, err := importer.GetNewTaskDir() + if err != nil { + return nil, err + } + logPath := filepath.Join(taskDir, "import.log") + conf.LogPath = &logPath + + // create config file + if err := importer.CreateConfigFile(taskDir, conf); err != nil { + return nil, err + } + + // create err dir + taskErrDir := filepath.Join(taskDir, "err") + if err = utils.CreateDir(taskErrDir); err != nil { + return nil, err + } + + // import + nebulaAddress := *conf.NebulaClientSettings.Connection.Address + user := *conf.NebulaClientSettings.Connection.User + name := req.Name + space := *conf.NebulaClientSettings.Space + task, taskID, err := importer.GetTaskMgr().NewTask(nebulaAddress, user, name, space) + if err != nil { + zap.L().Warn("init task fail", zap.Error(err)) + return nil, err + } + if err = importer.Import(taskID, &conf); err != nil { + // task err: import task not start err + task.TaskInfo.TaskStatus = importer.StatusAborted.String() + err1 := importer.GetTaskMgr().AbortTask(taskID) + if err != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskID, err)) + return nil, err + } + + // write taskId to file + muTaskId.Lock() + taskIDBytes, err := ioutil.ReadFile(Config.Cfg.Web.TaskIdPath) + if err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + return nil, err + } + taskIdJSON := make(map[string]bool) + if len(taskIDBytes) != 0 { + if err := json.Unmarshal(taskIDBytes, &taskIdJSON); err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + return nil, err + } + } + taskIdJSON[taskID] = true + bytes, err := json.Marshal(taskIdJSON) + if err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + } + err = ioutil.WriteFile(Config.Cfg.Web.TaskIdPath, bytes, 777) + if err != nil { + zap.L().Warn("write taskId file error", zap.Error(err)) + } + defer muTaskId.Unlock() + + return &types.CreateImportTaskData{ + Id: taskID, + }, nil +} + +func (i *importService) StopImportTask(req *types.StopImportTaskRequest) error { + return importer.StopImportTask(req.Id, req.Address+":"+req.Port, req.Username) +} + +func (i *importService) DownloadConfig(req *types.DownloadConfigsRequest) error { + if req.Id == "" { + return errors.New("invalid Id") + } + + httpReq, ok := middleware.GetRequest(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepRequest")) + } + + httpResp, ok := middleware.GetResponseWriter(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepResponse Writer")) + } + + configPath := filepath.Join(Config.Cfg.Web.TasksDir, req.Id, "config.yaml") + httpResp.Header().Set("Content-Type", "application/octet-stream") + httpResp.Header().Set("Content-Disposition", "attachment;filename="+filepath.Base(configPath)) + http.ServeFile(httpResp, httpReq, configPath) + + return nil +} + +func (i *importService) DownloadLogs(req *types.DownloadLogsRequest) error { + id := req.Id + if id == "" { + return errors.New("id parse failed") + } + + httpReq, ok := middleware.GetRequest(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepRequest")) + } + + httpResp, ok := middleware.GetResponseWriter(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepResponse Writer")) + } + + filename := req.Name + path := "" + if filename == "import.log" { + path = filepath.Join(Config.Cfg.Web.TasksDir, id, filename) + } else { + path = filepath.Join(Config.Cfg.Web.TasksDir, id, "err", filename) + } + + fmt.Println("------------------------") + fmt.Println("test") + + httpResp.Header().Set("Content-Type", "application/octet-stream") + httpResp.Header().Set("Content-Disposition", "attachment;filename="+filepath.Base(path)) + http.ServeFile(httpResp, httpReq, path) + return nil +} + +func (i *importService) DeleteImportTask(req *types.DeleteImportTaskRequest) error { + return importer.DeleteImportTask(req.Id, req.Address+":"+req.Port, req.Username) +} + +func (i *importService) GetImportTask(req *types.GetImportTaskRequest) (*types.GetImportTaskData, error) { + return importer.GetImportTask(req.Id, req.Address+":"+req.Port, req.Username) +} + +func (i *importService) GetManyImportTask(req *types.GetManyImportTaskRequest) (*types.GetManyImportTaskData, error) { + return importer.GetManyImportTask(req.Address+":"+req.Port, req.Username, req.Page, req.PageSize) +} + +// GetImportTaskLogNames :Get all log file's name of a task +func (i *importService) GetImportTaskLogNames(req *types.GetImportTaskLogNamesRequest) (*types.GetImportTaskLogNamesData, error) { + id := req.Id + if id == "" { + return nil, errors.New("id parse failed") + } + + errLogDir := filepath.Join(Config.Cfg.Web.TasksDir, id, "err") + fileInfos, err := ioutil.ReadDir(errLogDir) + if err != nil { + return nil, err + } + + data := &types.GetImportTaskLogNamesData{ + Names: []string{}, + } + data.Names = append(data.Names, "import.log") + for _, fileInfo := range fileInfos { + name := fileInfo.Name() + data.Names = append(data.Names, name) + } + return data, nil +} + +func (i *importService) GetManyImportTaskLog(req *types.GetManyImportTaskLogRequest) (*types.GetManyImportTaskLogData, error) { + path := "" + if req.File == importLogName { + path = filepath.Join(Config.Cfg.Web.TasksDir, req.Id, req.File) + } else { + path = filepath.Join(Config.Cfg.Web.TasksDir, req.Id, errContentDir, req.File) + } + lines, err := readFile(path, req.Offset, req.Limit) + if err != nil { + return nil, err + } + + muTaskId.RLock() + taskIdBytes, err := ioutil.ReadFile(Config.Cfg.Web.TaskIdPath) + muTaskId.RUnlock() + if err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + return nil, err + } + taskIdJSON := make(map[string]bool) + if len(taskIdBytes) != 0 { + err = json.Unmarshal(taskIdBytes, &taskIdJSON) + if err != nil { + zap.L().Warn("parse taskId file error", zap.Error(err)) + return nil, err + } + } + + if len(lines) == 0 && taskIdJSON[req.Id] { + return nil, nil + } + if len(lines) == 0 { + return nil, errors.New("no task") + } + + data := &types.GetManyImportTaskLogData{ + Logs: lines, + } + + return data, nil +} + +func validClientParams(conf *importconfig.YAMLConfig) error { + if conf.NebulaClientSettings.Connection == nil || + conf.NebulaClientSettings.Connection.Address == nil || + *conf.NebulaClientSettings.Connection.Address == "" || + conf.NebulaClientSettings.Connection.User == nil || + *conf.NebulaClientSettings.Connection.User == "" || + conf.NebulaClientSettings.Space == nil || + *conf.NebulaClientSettings.Space == "" { + return errors.New("client params is wrong") + } + + for _, fn := range conf.Files { + if fn.CSV.Delimiter == nil || *fn.CSV.Delimiter == "" { + delimiter := "," + fn.CSV.Delimiter = &delimiter + } + } + + return nil +} + +func readFile(path string, offset int64, limit int64) ([]string, error) { + file, err := os.Open(path) + if err != nil { + zap.L().Warn("open file error", zap.Error(err)) + return nil, err + } + defer file.Close() + scanner := bufio.NewScanner(file) + res := make([]string, 0) + if limit != -1 { + for lineIndex := int64(0); scanner.Scan() && lineIndex < offset+limit; lineIndex++ { + if lineIndex >= offset { + res = append(res, scanner.Text()) + } + } + } else { + for lineIndex := int64(0); scanner.Scan(); lineIndex++ { + if lineIndex >= offset { + res = append(res, scanner.Text()) + } + } + } + return res, nil +} diff --git a/server-v2/api/studio/internal/service/importer/importer.go b/server-v2/api/studio/internal/service/importer/importer.go new file mode 100644 index 00000000..b4de26ca --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/importer.go @@ -0,0 +1,261 @@ +package importer + +import ( + "errors" + "fmt" + importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" + importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-importer/pkg/logger" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + Config "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" + "go.uber.org/zap" + "os" + "path/filepath" + "strconv" + "time" + + "gopkg.in/yaml.v2" +) + +type ImportResult struct { + TaskId string `json:"taskId"` + TimeCost string `json:"timeCost"` // Milliseconds + FailedRows int64 `json:"failedRows"` + ErrorResult struct { + ErrorCode int `json:"errorCode"` + ErrorMsg string `json:"errorMsg"` + } +} + +func GetNewTaskDir() (string, error) { + taskId, err := GetTaskMgr().NewTaskID() + if err != nil { + return "", err + } + taskDir := filepath.Join(Config.Cfg.Web.TasksDir, taskId) + return taskDir, nil +} + +func CreateConfigFile(dir string, config importconfig.YAMLConfig) error { + fileName := "config.yaml" + err := utils.CreateDir(dir) + if err != nil { + return err + } + path := filepath.Join(dir, fileName) + // erase user information + address := *config.NebulaClientSettings.Connection.Address + user := *config.NebulaClientSettings.Connection.User + password := *config.NebulaClientSettings.Connection.Password + *config.NebulaClientSettings.Connection.Address = "" + *config.NebulaClientSettings.Connection.User = "" + *config.NebulaClientSettings.Connection.Password = "" + + // erase path infomation + logPath := *config.LogPath + *config.LogPath = "import.log" + paths := make([]string, 0) + failDataPaths := make([]string, 0) + for _, file := range config.Files { + paths = append(paths, filepath.Join(Config.Cfg.Web.UploadDir, *file.Path)) + failDataPaths = append(failDataPaths, filepath.Join(dir, "err", *file.FailDataPath)) + _, fileName := filepath.Split(*file.Path) + _, fileDataName := filepath.Split(*file.FailDataPath) + *file.Path = fileName + *file.FailDataPath = fileDataName + } + + outYaml, err := yaml.Marshal(config) + if err != nil { + return err + } + if err := os.WriteFile(path, outYaml, 0644); err != nil { + zap.L().Warn("write"+path+"file error", zap.Error(err)) + return err + } + + *config.LogPath = logPath + *config.NebulaClientSettings.Connection.Address = address + *config.NebulaClientSettings.Connection.User = user + *config.NebulaClientSettings.Connection.Password = password + for i, file := range config.Files { + *file.Path = paths[i] + *file.FailDataPath = failDataPaths[i] + } + return nil +} + +func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { + runnerLogger := logger.NewRunnerLogger(*conf.LogPath) + if err := conf.ValidateAndReset("", runnerLogger); err != nil { + return err + } + + task, _ := GetTaskMgr().GetTask(taskID) + go func() { + result := ImportResult{} + now := time.Now() + task.GetRunner().Run(conf) + timeCost := time.Since(now).Milliseconds() + result.TaskId = taskID + result.TimeCost = fmt.Sprintf("%dms", timeCost) + if rerrs := task.GetRunner().Errors(); len(rerrs) != 0 { + allErrIsNotCompleteError := true + for _, rerr := range rerrs { + err := rerr.(importerErrors.ImporterError) + if err.ErrCode != importerErrors.NotCompleteError { + allErrIsNotCompleteError = false + break + } + } + if allErrIsNotCompleteError { + task.TaskInfo.TaskStatus = StatusFinished.String() + result.FailedRows = task.GetRunner().NumFailed + err1 := GetTaskMgr().FinishTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + return + } + // TODO: return all errors + task.TaskInfo.TaskStatus = StatusAborted.String() + err, _ := rerrs[0].(importerErrors.ImporterError) + result.ErrorResult.ErrorCode = err.ErrCode + result.ErrorResult.ErrorMsg = err.ErrMsg.Error() + task.TaskInfo.TaskMessage = err.ErrMsg.Error() + err1 := GetTaskMgr().AbortTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Warn(fmt.Sprintf("Failed to finish a import task: `%s`, task result: `%v`", taskID, result)) + } else { + task.TaskInfo.TaskStatus = StatusFinished.String() + result.FailedRows = task.GetRunner().NumFailed + err := GetTaskMgr().FinishTask(taskID) + if err != nil { + zap.L().Warn("finish task fail", zap.Error(err)) + } + zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + } + }() + return nil +} + +func ImportStatus(taskID string) (*TaskInfo, error) { + if t, ok := GetTaskMgr().GetTask(taskID); ok { + if t.GetRunner() != nil { + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + return nil, err + } + } + return t.TaskInfo, nil + } + return nil, errors.New("task is not exist") +} + +func DeleteImportTask(taskID, address, username string) error { + if id, err := strconv.Atoi(taskID); err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } else { + _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) + if err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } + } + err := GetTaskMgr().DelTask(taskID) + if err != nil { + return fmt.Errorf("task del fail, %s", err.Error()) + } + return nil +} + +func GetImportTask(taskID, address, username string) (*types.GetImportTaskData, error) { + task := Task{} + result := &types.GetImportTaskData{} + + if id, err := strconv.Atoi(taskID); err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + return nil, errors.New("task not existed") + } else { + _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) + if err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + return nil, errors.New("task not existed") + } + } + + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + } + if t, ok := GetTaskMgr().GetTask(taskID); ok { + task = *t + result.Id = fmt.Sprintf("%d", task.TaskInfo.ID) + result.Status = task.TaskInfo.TaskStatus + result.CreateTime = task.TaskInfo.CreatedTime + result.UpdateTime = task.TaskInfo.UpdatedTime + result.Address = task.TaskInfo.NebulaAddress + result.User = task.TaskInfo.User + result.Name = task.TaskInfo.Name + result.Space = task.TaskInfo.Space + result.Stats = types.Stats(task.TaskInfo.Stats) + } + + return result, nil +} + +func GetManyImportTask(address, username string, page, pageSize int) (*types.GetManyImportTaskData, error) { + result := &types.GetManyImportTaskData{ + Total: 0, + List: []types.GetImportTaskData{}, + } + + taskIDs, err := GetTaskMgr().GetAllTaskIDs(address, username) + if err != nil { + return nil, err + } + + start := (page - 1) * pageSize + stop := page * pageSize + if len(taskIDs) <= start { + return nil, errors.New("invalid parameter") + } else { + if stop >= len(taskIDs) { + stop = len(taskIDs) + } + result.Total = int64(stop - start) + + for i := start; i < stop; i++ { + data, _ := GetImportTask(taskIDs[i], address, username) + result.List = append(result.List, *data) + } + } + + return result, nil +} + +func StopImportTask(taskID, address, username string) error { + if id, err := strconv.Atoi(taskID); err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } else { + _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) + if err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } + } + + err := GetTaskMgr().StopTask(taskID) + if err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return err + } else { + return nil + } +} diff --git a/server-v2/api/studio/internal/service/importer/task.go b/server-v2/api/studio/internal/service/importer/task.go new file mode 100644 index 00000000..58b9c1ee --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/task.go @@ -0,0 +1,21 @@ +package importer + +import ( + "github.com/vesoft-inc/nebula-importer/pkg/cmd" + "github.com/zeromicro/go-zero/core/logx" +) + +type Task struct { + runner *cmd.Runner `json:"runner,omitempty"` + TaskInfo *TaskInfo `json:"task_info,omitempty"` +} + +func (t *Task) UpdateQueryStats() error { + stats, err := t.runner.QueryStats() + if err != nil { + logx.Infof("query import stats fail: %s", err) + return err + } + t.TaskInfo.Stats = *stats + return nil +} diff --git a/server-v2/api/studio/internal/service/importer/taskInfo.go b/server-v2/api/studio/internal/service/importer/taskInfo.go new file mode 100644 index 00000000..dd393131 --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/taskInfo.go @@ -0,0 +1,16 @@ +package importer + +import "github.com/vesoft-inc/nebula-importer/pkg/stats" + +type TaskInfo struct { + ID int `json:"taskID" gorm:"primaryKey;autoIncrement"` + Name string `json:"name"` + Space string `json:"space"` + NebulaAddress string `json:"nebulaAddress"` + CreatedTime int64 `json:"createdTime"` + UpdatedTime int64 `json:"updatedTime"` + User string `json:"user"` + TaskStatus string `json:"taskStatus"` + TaskMessage string `json:"taskMessage"` + Stats stats.Stats `json:"stats" gorm:"embedded"` +} diff --git a/server-v2/api/studio/internal/service/importer/taskdb.go b/server-v2/api/studio/internal/service/importer/taskdb.go new file mode 100644 index 00000000..7ca6050e --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/taskdb.go @@ -0,0 +1,88 @@ +package importer + +import ( + Config "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/config" + "github.com/zeromicro/go-zero/core/logx" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +type TaskDb struct { + *gorm.DB +} + +func InitDB() { + dbFilePath := Config.Cfg.Web.SqlitedbFilePath + db, err := gorm.Open(sqlite.Open(dbFilePath), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info), + }) + if err != nil { + logx.Errorf("init db fail: %s", err) + } + + err = db.AutoMigrate(&TaskInfo{}) + if err != nil { + logx.Errorf("init taskInfo table fail: %s", err) + panic(err) + } + GetTaskMgr().db = &TaskDb{ + DB: db, + } + if err := GetTaskMgr().db.UpdateProcessingTasks2Aborted(); err != nil { + logx.Errorf("update processing tasks to aborted failed: %s", err) + panic(err) + } +} + +// FindTaskInfoByIdAndAddresssAndUser used to check whether the task belongs to the user +func (t *TaskDb) FindTaskInfoByIdAndAddresssAndUser(id int, nebulaAddress, user string) (*TaskInfo, error) { + taskInfo := new(TaskInfo) + if err := t.Model(&TaskInfo{}).Where("id = ? AND nebula_address = ? And user = ?", id, nebulaAddress, + user).First(&taskInfo).Error; err != nil { + return nil, err + } + return taskInfo, nil +} + +func (t *TaskDb) InsertTaskInfo(info *TaskInfo) error { + return t.Create(info).Error +} + +func (t *TaskDb) UpdateTaskInfo(info *TaskInfo) error { + return t.Model(&TaskInfo{}).Where("id = ?", info.ID).Updates(info).Error +} + +func (t *TaskDb) DelTaskInfo(ID int) error { + return t.Delete(&TaskInfo{}, ID).Error +} + +func (t *TaskDb) LastId() (int, error) { + var id int + if err := t.Raw("SELECT MAX(id) FROM task_infos").Scan(&id).Error; err != nil { + if err.Error() == "sql: Scan error on column index 0, name \"MAX(id)\": converting NULL to int is unsupported" { + return 0, nil + } + return 0, err + } + return id, nil +} + +func (t *TaskDb) SelectAllIds(nebulaAddress, user string) ([]int, error) { + var taskInfos []TaskInfo + ids := make([]int, 0) + if err := t.Select("id").Where("nebula_address = ? And user = ?", nebulaAddress, user).Find(&taskInfos).Error; err != nil { + return nil, err + } + for _, taskInfo := range taskInfos { + ids = append(ids, taskInfo.ID) + } + return ids, nil +} + +func (t *TaskDb) UpdateProcessingTasks2Aborted() error { + if err := t.Model(&TaskInfo{}).Where("task_status = ?", StatusProcessing.String()).Update("task_status", StatusAborted.String()).Error; err != nil { + return err + } + return nil +} diff --git a/server-v2/api/studio/internal/service/importer/taskmgr.go b/server-v2/api/studio/internal/service/importer/taskmgr.go new file mode 100644 index 00000000..7c20ebeb --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/taskmgr.go @@ -0,0 +1,269 @@ +package importer + +import ( + "errors" + "fmt" + "github.com/vesoft-inc/nebula-importer/pkg/cmd" + Config "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/config" + "github.com/zeromicro/go-zero/core/logx" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +var ( + taskmgr *TaskMgr = &TaskMgr{ + tasks: sync.Map{}, + db: &TaskDb{}, + } + + mux sync.Mutex +) + +type TaskMgr struct { + tasks sync.Map + db *TaskDb +} + +func newTask(nebulaAddress string, user string, name string, space string) *Task { + timeUnix := time.Now().Unix() + return &Task{ + runner: &cmd.Runner{}, + TaskInfo: &TaskInfo{ + Name: name, + Space: space, + CreatedTime: timeUnix, + UpdatedTime: timeUnix, + TaskStatus: StatusProcessing.String(), + NebulaAddress: nebulaAddress, + User: user, + }, + } +} + +func (task *Task) GetRunner() *cmd.Runner { + return task.runner +} + +func (mgr *TaskMgr) NewTaskID() (string, error) { + tid, err := mgr.db.LastId() + if err != nil { + return "", err + } + taskID := fmt.Sprintf("%v", tid+1) + return taskID, nil +} + +func (mgr *TaskMgr) NewTask(nebulaAddress string, user string, name string, space string) (*Task, string, error) { + mux.Lock() + defer mux.Unlock() + task := newTask(nebulaAddress, user, name, space) + if err := mgr.db.InsertTaskInfo(task.TaskInfo); err != nil { + return nil, "", err + } + tid, err := mgr.db.LastId() + if err != nil { + return nil, "", err + } + task.TaskInfo.ID = tid + taskID := fmt.Sprintf("%v", tid) + mgr.PutTask(taskID, task) + return task, taskID, nil +} + +func GetTaskMgr() *TaskMgr { + return taskmgr +} + +/* + GetTask get task from map and local sql +*/ +func (mgr *TaskMgr) GetTask(taskID string) (*Task, bool) { + if task, ok := mgr.getTaskFromMap(taskID); ok { + return task, true + } + task := mgr.getTaskFromSQL(taskID) + // did not find task + if task.TaskInfo.ID == 0 { + return nil, false + } + return task, true +} + +/* + PutTask put task into tasks map +*/ +func (mgr *TaskMgr) PutTask(taskID string, task *Task) { + mgr.tasks.Store(taskID, task) +} + +/* + FinishTask will query task stats, delete task in the map + and update the taskInfo in local sql +*/ +func (mgr *TaskMgr) FinishTask(taskID string) (err error) { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return + } + if err := task.UpdateQueryStats(); err != nil { + return err + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + err = mgr.db.UpdateTaskInfo(task.TaskInfo) + if err != nil { + return err + } + mgr.tasks.Delete(taskID) + return +} + +func (mgr *TaskMgr) AbortTask(taskID string) (err error) { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + err = mgr.db.UpdateTaskInfo(task.TaskInfo) + if err != nil { + return err + } + mgr.tasks.Delete(taskID) + return +} + +func (mgr *TaskMgr) DelTask(taskID string) error { + _, ok := mgr.getTaskFromMap(taskID) + if ok { + mgr.tasks.Delete(taskID) + } + id, err := strconv.Atoi(taskID) + if err != nil { + return errors.New("taskID is wrong") + } + if err = mgr.db.DelTaskInfo(id); err != nil { + return err + } + taskDir := filepath.Join(Config.Cfg.Web.TasksDir, taskID) + return os.RemoveAll(taskDir) +} + +/* + UpdateTaskInfo will query task stats, update task in the map + and update the taskInfo in local sql +*/ +func (mgr *TaskMgr) UpdateTaskInfo(taskID string) error { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return nil + } + if err := task.UpdateQueryStats(); err != nil { + return err + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + return mgr.db.UpdateTaskInfo(task.TaskInfo) +} + +/* + StopTask will change the task status to `StatusStoped`, + and then call FinishTask +*/ +func (mgr *TaskMgr) StopTask(taskID string) error { + if task, ok := mgr.getTaskFromMap(taskID); ok { + if task.GetRunner().Readers == nil { + return errors.New("task is not initialized") + } + for _, r := range task.GetRunner().Readers { + r.Stop() + } + task.TaskInfo.TaskStatus = StatusStoped.String() + if err := mgr.FinishTask(taskID); err != nil { + logx.Alert(fmt.Sprintf("finish task fail: %s", err)) + return err + } + return nil + } + return errors.New("task is finished or not exist") +} + +/* + `GetAllTaskIDs` will return all task ids in map +*/ +func (mgr *TaskMgr) GetAllTaskIDs(nebulaAddress, username string) ([]string, error) { + ids := make([]string, 0) + allIds, err := mgr.db.SelectAllIds(nebulaAddress, username) + if err != nil { + return nil, err + } + for _, id := range allIds { + ids = append(ids, strconv.Itoa(id)) + } + return ids, nil +} + +func (mgr *TaskMgr) getTaskFromMap(taskID string) (*Task, bool) { + if task, ok := mgr.tasks.Load(taskID); ok { + return task.(*Task), true + } + return nil, false +} + +func (mgr *TaskMgr) getTaskFromSQL(taskID string) *Task { + taskInfo := new(TaskInfo) + mgr.db.First(taskInfo, taskID) + task := new(Task) + task.TaskInfo = taskInfo + return task +} + +type TaskStatus int + +/* + the task in memory (map) has 2 status: processing, aborted; + and the task in local sql has 2 status: finished, stoped; +*/ +const ( + StatusUnknown TaskStatus = iota + StatusFinished + StatusStoped + StatusProcessing + StatusNotExisted + StatusAborted +) + +var taskStatusMap = map[TaskStatus]string{ + StatusFinished: "statusFinished", + StatusStoped: "statusStoped", + StatusProcessing: "statusProcessing", + StatusNotExisted: "statusNotExisted", + StatusAborted: "statusAborted", +} + +var taskStatusRevMap = map[string]TaskStatus{ + "statusFinished": StatusFinished, + "statusStoped": StatusStoped, + "statusProcessing": StatusProcessing, + "statusNotExisted": StatusNotExisted, + "statusAborted": StatusAborted, +} + +func NewTaskStatus(status string) TaskStatus { + if v, ok := taskStatusRevMap[status]; ok { + return v + } + return StatusUnknown +} + +func (status TaskStatus) String() string { + if v, ok := taskStatusMap[status]; ok { + return v + } + return "statusUnknown" +} diff --git a/server-v2/api/studio/internal/svc/servicecontext.go b/server-v2/api/studio/internal/svc/servicecontext.go index 6564ff82..9cd437fa 100644 --- a/server-v2/api/studio/internal/svc/servicecontext.go +++ b/server-v2/api/studio/internal/svc/servicecontext.go @@ -3,14 +3,14 @@ package svc import ( "database/sql" "errors" - "github.com/vesoft-inc/go-pkg/httpclient" "github.com/vesoft-inc/go-pkg/response" "github.com/vesoft-inc/go-pkg/validator" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/config" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" - "github.com/zeromicro/go-zero/core/logx" + "net/http" + "strings" ) type ServiceContext struct { @@ -49,5 +49,11 @@ func createResponseHandler(c config.Config) response.Handler { // nolint:gocriti }, Errorf: logx.Errorf, DebugInfo: c.Debug.Enable, + CheckBodyType: func(r *http.Request) response.StandardHandlerBodyType { + if strings.HasPrefix(r.URL.Path, "/api/import-tasks/download") { + return response.StandardHandlerBodyNone + } + return response.StandardHandlerBodyJson + }, }) } diff --git a/server-v2/api/studio/internal/types/types.go b/server-v2/api/studio/internal/types/types.go index ee697dba..dba27cba 100644 --- a/server-v2/api/studio/internal/types/types.go +++ b/server-v2/api/studio/internal/types/types.go @@ -51,3 +51,201 @@ type FileStat struct { type FilesIndexData struct { List []FileStat `json:"list"` } + +type Connection struct { + User string `json:"user" validate:"required"` + Password string `json:"password" validate:"required"` + Address string `json:"address" validate:"required"` +} + +type ClientSettings struct { + Retry int `json:"retry,optional"` + Concurrency int `json:"concurrency,optional"` + ChannelBufferSize int `json:"channelBufferSize,optional"` + Space string `json:"space" validate:"required"` + Connection Connection `json:"connection" validate:"required"` + PostStart PostStart `json:"postStart,optional"` + PreStop PreStop `json:"preStop,optional"` +} + +type PostStart struct { + Commands string `json:"commands" validate:"required"` + AfterPeriod string `json:"afterPeriod" validate:"required"` +} + +type PreStop struct { + Commands string `json:"commands,optional"` +} + +type CSV struct { + WithHeader bool `json:"withHeader,optional"` + WithLabel bool `json:"withLabel,optional"` + Delimiter string `json:"delimiter,optional" default:","` +} + +type VID struct { + Index int64 `json:"index" validate:"required"` + Type string `json:"type" validate:"required"` + Function string `json:"function,optional"` + Prefix string `json:"prefix,optional"` +} + +type TagProp struct { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index" validate:"required"` +} + +type Tag struct { + Name string `json:"name" validate:"required"` + Props []TagProp `json:"props" validate:"required"` +} + +type Vertex struct { + VID VID `json:"vid" validate:"required"` + Tags []Tag `json:"tags" validate:"required"` +} + +type EdgeID struct { + Index int64 `json:"index" validate:"required"` + Function string `json:"function,optional"` + Type string `json:"type" validate:"required"` + Prefix string `json:"prefix,optional"` +} + +type EdgeRank struct { + Index int64 `json:"index"` +} + +type EdgeProp struct { + Name string `json:"name"` + Type string `json:"type"` + Index int64 `json:"index"` +} + +type Edge struct { + Name string `json:"name" validate:"required"` + SrcVID EdgeID `json:"srcVID" validate:"required"` + DstVID EdgeID `json:"dstVID" validate:"required"` + Rank EdgeRank `json:"rank, optional"` + Props []EdgeProp `json:"props" validate:"required"` +} + +type Schema struct { + Type string `json:"type" validate:"required"` + Edge Edge `json:"edge,optional"` + Vertex Vertex `json:"vertex,optional"` +} + +type File struct { + Path string `json:"path" validate:"required"` + FailDataPath string `json:"failDataPath" validate:"required"` + BatchSize int `json:"batchSize,optional"` + Limit int `json:"limit, optional"` + InOrder bool `json:"inOrder, optional"` + Type string `json:"type" validate:"required"` + CSV CSV `json:"csv" validate:"required"` + Schema Schema `json:"schema" validate:"required"` +} + +type ImportConfig struct { + Version string `json:"version" validate:"required"` + Description string `json:"description,optional"` + RemoveTempFiles bool `json:"removeTempFiles,optional"` + ClientSettings ClientSettings `json:"clientSettings" validate:"required"` + Files []File `json:"files" validate:"required"` +} + +type CreateImportTaskRequest struct { + Name string `json:"name" validate:"required"` + Config ImportConfig `json:"config" validate:"required"` +} + +type CreateImportTaskData struct { + Id string `json:"id"` +} + +type GetImportTaskRequest struct { + Id string `path:"id" validate:"required"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` +} + +type GetImportTaskData struct { + Id string `json:"id"` + Name string `json:"name"` + User string `json:"user"` + Address string `json:"address"` + Space string `json:"space"` + Status string `json:"status"` + CreateTime int64 `json:"createTime"` + UpdateTime int64 `json:"updateTime"` + Stats Stats `json:"stats"` +} + +type Stats struct { + NumFailed int64 `json:"numFailed"` + NumReadFailed int64 `json:"numReadFailed"` + TotalCount int64 `json:"totalCount"` + TotalBatches int64 `json:"totalBatches"` + TotalLatency int64 `json:"totalLatency"` + TotalReqTime int64 `json:"totalReqTime"` + TotalBytes int64 `json:"totalBytes"` + TotalImportedBytes int64 `json:"totalImportedBytes"` +} + +type GetManyImportTaskRequest struct { + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + Page int `form:"page,default=1"` + PageSize int `form:"pageSize,default=100"` +} + +type GetManyImportTaskData struct { + Total int64 `json:"total"` + List []GetImportTaskData `json:"list"` +} + +type GetManyImportTaskLogRequest struct { + Id string `path:"id" validate:"required"` + File string `form:"file" validate:"required"` + Offset int64 `form:"offset" validate:"min=0"` + Limit int64 `form:"limit" validate:"min=1"` +} + +type GetManyImportTaskLogData struct { + Logs []string `json:"logs"` +} + +type GetImportTaskLogNamesRequest struct { + Id string `path:"id" validate:"required""` +} + +type GetImportTaskLogNamesData struct { + Names []string `json:"names"` +} + +type DeleteImportTaskRequest struct { + Id string `path:"id"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` +} + +type StopImportTaskRequest struct { + Id string `path:"id"` + Address string `form:"address"` + Port string `form:"port"` + Username string `form:"username"` +} + +type DownloadLogsRequest struct { + Id string `path:"id" validate:"required"` + Name string `form:"name" validate:"required"` +} + +type DownloadConfigsRequest struct { + Id string `path:"id" validate:"required"` +} diff --git a/server-v2/api/studio/pkg/config/config.go b/server-v2/api/studio/pkg/config/config.go index 4c3a99fc..e206938b 100644 --- a/server-v2/api/studio/pkg/config/config.go +++ b/server-v2/api/studio/pkg/config/config.go @@ -1,4 +1,4 @@ -package config +package Config import ( "io/ioutil" @@ -21,8 +21,6 @@ type ( UploadDir string `yaml:"upload_dir"` TasksDir string `yaml:"tasks_dir"` SqlitedbFilePath string `yaml:"sqlitedb_file_path"` - Address string `yaml:"address"` - Port int `yaml:"port"` } ) @@ -32,8 +30,6 @@ const ( DefaultUploadDir = "data/upload" DefaultTasksDir = "data/tasks" DefaultSqlitedbFilePath = "data/tasks.db" - DefaultAddress = "0.0.0.0" - DefaultPort = 9000 ) func (c *Config) Validate() error { @@ -92,12 +88,6 @@ func (w *Web) Complete() { abs, _ := filepath.Abs(DefaultSqlitedbFilePath) w.SqlitedbFilePath = abs } - if w.Address == "" { - w.Address = DefaultAddress - } - if w.Port == 0 { - w.Port = DefaultPort - } } func InitConfig(path string) error { diff --git a/server-v2/api/studio/restapi/file.api b/server-v2/api/studio/restapi/file.api index f4ae3b71..d403bbed 100644 --- a/server-v2/api/studio/restapi/file.api +++ b/server-v2/api/studio/restapi/file.api @@ -25,10 +25,10 @@ type ( service studio-api { @doc "Upload File" @handler FileUpload - post /api/file + post /api/file/upload @doc "delete file" @handler FileDestroy - delete /api/file/:name returns(FileDestroyRequest) + delete /api/file/:name(FileDestroyRequest) @doc "preview file" @handler FilesIndex get /api/file returns(FilesIndexData) diff --git a/server-v2/api/studio/restapi/import.api b/server-v2/api/studio/restapi/import.api new file mode 100644 index 00000000..3f343553 --- /dev/null +++ b/server-v2/api/studio/restapi/import.api @@ -0,0 +1,235 @@ +syntax = "v1" + +type ( + Connection { + User string `json:"user" validate:"required"` + Password string `json:"password" validate:"required"` + Address string `json:"address" validate:"required"` + } + + ClientSettings { + Retry int `json:"retry,optional"` + Concurrency int `json:"concurrency,optional"` + ChannelBufferSize int `json:"channelBufferSize,optional"` + Space string `json:"space" validate:"required"` + Connection Connection `json:"connection" validate:"required"` + PostStart PostStart `json:"postStart,optional"` + PreStop PreStop `json:"preStop,optional"` + } + + PostStart { + Commands string `json:"commands" validate:"required"` + AfterPeriod string `json:"afterPeriod" validate:"required"` + } + + PreStop { + Commands string `json:"commands,optional"` + } + + CSV { + WithHeader bool `json:"withHeader,optional"` + WithLabel bool `json:"withLabel,optional"` + Delimiter string `json:"delimiter,optional" default:","` + } + + VID { + Index int64 `json:"index" validate:"required"` + Type string `json:"type" validate:"required"` + Function string `json:"function,optional"` + Prefix string `json:"prefix,optional"` + } + + TagProp { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index" validate:"required"` + } + + Tag { + Name string `json:"name" validate:"required"` + Props []TagProp `json:"props" validate:"required"` + } + + Vertex { + VID VID `json:"vid" validate:"required"` + Tags []Tag `json:"tags" validate:"required"` + } + + EdgeID { + Index int64 `json:"index" validate:"required"` + Function string `json:"function,optional"` + Type string `json:"type" validate:"required"` + Prefix string `json:"prefix,optional"` + } + + EdgeRank { + Index int64 `json:"index"` + } + + EdgeProp { + Name string `json:"name"` + Type string `json:"type"` + Index int64 `json:"index"` + } + + Edge { + Name string `json:"name" validate:"required"` + SrcVID EdgeID `json:"srcVID" validate:"required"` + DstVID EdgeID `json:"dstVID" validate:"required"` + Rank EdgeRank `json:"rank, optional"` + Props []EdgeProp `json:"props" validate:"required"` + } + + Schema { + Type string `json:"type" validate:"required"` + Edge Edge `json:"edge,optional"` + Vertex Vertex `json:"vertex,optional"` + } + + File { + Path string `json:"path" validate:"required"` + FailDataPath string `json:"failDataPath" validate:"required"` + BatchSize int `json:"batchSize,optional"` + Limit int `json:"limit, optional"` + InOrder bool `json:"inOrder, optional"` + Type string `json:"type" validate:"required"` + CSV CSV `json:"csv" validate:"required"` + Schema Schema `json:"schema" validate:"required"` + } + + ImportConfig { + Version string `json:"version" validate:"required"` + Description string `json:"description,optional"` + RemoveTempFiles bool `json:"removeTempFiles,optional"` + ClientSettings ClientSettings `json:"clientSettings" validate:"required"` + Files []File `json:"files" validate:"required"` + } + + CreateImportTaskRequest { + Name string `json:"name" validate:"required"` + Config ImportConfig `json:"config" validate:"required"` + } + + CreateImportTaskData { + Id string `json:"id"` + } + + GetImportTaskRequest { + Id string `path:"id" validate:"required"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + } + + GetImportTaskData { + Id string `json:"id"` + Name string `json:"name"` + User string `json:"user"` + Address string `json:"address"` + Space string `json:"space"` + Status string `json:"status"` + CreateTime int64 `json:"createTime"` + UpdateTime int64 `json:"updateTime"` + Stats Stats `json:"stats"` + } + + Stats { + NumFailed int64 `json:"numFailed"` + NumReadFailed int64 `json:"numReadFailed"` + TotalCount int64 `json:"totalCount"` + TotalBatches int64 `json:"totalBatches"` + TotalLatency int64 `json:"totalLatency"` + TotalReqTime int64 `json:"totalReqTime"` + TotalBytes int64 `json:"totalBytes"` + TotalImportedBytes int64 `json:"totalImportedBytes"` + } + + GetManyImportTaskRequest { + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + Page int `form:"page,default=1"` + PageSize int `form:"pageSize,default=100"` + } + + GetManyImportTaskData { + Total int64 `json:"total"` + List []GetImportTaskData `json:"list"` + } + + GetManyImportTaskLogRequest { + Id string `path:"id" validate:"required"` + File string `form:"file" validate:"required"` + Offset int64 `form:"offset" validate:"min=0"` + Limit int64 `form:"limit" validate:"min=1"` + } + + GetManyImportTaskLogData { + Logs []string `json:"logs"` + } + + GetImportTaskLogNamesRequest { + Id string `path:"id" validate:"required""` + } + + GetImportTaskLogNamesData { + Names []string `json:"names"` + } + + DeleteImportTaskRequest { + Id string `path:"id"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + } + + StopImportTaskRequest { + Id string `path:"id"` + Address string `form:"address"` + Port string `form:"port"` + Username string `form:"username"` + } + + DownloadLogsRequest { + Id string `path:"id" validate:"required"` + Name string `form:"name" validate:"required"` + } + + DownloadConfigsRequest { + Id string `path:"id" validate:"required"` + } +) + +@server( + group: importtask +) + +service studio-api { + @doc "Create Import Task" + @handler CreateImportTask + post /api/import-tasks(CreateImportTaskRequest) returns(CreateImportTaskData) + @doc "Get Import Task" + @handler GetImportTask + get /api/import-tasks/:id(GetImportTaskRequest) returns(GetImportTaskData) + @doc "Get Many Import Task" + @handler GetManyImportTask + get /api/import-tasks(GetManyImportTaskRequest) returns(GetManyImportTaskData) + @doc "Get Many Import Task Log" + @handler GetManyImportTaskLog + get /api/import-tasks/:id/logs(GetManyImportTaskLogRequest) returns(GetManyImportTaskLogData) + @doc "Get all logs file name of a Task" + @handler GetImportTaskLogNames + get /api/import-tasks/:id/task-log-names(GetImportTaskLogNamesRequest) returns(GetImportTaskLogNamesData) + @doc "Delete Import Task" + @handler DeleteImportTask + delete /api/import-tasks/:id(DeleteImportTaskRequest) + @doc "Stop Import Task" + @handler StopImportTask + get /api/import-tasks/stop/:id(StopImportTaskRequest) + @doc "Download logs" + @handler DownloadLogs + get /api/import-tasks/download/logs/:id(DownloadLogsRequest) + @doc "Download Config" + @handler DownloadConfig + get /api/import-tasks/download/config/:id(DownloadConfigsRequest) +} \ No newline at end of file diff --git a/server-v2/api/studio/restapi/studio.api b/server-v2/api/studio/restapi/studio.api index 9c362460..0b6cf67a 100644 --- a/server-v2/api/studio/restapi/studio.api +++ b/server-v2/api/studio/restapi/studio.api @@ -9,4 +9,5 @@ import ( "health.api" "gateway.api" "file.api" + "import.api" ) \ No newline at end of file diff --git a/server-v2/api/studio/studio.go b/server-v2/api/studio/studio.go index 55a1258e..9e8644d4 100644 --- a/server-v2/api/studio/studio.go +++ b/server-v2/api/studio/studio.go @@ -5,12 +5,17 @@ import ( "flag" "fmt" "net/http" + "strings" "github.com/vesoft-inc/go-pkg/middleware" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/config" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service/importer" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/auth" + Config "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/logging" + "go.uber.org/zap" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/rest" @@ -27,6 +32,18 @@ func main() { var c config.Config conf.MustLoad(*configFile, &c, conf.UseEnv()) + // init logger + loggingOptions := logging.NewOptions() + if err := loggingOptions.InitGlobals(); err != nil { + panic(err) + } + + if err := Config.InitConfig(*configFile); err != nil { + zap.L().Fatal("init config failed", zap.Error(err)) + } + + importer.InitDB() + svcCtx := svc.NewServiceContext(c) server := rest.MustNewServer(c.RestConf, rest.WithNotFoundHandler(middleware.NewAssetsHandler(middleware.AssetsConfig{ Root: "assets", @@ -38,6 +55,25 @@ func main() { // global middleware server.Use(auth.AuthMiddlewareWithCtx(svcCtx)) + server.Use(rest.ToMiddleware(middleware.ReserveRequest(middleware.ReserveRequestConfig{ + Skipper: func(r *http.Request) bool { + if strings.HasPrefix(r.URL.Path, "/api/file/upload") { + return false + } + if strings.HasPrefix(r.URL.Path, "/api/import-tasks/download") { + return false + } + return true + }, + }))) + server.Use(rest.ToMiddleware(middleware.ReserveResponseWriter(middleware.ReserveResponseWriterConfig{ + Skipper: func(r *http.Request) bool { + if strings.HasPrefix(r.URL.Path, "/api/import-tasks/download") { + return false + } + return true + }, + }))) // api handlers handler.RegisterHandlers(server, svcCtx)