From 8336460a5ebf5b5c2c6c9d3f13bf6129b2cb73dc Mon Sep 17 00:00:00 2001 From: Jingliu Xiong Date: Wed, 16 Mar 2022 20:20:16 +0800 Subject: [PATCH] mutil task import (#117) --- server/.golangci.yml | 9 + server/README.md | 352 ++++++++++---- server/config/example-config.yaml | 2 +- server/go.mod | 8 +- server/go.sum | 19 +- server/pkg/api/routes/gateway.go | 8 - server/pkg/api/routes/import.go | 44 +- server/pkg/utils/file.go | 13 + server/pkg/webserver/base/scheme.go | 4 +- server/pkg/webserver/base/types.go | 5 +- server/pkg/webserver/controller/files.go | 91 +++- server/pkg/webserver/controller/gateway.go | 143 +----- server/pkg/webserver/controller/import.go | 438 +++++++++++++++--- .../webserver/service/importer/importer.go | 174 +++++-- server/pkg/webserver/service/importer/task.go | 22 + .../webserver/service/importer/taskInfo.go | 16 + .../pkg/webserver/service/importer/taskdb.go | 73 +++ .../pkg/webserver/service/importer/taskmgr.go | 217 +++++---- 18 files changed, 1191 insertions(+), 447 deletions(-) create mode 100644 server/.golangci.yml create mode 100644 server/pkg/utils/file.go create mode 100644 server/pkg/webserver/service/importer/task.go create mode 100644 server/pkg/webserver/service/importer/taskInfo.go create mode 100644 server/pkg/webserver/service/importer/taskdb.go diff --git a/server/.golangci.yml b/server/.golangci.yml new file mode 100644 index 00000000..b531214b --- /dev/null +++ b/server/.golangci.yml @@ -0,0 +1,9 @@ +linters: + enable: + - errcheck + - goimports + - golint + - govet + - staticcheck + - gofmt + - gocritic \ No newline at end of file diff --git a/server/README.md b/server/README.md index 2028e3e2..f4197ca7 100644 --- a/server/README.md +++ b/server/README.md @@ -28,16 +28,21 @@ $ ./nebula-studio-server -studio-config="./config/example-config.yaml" | ExecNGQL | /api-nebula/db/exec | POST | | ConnectDB | /api-nebula/db/connect | POST | | DisconnectDB | /api-nebula/db/disconnect | POST | -| ImportData | /api-nebula/task/import | POST | -| HandleImportAction | /api-nebula/import/action | POST | -| FilesIndex | /api/files | GET | -| FilesDestroy | /api/files/{id:string} | DELETE | -| FilesUpload | /api/files | PUT | -| ReadLog | /api/import/log | GET | -| CreateConfigFile | /api/import/config | POST | -| Callback | /api/import/finish | POST | -| GetWorkingDir | /api/import/working_dir | GET | -| Index | / | GET,HEAD | +| ImportData | /api/import-tasks/import | POST | +| HandleImportAction | /api/import-tasks/action | POST | +| QueryImportStats | /api/import-tasks/stats/{id:string} | GET | +| DownloadConfig | /api/import-tasks/config/{id:string} | GET | +| DownloadImportLog | /api/import-tasks/{id:string}/log | GET | +| DownloadErrLog | /api/import-tasks/{id:string}/err-logs | GET | +| ReadLog | /api/import-tasks/logs | GET | +| ReadErrLog | /api/import-tasks/err-logs | GET | +| Callback | /api/import-tasks/finish | POST | +| GetWorkingDir | /api/import-tasks/working-dir | GET | +| GetTaskDir | /api/import-tasks/task-dir | GET | +| GetTaskLogNames | /api/import-tasks/{id:string}/task-log-names | GET | +| FilesIndex | /api/files | GET | +| FilesDestroy | /api/files/{id:string} | DELETE | +| FilesUpload | /api/files | PUT | #### ExecNGQL API @@ -100,7 +105,7 @@ Response: The request json body: ```json -{"configBody":{"version":"v2","description":"web console import","clientSettings":{"retry":3,"concurrency":10,"channelBufferSize":128,"space":"basketballplayer","connection":{"user":"root","password":"123","address":"192.168.8.145:9669"}},"logPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/import.log","files":[{"path":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv","failDataPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/err/data 1Fail.csv","batchSize":10,"type":"csv","csv":{"withHeader":false,"withLabel":false},"schema":{"type":"vertex","vertex":{"vid":{"index":0,"type":"string"},"tags":[{"name":"player","props":[{"name":"name","type":"string","index":1},{"name":"age","type":"int","index":2}]}]}}}]},"configPath":""} +{"configBody":{"version":"v2","description":"web console import","clientSettings":{"retry":3,"concurrency":10,"channelBufferSize":128,"space":"basketballplayer","connection":{"user":"root","password":"123","address":"192.168.8.145:9669"}},"logPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/import.log","files":[{"path":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv","failDataPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/err/data 1Fail.csv","batchSize":10,"type":"csv","csv":{"withHeader":false,"withLabel":false},"schema":{"type":"vertex","vertex":{"vid":{"index":0,"type":"string"},"tags":[{"name":"player","props":[{"name":"name","type":"string","index":1},{"name":"age","type":"int","index":2}]}]}}}]},"configPath":"","name":"task1"} ``` Response: @@ -119,145 +124,320 @@ Response: The request json body: +```json +{"taskID":"1","taskAction":"actionQuery"} +``` + +Response: + ```json { - "code": 0, - "message": "Import task 4 submit successfully", - "data": [ - "4" - ] + "code": 0, + "message": "Processing a task action successfully", + "data": { + "results": [ + { + "taskID": 1, + "name": "task1", + "space": "test", + "nebulaAddress": "192.168.8.243:9669", + "createdTime": 1644386055, + "updatedTime": 1644386056, + "user": "root", + "taskStatus": "statusAborted", + "taskMessage": "failed to open connection, error: incompatible version between client and server: Graph client version(3.0.0) is not accepted, current graph client white list: 2.6.1:2.5.0:2.5.1:2.6.0. ", + "stats": { + "numFailed": 0, + "numReadFailed": 0, + "totalCount": 0, + "totalBatches": 0, + "totalLatency": 0, + "totalReqTime": 0, + "totalBytes": 0, + "totalImportedBytes": 0 + } + } + ], + "msg": "Task query successfully" + } } ``` +#### QueryImportStats API + + /api/import-tasks/stats/3 + Response: ```json { - "code": 0, - "message": "Processing a task action successfully", - "data": { - "results": [ - { - "taskID": "4", + "code": 0, + "message": "Processing a task action successfully", + "data": { + "taskID": 3, + "name": "task1", + "space": "test", + "nebulaAddress": "192.168.8.233:9669", + "createdTime": 1646989643, + "updatedTime": 1646989646, + "user": "root", "taskStatus": "statusFinished", - "taskMessage": "" - } - ], - "msg": "Task query successfully" - } + "taskMessage": "", + "stats": { + "numFailed": 0, + "numReadFailed": 0, + "totalCount": 52, + "totalBatches": 10, + "totalLatency": 30089, + "totalReqTime": 532718, + "totalBytes": 1583, + "totalImportedBytes": 1583 + } + } } ``` -#### FilesIndex API +#### DownloadConfig API + + /api/import-tasks/config/2 Response: +``` +version: v2 +description: web console import +removeTempFiles: null +clientSettings: + retry: 3 + concurrency: 10 + channelBufferSize: 128 + space: test + connection: + user: "" + password: "" + address: "" + postStart: null + preStop: null +logPath: import.log +files: +- path: player.csv + failDataPath: 数据源 1Fail.csv + batchSize: 10 + limit: null + inOrder: null + type: csv + csv: + withHeader: false + withLabel: false + delimiter: null + schema: + type: vertex + edge: null + vertex: + vid: + index: 0 + function: null + type: string + prefix: null + tags: + - name: player + props: + - name: name + type: string + index: 1 + - name: age + type: int + index: 2 + +``` + +#### DownloadImportLog API + +The request : + +http://localhost:9000/api/import-tasks/1/log + +Response: + +a file + +#### DownloadErrLog + +The request : + +http://localhost:9000/api/import-tasks/1/err-logs?name=1Fail.csv + +Response: + +a file + +#### ReadLog API +Use params: +http://localhost:9000/api/import-tasks/logs?offset=0&limit=2&id=1 +Response: + ```json { - "code": 0, - "message": "", - "data": [ - { - "content": [ - [ - "Nobody", - "Nobody", - "0" - ], - [ - "Amar'e Stoudemire", - "Amar'e Stoudemire", - "36" - ], - [ - "Russell Westbrook", - "Russell Westbrook", - "30" - ] - ], - "path": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv", - "withHeader": false, - "dataType": "all", - "name": "player.csv", - "size": 1583 - } - ] + "code": 0, + "message": "", + "data": [ + "2022/03/16 15:23:00 [INFO] clientmgr.go:28: Create 10 Nebula Graph clients", + "2022/03/16 15:23:00 [INFO] reader.go:65: Start to read file(0): E:\\NebulaProject\\player.csv, schema: < :VID(string),player.name:string,player.age:int >" + ] } - ``` -#### FilesDestroy API +#### ReadErrLog API + +Use params: + +http://localhost:9000/api/import-tasks/err-logs?offset=0&limit=2&id=1&name=err-import.log + Response: + ```json { "code": 0, "message": "", - "data": "null" + "data": [ + "2022/03/16 15:23:00 [INFO] clientmgr.go:28: Create 10 Nebula Graph clients", + "2022/03/16 15:23:00 [INFO] reader.go:65: Start to read file(0): E:\\NebulaProject\\player.csv, schema: < :VID(string),player.name:string,player.age:int >" + ] } ``` -#### FilesUpload API -Request: -```http -Content-type:multipart/form-data -form-data: -key:file1 value:bachelor.csv -key:file2 value:like.csv + +#### Callback API + +The request json body: + +```json +{ + "task_id": "123456" +} ``` + Response: + ```json { "code": 0, "message": "", - "data": "null" + "data": "" } ``` -#### ReadLog API -Use params: -http://localhost:7001/api/import/log?dir=E:\NebulaProject\test\nebula-studio\server\data\tasks&startByte=0&endByte=1000000&taskId=4 + +#### GetWorkingDir API + Response: ```json { - "code": 0, - "message": "", - "data": "2022/01/11 16:33:48 [INFO] clientmgr.go:28: Create 10 Nebula Graph clients\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:64: Start to read file(0): E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv, schema: \u003c :VID(string),player.name:string,player.age:int \u003e\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:180: Total lines of file(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv) is: 52, error lines: 0\u003cbr /\u003e2022/01/11 16:33:48 [INFO] statsmgr.go:62: Done(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv): Time(1.45s), Finished(52), Failed(0), Read Failed(0), Latency AVG(6182us), Batches Req AVG(48887us), Rows AVG(35.87/s)\u003cbr /\u003e" -}xxxxxxxxxx {  "code": 0,  "message": "",  "data": "2022/01/11 16:33:48 [INFO] clientmgr.go:28: Create 10 Nebula Graph clients\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:64: Start to read file(0): E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv, schema: \u003c :VID(string),player.name:string,player.age:int \u003e\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:180: Total lines of file(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv) is: 52, error lines: 0\u003cbr /\u003e2022/01/11 16:33:48 [INFO] statsmgr.go:62: Done(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv): Time(1.45s), Finished(52), Failed(0), Read Failed(0), Latency AVG(6182us), Batches Req AVG(48887us), Rows AVG(35.87/s)\u003cbr /\u003e"}{    "code": 0,    "message": "",    "data": "hell"} + "code": 0, + "message": "", + "data": { + "uploadDir": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload" + } +} ``` -#### CreateConfigFile API -Create a config file in config_dir + +#### GetTaskDir API + +Response: + ```json -{"config":{"version":"v2","description":"web console import","clientSettings":{"retry":3,"concurrency":10,"channelBufferSize":128,"space":"basketballplayer","connection":{"user":"root","password":"123","address":"192.168.8.145:9669"}},"logPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/import.log","files":[]},"mountPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks"} +{ + "code": 0, + "message": "", + "data": { + "taskDir": "E:\\NebulaProject\\nebula-studio\\server\\data\\tasks\\1" + } +} ``` +#### GetTaskLogNames API + +Request: + +http://localhost:9000/api/import-tasks/1/task-log-names + Response: ```json { "code": 0, "message": "", - "data": "" + "data": [ + { + "name": "import.log" + { + "name": "err 1.log" + }, + { + "name": "err 2.log" + } + ] } ``` -#### Callback API -The request json body: +#### FilesIndex API + +Response: ```json { - "task_id": "123456" + "code": 0, + "message": "", + "data": [ + { + "content": [ + [ + "Nobody", + "Nobody", + "0" + ], + [ + "Amar'e Stoudemire", + "Amar'e Stoudemire", + "36" + ], + [ + "Russell Westbrook", + "Russell Westbrook", + "30" + ] + ], + "path": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv", + "withHeader": false, + "dataType": "all", + "name": "player.csv", + "size": 1583 + } + ] } + ``` +#### FilesDestroy API + Response: ```json { "code": 0, "message": "", - "data": "" + "data": "null" } ``` -#### GetWorkingDir API +#### FilesUpload API + +Request: + +```http +Content-type:multipart/form-data +form-data: +key:file1 value:bachelor.csv +key:file2 value:like.csv +``` Response: @@ -265,10 +445,8 @@ Response: { "code": 0, "message": "", - "data": { - "taskDir": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks", - "uploadDir": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload" - } + "data": "null" } ``` +#### \ No newline at end of file diff --git a/server/config/example-config.yaml b/server/config/example-config.yaml index 5b635b2b..218b5d5e 100644 --- a/server/config/example-config.yaml +++ b/server/config/example-config.yaml @@ -4,4 +4,4 @@ web: # tasks_dir: # sqlitedb_file_path: # ip: - port: 9000 \ No newline at end of file +# port: 7001 # (production env), defaut is 9000 (test env) \ No newline at end of file diff --git a/server/go.mod b/server/go.mod index b1a84c16..e8fe1f8e 100644 --- a/server/go.mod +++ b/server/go.mod @@ -3,12 +3,16 @@ module github.com/vesoft-inc/nebula-studio/server go 1.17 require ( + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 github.com/kataras/iris/v12 v12.2.0-alpha4 github.com/mattn/go-sqlite3 v2.0.3+incompatible + github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220124093655-f7f5b4c63e6a - github.com/vesoft-inc/nebula-importer v1.0.1-0.20220214094549-1dd8e730a586 + github.com/vesoft-inc/nebula-importer v1.0.1-0.20220314055243-ece1a79d37e5 go.uber.org/zap v1.17.0 gopkg.in/yaml.v2 v2.4.0 + gorm.io/driver/sqlite v1.2.6 + gorm.io/gorm v1.22.5 ) require ( @@ -29,6 +33,8 @@ require ( github.com/gorilla/css v1.0.0 // indirect github.com/iris-contrib/jade v1.1.4 // indirect github.com/iris-contrib/schema v0.0.6 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.4 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kataras/blocks v0.0.4 // indirect github.com/kataras/golog v0.1.7 // indirect diff --git a/server/go.sum b/server/go.sum index a9396333..09186994 100644 --- a/server/go.sum +++ b/server/go.sum @@ -16,6 +16,8 @@ github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0 github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible h1:Ppm0npCCsmuR9oQaBtRuZcmILVE74aXE+AmrJj8L2ns= @@ -96,6 +98,11 @@ github.com/iris-contrib/jade v1.1.4 h1:WoYdfyJFfZIUgqNAeOyRfTNQZOksSlZ6+FnXR3AEp github.com/iris-contrib/jade v1.1.4/go.mod h1:EDqR+ur9piDl6DUgs6qRrlfzmlx/D5UybogqrXvJTBE= github.com/iris-contrib/schema v0.0.6 h1:CPSBLyx2e91H2yJzPuhGuifVRnZBBJ3pCOMbOvPZaTw= github.com/iris-contrib/schema v0.0.6/go.mod h1:iYszG0IOsuIsfzjymw1kMzTL8YQcCWlm65f3wX8J5iA= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.4 h1:tHnRBy1i5F2Dh8BAFxqFzxKqqvezXrL2OW1TnX+Mlas= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= @@ -129,6 +136,7 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs= +github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mediocregopher/radix/v3 v3.6.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= @@ -164,6 +172,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca h1:NugYot0LIVPxTvN8n+Kvkn6TrbMyxQiuvKdEwFdR9vI= +github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/closestmatch v2.1.0+incompatible h1:Uel2GXEpJqOWBrlyI+oY9LTiyyjYS17cCYRqP13/SHk= @@ -205,8 +215,8 @@ github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220214062853-d0c59964d0af h1:5z3eC9o github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220214062853-d0c59964d0af/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220124093655-f7f5b4c63e6a h1:tirwUK05V3LPsIFaLa0cHmVqFD3F1Tb8PlDTK7YWQGc= github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220124093655-f7f5b4c63e6a/go.mod h1:sFEvE+cY4TgwqWx6H6msOqAUzRhsEHHKaaMgIZENHuQ= -github.com/vesoft-inc/nebula-importer v1.0.1-0.20220214094549-1dd8e730a586 h1:7SSeICfj3knCvUy9TP/L6wbyYiM0OTVVEkP0x0Q+DOI= -github.com/vesoft-inc/nebula-importer v1.0.1-0.20220214094549-1dd8e730a586/go.mod h1:Hgn05JWXQcxyoJAjJVN34PnUndY+JcNKzpb0DoyFrsM= +github.com/vesoft-inc/nebula-importer v1.0.1-0.20220314055243-ece1a79d37e5 h1:YghW/f27/NEpP8h3yAn2p7WCZYUIx3rc+pdS3v9QmRc= +github.com/vesoft-inc/nebula-importer v1.0.1-0.20220314055243-ece1a79d37e5/go.mod h1:Hgn05JWXQcxyoJAjJVN34PnUndY+JcNKzpb0DoyFrsM= github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= @@ -323,5 +333,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.2.6 h1:SStaH/b+280M7C8vXeZLz/zo9cLQmIGwwj3cSj7p6l4= +gorm.io/driver/sqlite v1.2.6/go.mod h1:gyoX0vHiiwi0g49tv+x2E7l8ksauLK0U/gShcdUsjWY= +gorm.io/gorm v1.22.3/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= +gorm.io/gorm v1.22.5 h1:lYREBgc02Be/5lSCTuysZZDb6ffL2qrat6fg9CFbvXU= +gorm.io/gorm v1.22.5/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8= moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE= diff --git a/server/pkg/api/routes/gateway.go b/server/pkg/api/routes/gateway.go index a3c3dda6..51d0e74c 100644 --- a/server/pkg/api/routes/gateway.go +++ b/server/pkg/api/routes/gateway.go @@ -25,13 +25,5 @@ var GatewayRoute = base.Route{ Path: "db/disconnect", POST: controller.DisconnectDB, }, - { - Path: "task/import", - POST: controller.ImportData, - }, - { - Path: "task/import/action", - POST: controller.HandleImportAction, - }, }, } diff --git a/server/pkg/api/routes/import.go b/server/pkg/api/routes/import.go index 51775130..fd184a18 100644 --- a/server/pkg/api/routes/import.go +++ b/server/pkg/api/routes/import.go @@ -6,24 +6,56 @@ import ( ) var ImportRoute = base.Route{ - Path: "/api/import", + Path: "/api/import-tasks", Desc: "import", SubRoutes: []base.Route{ { - Path: "log", - GET: controller.ReadLog, + Path: "import", + POST: controller.ImportData, }, { - Path: "config", - POST: controller.CreateConfigFile, + Path: "action", + POST: controller.HandleImportAction, + }, + { + Path: "stats/{id:string}", + GET: controller.QueryImportStats, + }, + { + Path: "config/{id:string}", + GET: controller.DownloadConfigFile, + }, + { + Path: "{id:string}/log", + GET: controller.DownloadImportLog, + }, + { + Path: "{id:string}/err-logs", + GET: controller.DownloadErrLog, + }, + { + Path: "logs", + GET: controller.ReadImportLog, + }, + { + Path: "err-logs", + GET: controller.ReadErrLog, }, { Path: "finish", POST: controller.Callback, }, { - Path: "working_dir", + Path: "working-dir", GET: controller.GetWorkingDir, }, + { + Path: "task-dir", + GET: controller.GetTaskDir, + }, + { + Path: "{id:string}/task-log-names", + GET: controller.GetTaskLogNames, + }, }, } diff --git a/server/pkg/utils/file.go b/server/pkg/utils/file.go new file mode 100644 index 00000000..828fa342 --- /dev/null +++ b/server/pkg/utils/file.go @@ -0,0 +1,13 @@ +package utils + +import "os" + +func CreateDir(dir string) error { + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return err + } + return os.MkdirAll(dir, os.ModePerm) + } + return nil +} diff --git a/server/pkg/webserver/base/scheme.go b/server/pkg/webserver/base/scheme.go index c10344e6..fe03dc19 100644 --- a/server/pkg/webserver/base/scheme.go +++ b/server/pkg/webserver/base/scheme.go @@ -36,7 +36,9 @@ func WrapHandler(handler Handler) iris.Handler { return func(ctx iris.Context) { result := handler(ctx) ctx.StatusCode(iris.StatusOK) - _, _ = ctx.JSON(&result) + if result != nil { + _, _ = ctx.JSON(&result) + } } } diff --git a/server/pkg/webserver/base/types.go b/server/pkg/webserver/base/types.go index 27b906ad..bba7567a 100644 --- a/server/pkg/webserver/base/types.go +++ b/server/pkg/webserver/base/types.go @@ -3,7 +3,8 @@ package base type StatusCode int const ( - Error StatusCode = -1 - Success StatusCode = 0 + Error StatusCode = -1 + Success StatusCode = 0 + // TODO: need to del it AuthorizationError StatusCode = 401 ) diff --git a/server/pkg/webserver/controller/files.go b/server/pkg/webserver/controller/files.go index 472e9e7e..06e7fc04 100644 --- a/server/pkg/webserver/controller/files.go +++ b/server/pkg/webserver/controller/files.go @@ -1,17 +1,20 @@ package controller import ( + "bufio" "encoding/csv" "fmt" "io/ioutil" - "net/http" + "mime/multipart" "os" "path/filepath" "github.com/vesoft-inc/nebula-studio/server/pkg/config" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/base" + "github.com/axgle/mahonia" "github.com/kataras/iris/v12" + "github.com/saintfish/chardet" "go.uber.org/zap" ) @@ -28,8 +31,7 @@ func FilesDestroy(ctx iris.Context) base.Result { id := ctx.Params().GetString("id") dir := config.Cfg.Web.UploadDir target := filepath.Join(dir, id) - _, err := os.Stat(target) - if err != nil { + if _, err := os.Stat(target); err != nil { zap.L().Warn("del file error", zap.Error(err)) return base.Response{ Code: base.Error, @@ -37,8 +39,7 @@ func FilesDestroy(ctx iris.Context) base.Result { } } // if target is directory, it is not empty. - err = os.Remove(target) - if err != nil { + if err := os.Remove(target); err != nil { zap.L().Warn("del file error", zap.Error(err)) return base.Response{ Code: base.Error, @@ -51,7 +52,6 @@ func FilesDestroy(ctx iris.Context) base.Result { } func FilesIndex(ctx iris.Context) base.Result { - dir := config.Cfg.Web.UploadDir filesInfo, err := ioutil.ReadDir(dir) if err != nil { @@ -64,7 +64,6 @@ func FilesIndex(ctx iris.Context) base.Result { data := make([]*fileStat, 0) for _, fileInfo := range filesInfo { - if fileInfo.IsDir() { continue } @@ -78,14 +77,12 @@ func FilesIndex(ctx iris.Context) base.Result { count := 0 content := make([][]string, 0) for count < 3 { - line, err := reader.Read() count++ if err != nil { break } content = append(content, line) - } data = append(data, &fileStat{ Content: content, @@ -95,17 +92,14 @@ func FilesIndex(ctx iris.Context) base.Result { Name: fileInfo.Name(), Size: fileInfo.Size(), }) - } return base.Response{ Code: base.Success, Data: data, } - } func FilesUpload(ctx iris.Context) base.Result { - dir := config.Cfg.Web.UploadDir files, _, err := ctx.UploadFormFiles(dir) if err != nil { @@ -115,10 +109,79 @@ func FilesUpload(ctx iris.Context) base.Result { Message: err.Error(), } } - fmt.Println(len(files)) - ctx.StatusCode(http.StatusOK) + for _, file := range files { + charSet, err := checkCharset(file) + if err != nil { + zap.L().Warn("upload file error, check charset fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + if charSet == "UTF-8" { + continue + } + path := filepath.Join(dir, file.Filename) + if err = changeFileCharset2UTF8(path, charSet); err != nil { + zap.L().Warn("upload file error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + } + zap.L().Info(fmt.Sprintf("upload %d files", len(files))) return base.Response{ Code: base.Success, } +} + +func checkCharset(file *multipart.FileHeader) (string, error) { + f, err := file.Open() + if err != nil { + return "", err + } + defer f.Close() + bytes := make([]byte, 1024) + if _, err = f.Read(bytes); err != nil { + return "", err + } + detector := chardet.NewTextDetector() + best, err := detector.DetectBest(bytes) + if err != nil { + return "", err + } + return best.Charset, nil +} +func changeFileCharset2UTF8(filePath string, charSet string) error { + fileUTF8Path := filePath + "-copy" + err := func() error { + file, err := os.OpenFile(filePath, os.O_RDONLY, 0666) + if err != nil { + zap.L().Warn("open file fail", zap.Error(err)) + return err + } + defer file.Close() + reader := bufio.NewReader(file) + decoder := mahonia.NewDecoder(charSet) + decodeReader := decoder.NewReader(reader) + fileUTF8, err := os.OpenFile(fileUTF8Path, os.O_RDONLY|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + defer fileUTF8.Close() + writer := bufio.NewWriter(fileUTF8) + if _, err = writer.ReadFrom(decodeReader); err != nil { + return err + } + return nil + }() + if err != nil { + return err + } + if err = os.Rename(fileUTF8Path, filePath); err != nil { + return err + } + return nil } diff --git a/server/pkg/webserver/controller/gateway.go b/server/pkg/webserver/controller/gateway.go index 18037042..365dfa78 100644 --- a/server/pkg/webserver/controller/gateway.go +++ b/server/pkg/webserver/controller/gateway.go @@ -2,15 +2,11 @@ package controller import ( "encoding/base64" - "fmt" "strings" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/gateway/dao" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" - importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" - importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/base" - "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/service/importer" "github.com/kataras/iris/v12" "go.uber.org/zap" @@ -39,68 +35,9 @@ type disConnectDBParams struct { Nsid string `json:"nsid"` } -type importDataParams struct { - ConfigPath string `json:"configPath"` - ConfigBody *importconfig.YAMLConfig `json:"configBody"` -} - -type handleImportActionParams struct { - TaskId string `json:"taskId"` - TaskAction string `json:"taskAction"` -} - -func BatchExecNGQL(ctx iris.Context) base.Result { - params := new(batchExecNGQLParams) - err := ctx.ReadJSON(params) - if err != nil { - zap.L().Warn("execNGQLParams get fail", zap.Error(err)) - return base.Response{ - Code: base.Error, - Message: err.Error(), - } - } - nsid := ctx.GetCookie("nsid") - data := make([]map[string]interface{}, 0) - - for i := 0; i < len(params.Gqls); i++ { - gql := params.Gqls[i] - execute, _, err := dao.Execute(nsid, gql, make([]string, 0)) - gqlRes := make(map[string]interface{}) - gqlRes["gql"] = gql - if err != nil { - gqlRes["message"] = err.Error() - gqlRes["code"] = base.Error - } else { - gqlRes["code"] = base.Success - } - gqlRes["data"] = execute - data = append(data, gqlRes) - } - - if len(params.ParamList) > 0 { - execute, _, err := dao.Execute(nsid, "", params.ParamList) - gqlRes := make(map[string]interface{}) - gqlRes["gql"] = strings.Join(params.ParamList, "; ") - if err != nil { - gqlRes["message"] = err.Error() - gqlRes["code"] = base.Error - } else { - gqlRes["code"] = base.Success - } - gqlRes["data"] = execute - data = append(data, gqlRes) - } - - return base.Response{ - Code: base.Success, - Data: data, - } -} - func ExecNGQL(ctx iris.Context) base.Result { params := new(execNGQLParams) - err := ctx.ReadJSON(params) - if err != nil { + if err := ctx.ReadJSON(params); err != nil { zap.L().Warn("execNGQLParams get fail", zap.Error(err)) return base.Response{ Code: base.Error, @@ -140,11 +77,16 @@ func ConnectDB(ctx iris.Context) base.Result { } } account := strings.Split(string(decode), ":") + if len(account) < 2 { + return base.Response{ + Code: base.AuthorizationError, + Message: "len of account is less than two", + } + } username, password := account[0], account[1] params := new(connectDBParams) - err = ctx.ReadJSON(params) - if err != nil { + if err = ctx.ReadJSON(params); err != nil { zap.L().Warn("connectDBParams get fail", zap.Error(err)) return base.Response{ Code: base.Error, @@ -152,9 +94,6 @@ func ConnectDB(ctx iris.Context) base.Result { } } clientInfo, err := dao.Connect(params.Address, params.Port, username, password) - if err != nil { - return nil - } if err != nil { zap.L().Warn("connect DB fail", zap.Error(err)) return base.Response{ @@ -163,11 +102,9 @@ func ConnectDB(ctx iris.Context) base.Result { } } data := make(map[string]string) - nsid := clientInfo.ClientID - version := clientInfo.NebulaVersion - data["nsid"] = nsid - data["version"] = string(version) - ctx.SetCookieKV("nsid", nsid) + data["nsid"] = clientInfo.ClientID + data["version"] = string(clientInfo.NebulaVersion) + ctx.SetCookieKV("nsid", data["nsid"]) return base.Response{ Code: base.Success, Data: data, @@ -176,8 +113,7 @@ func ConnectDB(ctx iris.Context) base.Result { func DisconnectDB(ctx iris.Context) base.Result { params := new(disConnectDBParams) - err := ctx.ReadJSON(params) - if err != nil { + if err := ctx.ReadJSON(params); err != nil { zap.L().Warn("disConnectDBParams get fail", zap.Error(err)) return base.Response{ Code: base.Error, @@ -189,58 +125,3 @@ func DisconnectDB(ctx iris.Context) base.Result { Code: base.Success, } } - -func ImportData(ctx iris.Context) base.Result { - taskId := importer.NewTaskID() - task := importer.NewTask(taskId) - importer.GetTaskMgr().PutTask(taskId, &task) - params := new(importDataParams) - err := ctx.ReadJSON(params) - if err != nil { - zap.L().Warn("importDataParams get fail", zap.Error(err)) - err = importerErrors.Wrap(importerErrors.InvalidConfigPathOrFormat, err) - } else { - err = importer.Import(taskId, params.ConfigPath, params.ConfigBody) - } - - if err != nil { - // task err: import task not start err handle - task.TaskStatus = importer.StatusAborted.String() - zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskId, err)) - return base.Response{ - Code: base.Error, - Message: err.Error(), - } - } - - return base.Response{ - Code: base.Success, - Data: []string{taskId}, - Message: fmt.Sprintf("Import task %s submit successfully", taskId), - } -} - -func HandleImportAction(ctx iris.Context) base.Result { - params := new(handleImportActionParams) - err := ctx.ReadJSON(params) - if err != nil { - zap.L().Warn("handleImportActionParams get fail", zap.Error(err)) - return base.Response{ - Code: base.Error, - Message: err.Error(), - } - } - data, err := importer.ImportAction(params.TaskId, importer.NewTaskAction(params.TaskAction)) - if err != nil { - zap.L().Warn("importAction fail", zap.Error(err)) - return base.Response{ - Code: base.Error, - Message: err.Error(), - } - } - return base.Response{ - Code: base.Success, - Message: "Processing a task action successfully", - Data: data, - } -} diff --git a/server/pkg/webserver/controller/import.go b/server/pkg/webserver/controller/import.go index ff6cf81f..49dca92b 100644 --- a/server/pkg/webserver/controller/import.go +++ b/server/pkg/webserver/controller/import.go @@ -1,21 +1,25 @@ package controller import ( + "bufio" "encoding/json" + "errors" + "fmt" "io/ioutil" "os" "path/filepath" "strconv" - "strings" "sync" importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" + importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" "github.com/vesoft-inc/nebula-studio/server/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/pkg/utils" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/base" + "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/service/importer" "github.com/kataras/iris/v12" "go.uber.org/zap" - "gopkg.in/yaml.v2" ) type dirResponse struct { @@ -23,16 +27,250 @@ type dirResponse struct { UploadDir string `json:"uploadDir,omitempty"` } +type log struct { + Name string `json:"name"` +} + +type importDataParams struct { + ConfigPath string `json:"configPath"` + ConfigBody *importconfig.YAMLConfig `json:"configBody"` + Name string `json:"name"` +} + +type handleImportActionParams struct { + TaskId string `json:"taskId"` + TaskAction string `json:"taskAction"` +} + +const ( + importLogName = "import.log" + errContentDir = "err" +) + var muTaskId sync.RWMutex -func ReadLog(ctx iris.Context) base.Result { - startByte, _ := strconv.ParseInt(ctx.URLParam("startByte"), 10, 64) - endByte, _ := strconv.ParseInt(ctx.URLParam("endByte"), 10, 64) - dir := ctx.URLParam("dir") - taskId := ctx.URLParam("ReadJSON") +func ImportData(ctx iris.Context) base.Result { + params := new(importDataParams) + err := ctx.ReadJSON(params) + if err != nil { + zap.L().Warn("importDataParams get fail", zap.Error(err)) + err = importerErrors.Wrap(importerErrors.InvalidConfigPathOrFormat, err) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + if err = validImportDataParams(params); err != nil { + zap.L().Warn("importDataParams get fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + // create config file + taskDir, err := importer.GetNewTaskDir() + if err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + if err := importer.CreateConfigFile(taskDir, *params.ConfigBody); err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + // create err dir + taskErrDir := filepath.Join(taskDir, "err") + if err = utils.CreateDir(taskErrDir); err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + // import + nebulaAddress := *params.ConfigBody.NebulaClientSettings.Connection.Address + user := *params.ConfigBody.NebulaClientSettings.Connection.User + name := params.Name + space := *params.ConfigBody.NebulaClientSettings.Space + task, taskID, err := importer.GetTaskMgr().NewTask(nebulaAddress, user, name, space) + if err != nil { + zap.L().Warn("init task fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + if err = importer.Import(taskID, params.ConfigPath, params.ConfigBody); err != nil { + // task err: import task not start err handle + task.TaskInfo.TaskStatus = importer.StatusAborted.String() + err1 := importer.GetTaskMgr().FinishTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskID, err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + return base.Response{ + Code: base.Success, + Data: []string{taskID}, + Message: fmt.Sprintf("Import task %s submit successfully", taskID), + } +} + +func QueryImportStats(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + if id == "" { + zap.L().Warn("queryImportStats id get fail") + return base.Response{ + Code: base.Error, + Message: "queryImportStats id get fail", + } + } + taskInfo, err := importer.ImportStatus(id) + if err != nil { + zap.L().Warn("queryImportStats fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + return base.Response{ + Code: base.Success, + Message: "Processing a task action successfully", + Data: taskInfo, + } +} - path := filepath.Join(dir, "import.log") - bytes, err := readFile(path, startByte, endByte) +func HandleImportAction(ctx iris.Context) base.Result { + params := new(handleImportActionParams) + err := ctx.ReadJSON(params) + if err != nil { + zap.L().Warn("handleImportActionParams get fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + data, err := importer.ImportAction(params.TaskId, importer.NewTaskAction(params.TaskAction)) + if err != nil { + zap.L().Warn("importAction fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + return base.Response{ + Code: base.Success, + Message: "Processing a task action successfully", + Data: data, + } +} + +func DownloadConfigFile(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + if id == "" { + return base.Response{ + Code: base.Error, + Message: "id parse failed", + } + } + configPath := filepath.Join(config.Cfg.Web.TasksDir, id, "config.yaml") + if err := ctx.SendFile(configPath, "config.yaml"); err != nil { + return base.Response{ + Code: base.Error, + Message: "id parse failed", + } + } + return nil +} + +func DownloadImportLog(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + if id == "" { + return base.Response{ + Code: base.Error, + Message: "id parse failed", + } + } + path := filepath.Join(config.Cfg.Web.TasksDir, id, importLogName) + if err := ctx.SendFile(path, importLogName); err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + return nil +} + +func DownloadErrLog(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + if id == "" { + return base.Response{ + Code: base.Error, + Message: "id parse failed", + } + } + name := ctx.URLParam("name") + if name == "" { + return base.Response{ + Code: base.Error, + Message: "name parse failed", + } + } + path := filepath.Join(config.Cfg.Web.TasksDir, id, errContentDir, name) + if err := ctx.SendFile(path, name); err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + return nil +} + +func validImportDataParams(params *importDataParams) error { + if params.ConfigBody.NebulaClientSettings.Connection.Address == nil || params.ConfigBody.NebulaClientSettings. + Connection.User == nil || params.ConfigBody.NebulaClientSettings.Space == nil { + return errors.New("importDataParams is wrong") + } + return nil +} + +func ReadImportLog(ctx iris.Context) base.Result { + offset, err := strconv.ParseInt(ctx.URLParam("offset"), 10, 64) + if err != nil { + zap.L().Warn("offset parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + limitStr := ctx.URLParam("limit") + var limit int64 = -1 + if limitStr != "" { + l, err := strconv.ParseInt(limitStr, 10, 64) + if err != nil { + zap.L().Warn("limit parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + limit = l + } + taskId := ctx.URLParam("id") + if taskId == "" { + return base.Response{ + Code: base.Error, + Message: "parse id fail", + } + } + path := filepath.Join(config.Cfg.Web.TasksDir, taskId, importLogName) + lines, err := readFile(path, offset, limit) if err != nil { return base.Response{ Code: base.Error, @@ -62,92 +300,131 @@ func ReadLog(ctx iris.Context) base.Result { } } } - if len(bytes) == 0 && taskIdJSON[taskId]{ + if len(lines) == 0 && taskIdJSON[taskId] { return base.Response{ Code: base.Success, Data: "", } } - if len(bytes) == 0 { + if len(lines) == 0 { return base.Response{ Code: base.Error, } } - - log := string(bytes) - log = strings.Replace(log, "\n", "
", -1) return base.Response{ Code: base.Success, - Data: log, + Data: lines, } } -func readFile(path string, startByte, endByte int64) ([]byte, error) { - file, err := os.Open(path) - defer file.Close() +func ReadErrLog(ctx iris.Context) base.Result { + offset, err := strconv.ParseInt(ctx.URLParam("offset"), 10, 64) if err != nil { - zap.L().Warn("open file error", zap.Error(err)) - return nil, err - } - _, err = file.Seek(startByte, 0) - if err != nil { - zap.L().Warn("file seek error", zap.Error(err)) - return nil, err - } - stat, _ := file.Stat() - if stat.Size() < endByte { - endByte = stat.Size() + zap.L().Warn("offset parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } } - if endByte < startByte { - bytes := make([]byte, 0) - return bytes, nil + limitStr := ctx.URLParam("limit") + var limit int64 = -1 + if limitStr != "" { + l, err := strconv.ParseInt(limitStr, 10, 64) + if err != nil { + zap.L().Warn("limit parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + limit = l } - bytes := make([]byte, endByte-startByte) - _, err = file.Read(bytes) - if err != nil { - zap.L().Warn("read file error", zap.Error(err)) - return nil, err + name := ctx.URLParam("name") + if name == "" { + return base.Response{ + Code: base.Error, + Message: "parse name fail", + } } - return bytes, nil -} - -func CreateConfigFile(ctx iris.Context) base.Result { - type Params struct { - MountPath string `json:"mountPath"` - Config importconfig.YAMLConfig `json:"config"` + taskId := ctx.URLParam("id") + if taskId == "" { + return base.Response{ + Code: base.Error, + Message: "parse id fail", + } } - params := new(Params) - err := ctx.ReadJSON(params) + path := filepath.Join(config.Cfg.Web.TasksDir, taskId, errContentDir, name) + lines, err := readFile(path, offset, limit) if err != nil { - zap.L().Warn("config change to json wrong", zap.Error(err)) return base.Response{ Code: base.Error, Message: err.Error(), } } - fileName := "config.yaml" - dir := params.MountPath - _, err = os.Stat(dir) - if os.IsNotExist(err) { - os.MkdirAll(dir, os.ModePerm) - } - path := filepath.Join(dir, fileName) - outYaml, err := yaml.Marshal(params.Config) - err = os.WriteFile(path, outYaml, 0644) + muTaskId.RLock() + taskIdBytes, err := ioutil.ReadFile(config.Cfg.Web.TaskIdPath) + muTaskId.RUnlock() if err != nil { - zap.L().Warn("write"+path+"file error", zap.Error(err)) + zap.L().Warn("read taskId file error", zap.Error(err)) return base.Response{ Code: base.Error, Message: err.Error(), } } - + taskIdJSON := make(map[string]bool) + if len(taskIdBytes) != 0 { + err = json.Unmarshal(taskIdBytes, &taskIdJSON) + if err != nil { + zap.L().Warn("parse taskId file error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + } + if len(lines) == 0 && taskIdJSON[taskId] { + return base.Response{ + Code: base.Success, + Data: "", + } + } + if len(lines) == 0 { + return base.Response{ + Code: base.Error, + } + } return base.Response{ Code: base.Success, + Data: lines, } } +func readFile(path string, offset int64, limit int64) ([]string, error) { + file, err := os.Open(path) + if err != nil { + zap.L().Warn("open file error", zap.Error(err)) + return nil, err + } + defer file.Close() + scanner := bufio.NewScanner(file) + res := make([]string, 0) + if limit != -1 { + for lineIndex := int64(0); scanner.Scan() && lineIndex < offset+limit; lineIndex++ { + if lineIndex >= offset { + res = append(res, scanner.Text()) + } + } + } else { + for lineIndex := int64(0); scanner.Scan(); lineIndex++ { + if lineIndex >= offset { + res = append(res, scanner.Text()) + } + } + } + return res, nil +} + func Callback(ctx iris.Context) base.Result { type Params struct { TaskId string `json:"taskId"` @@ -215,7 +492,6 @@ func Callback(ctx iris.Context) base.Result { func GetWorkingDir(ctx iris.Context) base.Result { data := dirResponse{ - TaskDir: config.Cfg.Web.TasksDir, UploadDir: config.Cfg.Web.UploadDir, } return base.Response{ @@ -223,3 +499,47 @@ func GetWorkingDir(ctx iris.Context) base.Result { Data: data, } } + +func GetTaskDir(ctx iris.Context) base.Result { + taskDir, err := importer.GetNewTaskDir() + if err != nil { + return base.Response{ + Code: base.Error, + } + } + data := dirResponse{ + TaskDir: taskDir, + } + return base.Response{ + Code: base.Success, + Data: data, + } +} + +func GetTaskLogNames(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + errLogDir := filepath.Join(config.Cfg.Web.TasksDir, id, "err") + fileInfos, err := ioutil.ReadDir(errLogDir) + if err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + logs := make([]log, 0) + importLog := log{ + Name: "import.log", + } + logs = append(logs, importLog) + for _, fileInfo := range fileInfos { + name := fileInfo.Name() + l := log{ + Name: name, + } + logs = append(logs, l) + } + return base.Response{ + Code: base.Success, + Data: logs, + } +} diff --git a/server/pkg/webserver/service/importer/importer.go b/server/pkg/webserver/service/importer/importer.go index 48dfb220..3f23648e 100644 --- a/server/pkg/webserver/service/importer/importer.go +++ b/server/pkg/webserver/service/importer/importer.go @@ -3,12 +3,18 @@ package importer import ( "errors" "fmt" + "os" + "path/filepath" + "sort" "time" importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-studio/server/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/pkg/utils" "go.uber.org/zap" + "gopkg.in/yaml.v2" ) type ImportResult struct { @@ -22,15 +28,72 @@ type ImportResult struct { } type ActionResult struct { - Results []Task `json:"results"` - Msg string `json:"msg"` + Results []TaskInfo `json:"results"` + Msg string `json:"msg"` +} + +func GetNewTaskDir() (string, error) { + taskID, err := GetTaskMgr().NewTaskID() + if err != nil { + return "", err + } + taskDir := filepath.Join(config.Cfg.Web.TasksDir, taskID) + return taskDir, nil +} + +func CreateConfigFile(dir string, config importconfig.YAMLConfig) error { + fileName := "config.yaml" + err := utils.CreateDir(dir) + if err != nil { + return err + } + path := filepath.Join(dir, fileName) + // erase user information + address := *config.NebulaClientSettings.Connection.Address + user := *config.NebulaClientSettings.Connection.User + password := *config.NebulaClientSettings.Connection.Password + *config.NebulaClientSettings.Connection.Address = "" + *config.NebulaClientSettings.Connection.User = "" + *config.NebulaClientSettings.Connection.Password = "" + + // erase path infomation + logPath := *config.LogPath + *config.LogPath = "import.log" + paths := make([]string, 0) + failDataPaths := make([]string, 0) + for _, file := range config.Files { + paths = append(paths, *file.Path) + failDataPaths = append(failDataPaths, *file.FailDataPath) + _, fileName := filepath.Split(*file.Path) + _, fileDataName := filepath.Split(*file.FailDataPath) + *file.Path = fileName + *file.FailDataPath = fileDataName + } + + outYaml, err := yaml.Marshal(config) + if err != nil { + return err + } + if err := os.WriteFile(path, outYaml, 0644); err != nil { + zap.L().Warn("write"+path+"file error", zap.Error(err)) + return err + } + + *config.LogPath = logPath + *config.NebulaClientSettings.Connection.Address = address + *config.NebulaClientSettings.Connection.User = user + *config.NebulaClientSettings.Connection.Password = password + for i, file := range config.Files { + *file.Path = paths[i] + *file.FailDataPath = failDataPaths[i] + } + return nil } func Import(taskID string, configPath string, configBody *importconfig.YAMLConfig) (err error) { zap.L().Debug(fmt.Sprintf("Start a import task: `%s`", taskID)) var conf *importconfig.YAMLConfig - if configPath != "" { conf, err = importconfig.Parse( configPath, @@ -42,43 +105,74 @@ func Import(taskID string, configPath string, configBody *importconfig.YAMLConfi } else { conf = configBody } - if err := conf.ValidateAndReset(""); err != nil { return err } task, _ := GetTaskMgr().GetTask(taskID) - go func() { result := ImportResult{} - now := time.Now() task.GetRunner().Run(conf) timeCost := time.Since(now).Milliseconds() - result.TaskId = taskID result.TimeCost = fmt.Sprintf("%dms", timeCost) - - if rerr := task.GetRunner().Error(); rerr != nil { - // task err: import task not finished err handle - task.TaskStatus = StatusAborted.String() - - err, _ := rerr.(importerErrors.ImporterError) + if rerrs := task.GetRunner().Errors(); len(rerrs) != 0 { + allErrIsNotCompleteError := true + for _, rerr := range rerrs { + err := rerr.(importerErrors.ImporterError) + if err.ErrCode != importerErrors.NotCompleteError { + allErrIsNotCompleteError = false + break + } + } + if allErrIsNotCompleteError { + task.TaskInfo.TaskStatus = StatusFinished.String() + result.FailedRows = task.GetRunner().NumFailed + err1 := GetTaskMgr().FinishTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + return + } + // TODO: return all errors + task.TaskInfo.TaskStatus = StatusAborted.String() + err, _ := rerrs[0].(importerErrors.ImporterError) result.ErrorResult.ErrorCode = err.ErrCode result.ErrorResult.ErrorMsg = err.ErrMsg.Error() - task.TaskMessage = err.ErrMsg.Error() + task.TaskInfo.TaskMessage = err.ErrMsg.Error() + err1 := GetTaskMgr().FinishTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } zap.L().Warn(fmt.Sprintf("Failed to finish a import task: `%s`, task result: `%v`", taskID, result)) } else { - task.TaskStatus = StatusFinished.String() - + task.TaskInfo.TaskStatus = StatusFinished.String() result.FailedRows = task.GetRunner().NumFailed - GetTaskMgr().DelTask(taskID) + err := GetTaskMgr().FinishTask(taskID) + if err != nil { + zap.L().Warn("finish task fail", zap.Error(err)) + } zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) } }() return nil } +func ImportStatus(taskID string) (*TaskInfo, error) { + if t, ok := GetTaskMgr().GetTask(taskID); ok { + if t.GetRunner() != nil { + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + return nil, err + } + } + return t.TaskInfo, nil + } + return nil, errors.New("task is not exist") +} + func ImportAction(taskID string, taskAction TaskAction) (result ActionResult, err error) { zap.L().Debug(fmt.Sprintf("Start a import task action: `%s` for task: `%s`", taskAction.String(), taskID)) @@ -93,6 +187,8 @@ func ImportAction(taskID string, taskAction TaskAction) (result ActionResult, er actionStop(taskID, &result) case ActionStopAll: actionStopAll(&result) + case ActionDel: + actionDel(taskID, &result) default: err = errors.New("unknown task action") } @@ -102,19 +198,27 @@ func ImportAction(taskID string, taskAction TaskAction) (result ActionResult, er return result, err } +func actionDel(taskID string, result *ActionResult) { + err := GetTaskMgr().DelTask(taskID) + if err != nil { + result.Msg = fmt.Sprintf("Task del fail, %s", err.Error()) + return + } + result.Msg = fmt.Sprintf("Task del successfully, taskID : %s", taskID) +} + func actionQuery(taskID string, result *ActionResult) { // a temp task obj for response task := Task{} + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + } if t, ok := GetTaskMgr().GetTask(taskID); ok { - task.TaskID = t.TaskID - task.TaskStatus = t.TaskStatus - task.TaskMessage = t.TaskMessage - result.Results = append(result.Results, task) + task = *t + result.Results = append(result.Results, *task.TaskInfo) result.Msg = "Task query successfully" } else { - task.TaskID = taskID - task.TaskStatus = StatusNotExisted.String() - result.Results = append(result.Results, task) result.Msg = "Task not existed" } } @@ -123,23 +227,26 @@ func actionQuery(taskID string, result *ActionResult) { `actionQueryAll` will return all tasks with status Aborted or Processing */ func actionQueryAll(result *ActionResult) { - taskIDs := GetTaskMgr().GetAllTaskIDs() + taskIDs, err := GetTaskMgr().GetAllTaskIDs() + if err != nil { + result.Msg = "Tasks query unsuccessfully" + return + } for _, taskID := range taskIDs { actionQuery(taskID, result) } - + sort.Slice(result.Results, func(i, j int) bool { + return result.Results[i].CreatedTime > result.Results[j].CreatedTime + }) result.Msg = "Tasks query successfully" } func actionStop(taskID string, result *ActionResult) { ok := GetTaskMgr().StopTask(taskID) - actionQuery(taskID, result) - if !ok { result.Msg = "Task has stopped or finished" } - result.Msg = "Task stop successfully" } @@ -147,12 +254,15 @@ func actionStop(taskID string, result *ActionResult) { `actionStopAll` will stop all tasks with status Processing */ func actionStopAll(result *ActionResult) { - taskIDs := GetTaskMgr().GetAllTaskIDs() + taskIDs, err := GetTaskMgr().GetAllTaskIDs() + if err != nil { + result.Msg = "Tasks query unsuccessfully" + return + } for _, taskID := range taskIDs { - if _task, _ := GetTaskMgr().GetTask(taskID); _task.TaskStatus == StatusProcessing.String() { + if _task, ok := GetTaskMgr().GetTask(taskID); ok && _task.TaskInfo.TaskStatus == StatusProcessing.String() { actionStop(taskID, result) } } - result.Msg = "Tasks stop successfully" } diff --git a/server/pkg/webserver/service/importer/task.go b/server/pkg/webserver/service/importer/task.go new file mode 100644 index 00000000..ac5233bb --- /dev/null +++ b/server/pkg/webserver/service/importer/task.go @@ -0,0 +1,22 @@ +package importer + +import ( + "github.com/vesoft-inc/nebula-importer/pkg/cmd" + + "go.uber.org/zap" +) + +type Task struct { + runner *cmd.Runner `json:"runner,omitempty"` + TaskInfo *TaskInfo `json:"task_info,omitempty"` +} + +func (t *Task) UpdateQueryStats() error { + stats, err := t.runner.QueryStats() + if err != nil { + zap.L().Warn("query import stats fail", zap.Error(err)) + return err + } + t.TaskInfo.Stats = *stats + return nil +} diff --git a/server/pkg/webserver/service/importer/taskInfo.go b/server/pkg/webserver/service/importer/taskInfo.go new file mode 100644 index 00000000..dd393131 --- /dev/null +++ b/server/pkg/webserver/service/importer/taskInfo.go @@ -0,0 +1,16 @@ +package importer + +import "github.com/vesoft-inc/nebula-importer/pkg/stats" + +type TaskInfo struct { + ID int `json:"taskID" gorm:"primaryKey;autoIncrement"` + Name string `json:"name"` + Space string `json:"space"` + NebulaAddress string `json:"nebulaAddress"` + CreatedTime int64 `json:"createdTime"` + UpdatedTime int64 `json:"updatedTime"` + User string `json:"user"` + TaskStatus string `json:"taskStatus"` + TaskMessage string `json:"taskMessage"` + Stats stats.Stats `json:"stats" gorm:"embedded"` +} diff --git a/server/pkg/webserver/service/importer/taskdb.go b/server/pkg/webserver/service/importer/taskdb.go new file mode 100644 index 00000000..c613f9bb --- /dev/null +++ b/server/pkg/webserver/service/importer/taskdb.go @@ -0,0 +1,73 @@ +package importer + +import ( + "fmt" + + "github.com/vesoft-inc/nebula-studio/server/pkg/config" + + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +type TaskDb struct { + *gorm.DB +} + +/* + `InitDB` initialize local sql by open sql and create task_infos table +*/ +func InitDB() { + dbFilePath := config.Cfg.Web.SqlitedbFilePath + //os.Remove(dbFilePath) + db, err := gorm.Open(sqlite.Open(dbFilePath), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info)}) + if err != nil { + zap.L().Fatal(fmt.Sprintf("init db fail: %s", err)) + } + // TODO: AutoMigrate need to optimize + err = db.AutoMigrate(&TaskInfo{}) + if err != nil { + zap.L().Fatal(fmt.Sprintf("init taskInfo table fail: %s", err)) + panic(err) + } + GetTaskMgr().db = &TaskDb{ + DB: db, + } +} + +func (t *TaskDb) InsertTaskInfo(info *TaskInfo) error { + return t.Create(info).Error +} + +func (t *TaskDb) UpdateTaskInfo(info *TaskInfo) error { + return t.Model(&TaskInfo{}).Where("id = ?", info.ID).Updates(info).Error +} + +func (t *TaskDb) DelTaskInfo(ID int) error { + return t.Delete(&TaskInfo{}, ID).Error +} + +func (t *TaskDb) LastId() (int, error) { + var id int + if err := t.Raw("SELECT MAX(id) FROM task_infos").Scan(&id).Error; err != nil { + if err.Error() == "sql: Scan error on column index 0, name \"MAX(id)\": converting NULL to int is unsupported" { + return 0, nil + } + return 0, err + } + return id, nil +} + +func (t *TaskDb) SelectAllIds() ([]int, error) { + var taskInfos []TaskInfo + ids := make([]int, 0) + if err := t.Select("id").Find(&taskInfos).Error; err != nil { + return nil, err + } + for _, taskInfo := range taskInfos { + ids = append(ids, taskInfo.ID) + } + return ids, nil +} diff --git a/server/pkg/webserver/service/importer/taskmgr.go b/server/pkg/webserver/service/importer/taskmgr.go index c5870bfd..c1348715 100644 --- a/server/pkg/webserver/service/importer/taskmgr.go +++ b/server/pkg/webserver/service/importer/taskmgr.go @@ -1,48 +1,48 @@ package importer import ( - "database/sql" + "errors" "fmt" + "go.uber.org/zap" "os" + "path/filepath" "strconv" "sync" + "time" "github.com/vesoft-inc/nebula-importer/pkg/cmd" "github.com/vesoft-inc/nebula-studio/server/pkg/config" _ "github.com/mattn/go-sqlite3" - "go.uber.org/zap" ) var ( taskmgr *TaskMgr = &TaskMgr{ tasks: sync.Map{}, - db: &sql.DB{}, + db: &TaskDb{}, } - tid uint64 = 0 - mux sync.Mutex ) type TaskMgr struct { tasks sync.Map - db *sql.DB + db *TaskDb } -type Task struct { - runner *cmd.Runner - - TaskID string `json:"taskID"` - TaskStatus string `json:"taskStatus"` - TaskMessage string `json:"taskMessage"` -} - -func NewTask(taskID string) Task { - return Task{ - runner: &cmd.Runner{}, - TaskID: taskID, - TaskStatus: StatusProcessing.String(), +func newTask(nebulaAddress string, user string, name string, space string) *Task { + timeUnix := time.Now().Unix() + return &Task{ + runner: &cmd.Runner{}, + TaskInfo: &TaskInfo{ + Name: name, + Space: space, + CreatedTime: timeUnix, + UpdatedTime: timeUnix, + TaskStatus: StatusProcessing.String(), + NebulaAddress: nebulaAddress, + User: user, + }, } } @@ -50,19 +50,30 @@ func (task *Task) GetRunner() *cmd.Runner { return task.runner } -func GetTaskID() (_tid uint64) { - mux.Lock() - defer mux.Unlock() - _tid = tid - return _tid +func (mgr *TaskMgr) NewTaskID() (string, error) { + tid, err := mgr.db.LastId() + if err != nil { + return "", err + } + taskID := fmt.Sprintf("%v", tid+1) + return taskID, nil } -func NewTaskID() (taskID string) { +func (mgr *TaskMgr) NewTask(nebulaAddress string, user string, name string, space string) (*Task, string, error) { mux.Lock() defer mux.Unlock() - tid++ - taskID = fmt.Sprintf("%d", tid) - return taskID + task := newTask(nebulaAddress, user, name, space) + if err := mgr.db.InsertTaskInfo(task.TaskInfo); err != nil { + return nil, "", err + } + tid, err := mgr.db.LastId() + if err != nil { + return nil, "", err + } + task.TaskInfo.ID = tid + taskID := fmt.Sprintf("%v", tid) + mgr.PutTask(taskID, task) + return task, taskID, nil } func GetTaskMgr() *TaskMgr { @@ -70,132 +81,129 @@ func GetTaskMgr() *TaskMgr { } /* - `GetTask` get task from map and local sql + GetTask get task from map and local sql */ func (mgr *TaskMgr) GetTask(taskID string) (*Task, bool) { - _tid, _ := strconv.ParseUint(taskID, 0, 64) - - if _tid > GetTaskID() || tid <= 0 { - return nil, false - } - if task, ok := mgr.getTaskFromMap(taskID); ok { return task, true } - - return mgr.getTaskFromSQL(taskID), true + task := mgr.getTaskFromSQL(taskID) + // did not find task + if task.TaskInfo.ID == 0 { + return nil, false + } + return task, true } /* - `PutTask` put task into tasks map + PutTask put task into tasks map */ func (mgr *TaskMgr) PutTask(taskID string, task *Task) { mgr.tasks.Store(taskID, task) } /* - `DelTask` will delete task in the map, - and put the task into local sql + FinishTask will query task stats, delete task in the map + and update the taskInfo in local sql */ -func (mgr *TaskMgr) DelTask(taskID string) { +func (mgr *TaskMgr) FinishTask(taskID string) (err error) { task, ok := mgr.getTaskFromMap(taskID) - if !ok { return } - + if err := task.UpdateQueryStats(); err != nil { + return err + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + err = mgr.db.UpdateTaskInfo(task.TaskInfo) + if err != nil { + return err + } mgr.tasks.Delete(taskID) - mgr.putTaskIntoSQL(taskID, task) + return +} + +func (mgr *TaskMgr) DelTask(taskID string) error { + _, ok := mgr.getTaskFromMap(taskID) + if ok { + mgr.tasks.Delete(taskID) + } + id, err := strconv.Atoi(taskID) + if err != nil { + return errors.New("taskID is wrong") + } + if err = mgr.db.DelTaskInfo(id); err != nil { + return err + } + taskDir := filepath.Join(config.Cfg.Web.TasksDir, taskID) + return os.RemoveAll(taskDir) } /* - `StopTask` will change the task status to `StatusStoped`, - and then call `DelTask` + UpdateTaskInfo will query task stats, update task in the map + and update the taskInfo in local sql +*/ +func (mgr *TaskMgr) UpdateTaskInfo(taskID string) error { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return nil + } + if err := task.UpdateQueryStats(); err != nil { + return err + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + return mgr.db.UpdateTaskInfo(task.TaskInfo) +} + +/* + StopTask will change the task status to `StatusStoped`, + and then call FinishTask */ func (mgr *TaskMgr) StopTask(taskID string) bool { if task, ok := mgr.getTaskFromMap(taskID); ok { for _, r := range task.GetRunner().Readers { r.Stop() } - - task.TaskStatus = StatusStoped.String() - - mgr.DelTask(taskID) - + task.TaskInfo.TaskStatus = StatusStoped.String() + if err := mgr.FinishTask(taskID); err != nil { + zap.L().Warn("finish task fail", zap.Error(err)) + return false + } return true } - return false } /* `GetAllTaskIDs` will return all task ids in map */ -func (mgr *TaskMgr) GetAllTaskIDs() []string { +func (mgr *TaskMgr) GetAllTaskIDs() ([]string, error) { ids := make([]string, 0) - mgr.tasks.Range(func(k, v interface{}) bool { - ids = append(ids, k.(string)) - return true - }) - - return ids -} - -/* - `InitDB` initialize local sql by open sql and create tasks table -*/ -func InitDB() { - dbFilePath := config.Cfg.Web.SqlitedbFilePath - - os.Remove(dbFilePath) - - _db, err := sql.Open("sqlite3", dbFilePath) - + allIds, err := mgr.db.SelectAllIds() if err != nil { - zap.L().Fatal(err.Error()) + return nil, err } - - GetTaskMgr().db = _db - - sqlStmt := ` - create table tasks (taskID integer not null primary key, taskStatus text, taskMessage text); - delete from tasks; - ` - _, err = GetTaskMgr().db.Exec(sqlStmt) - - if err != nil { - zap.L().Fatal(fmt.Sprintf("%q: %s\n", err, sqlStmt)) + for _, id := range allIds { + ids = append(ids, strconv.Itoa(id)) } - + return ids, nil } func (mgr *TaskMgr) getTaskFromMap(taskID string) (*Task, bool) { if task, ok := mgr.tasks.Load(taskID); ok { return task.(*Task), true } - return nil, false } func (mgr *TaskMgr) getTaskFromSQL(taskID string) *Task { - var taskStatus string - - rows, _ := mgr.db.Query(fmt.Sprintf("SELECT taskStatus FROM tasks WHERE taskID=%s", taskID)) - - for rows.Next() { - _ = rows.Scan(&taskStatus) - } - - return &Task{ - TaskID: taskID, - TaskStatus: taskStatus, - } -} - -func (mgr *TaskMgr) putTaskIntoSQL(taskID string, task *Task) { - stmt, _ := mgr.db.Prepare("INSERT INTO tasks(taskID, taskStatus) values(?,?)") - - _, _ = stmt.Exec(taskID, task.TaskStatus) + taskInfo := new(TaskInfo) + mgr.db.First(taskInfo, taskID) + task := new(Task) + task.TaskInfo = taskInfo + return task } type TaskAction int @@ -206,6 +214,7 @@ const ( ActionQueryAll ActionStop ActionStopAll + ActionDel ) var taskActionMap = map[TaskAction]string{ @@ -213,6 +222,7 @@ var taskActionMap = map[TaskAction]string{ ActionQueryAll: "actionQueryAll", ActionStop: "actionStop", ActionStopAll: "actionStopAll", + ActionDel: "actionDel", } var taskActionRevMap = map[string]TaskAction{ @@ -220,6 +230,7 @@ var taskActionRevMap = map[string]TaskAction{ "actionQueryAll": ActionQueryAll, "actionStop": ActionStop, "actionStopAll": ActionStopAll, + "actionDel": ActionDel, } func NewTaskAction(action string) TaskAction {