diff --git a/server/README.md b/server/README.md index 2028e3e2..a09ec6b5 100644 --- a/server/README.md +++ b/server/README.md @@ -29,15 +29,19 @@ $ ./nebula-studio-server -studio-config="./config/example-config.yaml" | ConnectDB | /api-nebula/db/connect | POST | | DisconnectDB | /api-nebula/db/disconnect | POST | | ImportData | /api-nebula/task/import | POST | -| HandleImportAction | /api-nebula/import/action | POST | +| HandleImportAction | /api-nebula/task/import/action | POST | +| QueryImportStats | /api-nebula/task/import/stats | POST | +| DownloadConfig | /api-nebula/task/import/config | POST | +| DownloadLog | /api-nebula/task/import/log | GET | | FilesIndex | /api/files | GET | | FilesDestroy | /api/files/{id:string} | DELETE | | FilesUpload | /api/files | PUT | | ReadLog | /api/import/log | GET | -| CreateConfigFile | /api/import/config | POST | +| ReadErrLog | /api/import/err_log | GET | | Callback | /api/import/finish | POST | | GetWorkingDir | /api/import/working_dir | GET | -| Index | / | GET,HEAD | +| GetTaskDir | /api/import/task_dir | GET | +| GetTaskLogPaths | /api/import/task_err_log_paths/{id:string} | GET | #### ExecNGQL API @@ -100,7 +104,7 @@ Response: The request json body: ```json -{"configBody":{"version":"v2","description":"web console import","clientSettings":{"retry":3,"concurrency":10,"channelBufferSize":128,"space":"basketballplayer","connection":{"user":"root","password":"123","address":"192.168.8.145:9669"}},"logPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/import.log","files":[{"path":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv","failDataPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/err/data 1Fail.csv","batchSize":10,"type":"csv","csv":{"withHeader":false,"withLabel":false},"schema":{"type":"vertex","vertex":{"vid":{"index":0,"type":"string"},"tags":[{"name":"player","props":[{"name":"name","type":"string","index":1},{"name":"age","type":"int","index":2}]}]}}}]},"configPath":""} +{"configBody":{"version":"v2","description":"web console import","clientSettings":{"retry":3,"concurrency":10,"channelBufferSize":128,"space":"basketballplayer","connection":{"user":"root","password":"123","address":"192.168.8.145:9669"}},"logPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/import.log","files":[{"path":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv","failDataPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/err/data 1Fail.csv","batchSize":10,"type":"csv","csv":{"withHeader":false,"withLabel":false},"schema":{"type":"vertex","vertex":{"vid":{"index":0,"type":"string"},"tags":[{"name":"player","props":[{"name":"name","type":"string","index":1},{"name":"age","type":"int","index":2}]}]}}}]},"configPath":"","name":"task1"} ``` Response: @@ -119,35 +123,153 @@ Response: The request json body: +```json +{"taskID":"1","taskAction":"actionQuery"} +``` + +Response: + ```json { - "code": 0, - "message": "Import task 4 submit successfully", - "data": [ - "4" - ] + "code": 0, + "message": "Processing a task action successfully", + "data": { + "results": [ + { + "taskID": 1, + "name": "task1", + "space": "test", + "nebulaAddress": "192.168.8.243:9669", + "createdTime": 1644386055, + "updatedTime": 1644386056, + "user": "root", + "taskStatus": "statusAborted", + "taskMessage": "failed to open connection, error: incompatible version between client and server: Graph client version(3.0.0) is not accepted, current graph client white list: 2.6.1:2.5.0:2.5.1:2.6.0. ", + "stats": { + "numFailed": 0, + "numReadFailed": 0, + "totalCount": 0, + "totalBatches": 0, + "totalLatency": 0, + "totalReqTime": 0, + "totalBytes": 0, + "totalImportedBytes": 0 + } + } + ], + "msg": "Task query successfully" + } } ``` +#### QueryImportStats API + +The request json body: + +```json +{"taskId":"1"} +``` + Response: ```json { - "code": 0, - "message": "Processing a task action successfully", - "data": { - "results": [ - { - "taskID": "4", + "code": 0, + "message": "Processing a task action successfully", + "data": { + "taskID": 3, + "name": "task1", + "space": "test", + "nebulaAddress": "192.168.8.233:9669", + "createdTime": 1646989643, + "updatedTime": 1646989646, + "user": "root", "taskStatus": "statusFinished", - "taskMessage": "" - } - ], - "msg": "Task query successfully" - } + "taskMessage": "", + "stats": { + "numFailed": 0, + "numReadFailed": 0, + "totalCount": 52, + "totalBatches": 10, + "totalLatency": 30089, + "totalReqTime": 532718, + "totalBytes": 1583, + "totalImportedBytes": 1583 + } + } +} +``` + +#### DownloadConfig API + +The request json body: + +```json +{ + "taskID": "1" } ``` +Response: + +``` +version: v2 +description: web console import +removeTempFiles: null +clientSettings: + retry: 3 + concurrency: 10 + channelBufferSize: 128 + space: test + connection: + user: user + password: password + address: host + postStart: null + preStop: null +logPath: E:\NebulaProject\新建文件夹 (4)\nebula-studio\server\data\tasks\1/import.log +files: +- path: E:\NebulaProject\新建文件夹 (4)\nebula-studio\server\data\upload\player.csv + failDataPath: E:\NebulaProject\新建文件夹 (4)\nebula-studio\server\data\tasks\1/err/数据源 + 1Fail.csv + batchSize: 60 + limit: null + inOrder: null + type: csv + csv: + withHeader: false + withLabel: false + delimiter: null + schema: + type: vertex + edge: null + vertex: + vid: + index: 0 + function: null + type: string + tags: + - name: player + props: + - name: name + type: string + index: 1 + - name: age + type: int + index: 2 + +``` + +#### DownloadLog API + +The request : + +http://localhost:9000/api-nebula/task/import/log/?pathName=E:\NebulaProject\nebula-studio\server\data\tasks\1\err\err 1.log + +Response: + +a file + #### FilesIndex API Response: @@ -212,21 +334,24 @@ Response: ``` #### ReadLog API Use params: -http://localhost:7001/api/import/log?dir=E:\NebulaProject\test\nebula-studio\server\data\tasks&startByte=0&endByte=1000000&taskId=4 +http://localhost:9000/api/import/log?limit=2&path=data/tasks/1/import.log&taskId=1&offset=1 Response: ```json { - "code": 0, - "message": "", - "data": "2022/01/11 16:33:48 [INFO] clientmgr.go:28: Create 10 Nebula Graph clients\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:64: Start to read file(0): E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv, schema: \u003c :VID(string),player.name:string,player.age:int \u003e\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:180: Total lines of file(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv) is: 52, error lines: 0\u003cbr /\u003e2022/01/11 16:33:48 [INFO] statsmgr.go:62: Done(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv): Time(1.45s), Finished(52), Failed(0), Read Failed(0), Latency AVG(6182us), Batches Req AVG(48887us), Rows AVG(35.87/s)\u003cbr /\u003e" -}xxxxxxxxxx {  "code": 0,  "message": "",  "data": "2022/01/11 16:33:48 [INFO] clientmgr.go:28: Create 10 Nebula Graph clients\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:64: Start to read file(0): E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv, schema: \u003c :VID(string),player.name:string,player.age:int \u003e\u003cbr /\u003e2022/01/11 16:33:48 [INFO] reader.go:180: Total lines of file(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv) is: 52, error lines: 0\u003cbr /\u003e2022/01/11 16:33:48 [INFO] statsmgr.go:62: Done(E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload\\player.csv): Time(1.45s), Finished(52), Failed(0), Read Failed(0), Latency AVG(6182us), Batches Req AVG(48887us), Rows AVG(35.87/s)\u003cbr /\u003e"}{    "code": 0,    "message": "",    "data": "hell"} -``` -#### CreateConfigFile API -Create a config file in config_dir -```json -{"config":{"version":"v2","description":"web console import","clientSettings":{"retry":3,"concurrency":10,"channelBufferSize":128,"space":"basketballplayer","connection":{"user":"root","password":"123","address":"192.168.8.145:9669"}},"logPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks/import.log","files":[]},"mountPath":"E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks"} + "code": 0, + "message": "", + "data": [ + "bbbbbbbbbbb", + "ccccccccccc" + ] +} ``` +#### ReadErrLog API + +Use params: + +http://localhost:9000/api/import/err_log?offset=1&ReadJSON=1&path=E:\\NebulaProject\nebula-studio\\server\\data\\tasks\\1\\err\\err 1.log Response: @@ -234,9 +359,12 @@ Response: { "code": 0, "message": "", - "data": "" + "data": [ + "bbbbbbbbbb" + ] } ``` + #### Callback API The request json body: @@ -266,9 +394,51 @@ Response: "code": 0, "message": "", "data": { - "taskDir": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\tasks", "uploadDir": "E:\\NebulaProject\\test\\nebula-studio\\server\\data\\upload" } } ``` +#### GetTaskDir API + +Response: + +```json +{ + "code": 0, + "message": "", + "data": { + "taskDir": "E:\\NebulaProject\\nebula-studio\\server\\data\\tasks\\1" + } +} +``` + +#### GetTaskLogPaths API + +Request: + +http://localhost:9000/api/import/task_err_log_paths/1 + +Response: + +```json +{ + "code": 0, + "message": "", + "data": [ + { + "name": "import.log", + "path": "E:\\NebulaProject\\nebula-studio\\server\\data\\tasks\\1\\import.log" + }, + { + "name": "err 1.log", + "path": "E:\\NebulaProject\\nebula-studio\\server\\data\\tasks\\1\\err\\err 1.log" + }, + { + "name": "err 2.log", + "path": "E:\\NebulaProject\\nebula-studio\\server\\data\\tasks\\1\\err\\err 2.log" + } + ] +} +``` + diff --git a/server/config/example-config.yaml b/server/config/example-config.yaml index 0050b47a..39c9a05f 100644 --- a/server/config/example-config.yaml +++ b/server/config/example-config.yaml @@ -4,4 +4,4 @@ web: # tasks_dir: # sqlitedb_file_path: # ip: - port: 7001 \ No newline at end of file +# port: 7001 \ No newline at end of file diff --git a/server/go.mod b/server/go.mod index 300780c1..e8fe1f8e 100644 --- a/server/go.mod +++ b/server/go.mod @@ -3,12 +3,16 @@ module github.com/vesoft-inc/nebula-studio/server go 1.17 require ( + github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 github.com/kataras/iris/v12 v12.2.0-alpha4 github.com/mattn/go-sqlite3 v2.0.3+incompatible + github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220124093655-f7f5b4c63e6a - github.com/vesoft-inc/nebula-importer v1.0.1-0.20211213064541-05a8646be295 + github.com/vesoft-inc/nebula-importer v1.0.1-0.20220314055243-ece1a79d37e5 go.uber.org/zap v1.17.0 gopkg.in/yaml.v2 v2.4.0 + gorm.io/driver/sqlite v1.2.6 + gorm.io/gorm v1.22.5 ) require ( @@ -29,6 +33,8 @@ require ( github.com/gorilla/css v1.0.0 // indirect github.com/iris-contrib/jade v1.1.4 // indirect github.com/iris-contrib/schema v0.0.6 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.4 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kataras/blocks v0.0.4 // indirect github.com/kataras/golog v0.1.7 // indirect @@ -47,7 +53,7 @@ require ( github.com/tdewolff/minify/v2 v2.9.22 // indirect github.com/tdewolff/parse/v2 v2.5.21 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/vesoft-inc/nebula-go/v2 v2.5.2-0.20211210024917-9461e07cdca2 // indirect + github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220214062853-d0c59964d0af // indirect github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/yosssi/ace v0.0.5 // indirect diff --git a/server/go.sum b/server/go.sum index 32867656..09186994 100644 --- a/server/go.sum +++ b/server/go.sum @@ -16,6 +16,8 @@ github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0 github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 h1:OYA+5W64v3OgClL+IrOD63t4i/RW7RqrAVl9LTZ9UqQ= +github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394/go.mod h1:Q8n74mJTIgjX4RBBcHnJ05h//6/k6foqmgE45jTQtxg= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible h1:Ppm0npCCsmuR9oQaBtRuZcmILVE74aXE+AmrJj8L2ns= @@ -40,7 +42,6 @@ github.com/djherbis/atime v1.1.0/go.mod h1:28OF6Y8s3NQWwacXc5eZTsEsiMzp7LF8MbXE+ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= -github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU= github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 h1:ZA+qQ3d2In0RNzVpk+D/nq1sjDSv+s1Wy2zrAPQAmsg= github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= @@ -97,6 +98,11 @@ github.com/iris-contrib/jade v1.1.4 h1:WoYdfyJFfZIUgqNAeOyRfTNQZOksSlZ6+FnXR3AEp github.com/iris-contrib/jade v1.1.4/go.mod h1:EDqR+ur9piDl6DUgs6qRrlfzmlx/D5UybogqrXvJTBE= github.com/iris-contrib/schema v0.0.6 h1:CPSBLyx2e91H2yJzPuhGuifVRnZBBJ3pCOMbOvPZaTw= github.com/iris-contrib/schema v0.0.6/go.mod h1:iYszG0IOsuIsfzjymw1kMzTL8YQcCWlm65f3wX8J5iA= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.4 h1:tHnRBy1i5F2Dh8BAFxqFzxKqqvezXrL2OW1TnX+Mlas= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= @@ -130,6 +136,7 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs= +github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mediocregopher/radix/v3 v3.6.0/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= @@ -165,6 +172,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca h1:NugYot0LIVPxTvN8n+Kvkn6TrbMyxQiuvKdEwFdR9vI= +github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/closestmatch v2.1.0+incompatible h1:Uel2GXEpJqOWBrlyI+oY9LTiyyjYS17cCYRqP13/SHk= @@ -202,12 +211,12 @@ github.com/tdewolff/test v1.0.6/go.mod h1:6DAvZliBAAnD7rhVgwaM7DE5/d9NMOAJ09SqYq github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/vesoft-inc/nebula-go/v2 v2.5.2-0.20211210024917-9461e07cdca2 h1:ivN82oSpM/kNCZVdzoCSj1IT4ZDW7Xi2juhAZvh7eF8= -github.com/vesoft-inc/nebula-go/v2 v2.5.2-0.20211210024917-9461e07cdca2/go.mod h1:fehDUs97/mpmxXi9WezhznX0Dg7hmQRUoOWgDZv9zG0= +github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220214062853-d0c59964d0af h1:5z3eC9o+LzDBeA2kRIMgj+k7C8bfSuWh9ffZXljkPzs= +github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220214062853-d0c59964d0af/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220124093655-f7f5b4c63e6a h1:tirwUK05V3LPsIFaLa0cHmVqFD3F1Tb8PlDTK7YWQGc= github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220124093655-f7f5b4c63e6a/go.mod h1:sFEvE+cY4TgwqWx6H6msOqAUzRhsEHHKaaMgIZENHuQ= -github.com/vesoft-inc/nebula-importer v1.0.1-0.20211213064541-05a8646be295 h1:YdBNGn+5YyIuQ6cWjIPCvSJW/3qj5ck5UIWGeh4hXNI= -github.com/vesoft-inc/nebula-importer v1.0.1-0.20211213064541-05a8646be295/go.mod h1:0nHCbr2/nckhfxAA8sDbwxkNzA9YyXazWKyVMT/PfPo= +github.com/vesoft-inc/nebula-importer v1.0.1-0.20220314055243-ece1a79d37e5 h1:YghW/f27/NEpP8h3yAn2p7WCZYUIx3rc+pdS3v9QmRc= +github.com/vesoft-inc/nebula-importer v1.0.1-0.20220314055243-ece1a79d37e5/go.mod h1:Hgn05JWXQcxyoJAjJVN34PnUndY+JcNKzpb0DoyFrsM= github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= @@ -307,7 +316,6 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= @@ -325,5 +333,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.2.6 h1:SStaH/b+280M7C8vXeZLz/zo9cLQmIGwwj3cSj7p6l4= +gorm.io/driver/sqlite v1.2.6/go.mod h1:gyoX0vHiiwi0g49tv+x2E7l8ksauLK0U/gShcdUsjWY= +gorm.io/gorm v1.22.3/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0= +gorm.io/gorm v1.22.5 h1:lYREBgc02Be/5lSCTuysZZDb6ffL2qrat6fg9CFbvXU= +gorm.io/gorm v1.22.5/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8= moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE= diff --git a/server/pkg/api/routes/gateway.go b/server/pkg/api/routes/gateway.go index 63fd8cb3..824fffd2 100644 --- a/server/pkg/api/routes/gateway.go +++ b/server/pkg/api/routes/gateway.go @@ -29,5 +29,17 @@ var GatewayRoute = base.Route{ Path: "task/import/action", POST: controller.HandleImportAction, }, + { + Path: "task/import/stats", + POST: controller.QueryImportStats, + }, + { + Path: "task/import/config/{id:string}", + GET: controller.DownloadConfigFile, + }, + { + Path: "task/import/log", + GET: controller.DownloadLog, + }, }, } diff --git a/server/pkg/api/routes/import.go b/server/pkg/api/routes/import.go index 51775130..35780a8a 100644 --- a/server/pkg/api/routes/import.go +++ b/server/pkg/api/routes/import.go @@ -11,11 +11,11 @@ var ImportRoute = base.Route{ SubRoutes: []base.Route{ { Path: "log", - GET: controller.ReadLog, + GET: controller.ReadImportLog, }, { - Path: "config", - POST: controller.CreateConfigFile, + Path: "err_log", + GET: controller.ReadErrLog, }, { Path: "finish", @@ -25,5 +25,13 @@ var ImportRoute = base.Route{ Path: "working_dir", GET: controller.GetWorkingDir, }, + { + Path: "task_dir", + GET: controller.GetTaskDir, + }, + { + Path: "task_log_paths/{id:string}", + GET: controller.GetTaskLogPaths, + }, }, } diff --git a/server/pkg/utils/file.go b/server/pkg/utils/file.go new file mode 100644 index 00000000..6baac9c0 --- /dev/null +++ b/server/pkg/utils/file.go @@ -0,0 +1,14 @@ +package utils + +import "os" + +func CreateDir(dir string) error { + _, err := os.Stat(dir) + if os.IsNotExist(err) { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return err + } + } + return nil +} diff --git a/server/pkg/webserver/base/scheme.go b/server/pkg/webserver/base/scheme.go index c10344e6..728e02dd 100644 --- a/server/pkg/webserver/base/scheme.go +++ b/server/pkg/webserver/base/scheme.go @@ -6,6 +6,7 @@ import ( "github.com/kataras/iris/v12" "github.com/kataras/iris/v12/context" "github.com/kataras/iris/v12/core/router" + "go.uber.org/zap" ) diff --git a/server/pkg/webserver/base/types.go b/server/pkg/webserver/base/types.go index c6daa797..27b906ad 100644 --- a/server/pkg/webserver/base/types.go +++ b/server/pkg/webserver/base/types.go @@ -3,6 +3,7 @@ package base type StatusCode int const ( - Error StatusCode = -1 - Success StatusCode = 0 + Error StatusCode = -1 + Success StatusCode = 0 + AuthorizationError StatusCode = 401 ) diff --git a/server/pkg/webserver/controller/files.go b/server/pkg/webserver/controller/files.go index 472e9e7e..746c6cc4 100644 --- a/server/pkg/webserver/controller/files.go +++ b/server/pkg/webserver/controller/files.go @@ -1,9 +1,12 @@ package controller import ( + "bytes" "encoding/csv" "fmt" + "go.uber.org/zap" "io/ioutil" + "mime/multipart" "net/http" "os" "path/filepath" @@ -11,8 +14,9 @@ import ( "github.com/vesoft-inc/nebula-studio/server/pkg/config" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/base" + "github.com/axgle/mahonia" "github.com/kataras/iris/v12" - "go.uber.org/zap" + "github.com/saintfish/chardet" ) type fileStat struct { @@ -51,7 +55,6 @@ func FilesDestroy(ctx iris.Context) base.Result { } func FilesIndex(ctx iris.Context) base.Result { - dir := config.Cfg.Web.UploadDir filesInfo, err := ioutil.ReadDir(dir) if err != nil { @@ -64,7 +67,6 @@ func FilesIndex(ctx iris.Context) base.Result { data := make([]*fileStat, 0) for _, fileInfo := range filesInfo { - if fileInfo.IsDir() { continue } @@ -78,14 +80,12 @@ func FilesIndex(ctx iris.Context) base.Result { count := 0 content := make([][]string, 0) for count < 3 { - line, err := reader.Read() count++ if err != nil { break } content = append(content, line) - } data = append(data, &fileStat{ Content: content, @@ -95,19 +95,34 @@ func FilesIndex(ctx iris.Context) base.Result { Name: fileInfo.Name(), Size: fileInfo.Size(), }) - } return base.Response{ Code: base.Success, Data: data, } - } func FilesUpload(ctx iris.Context) base.Result { - dir := config.Cfg.Web.UploadDir files, _, err := ctx.UploadFormFiles(dir) + for _, file := range files { + charSet, err := checkCharset(file) + if err != nil { + continue + } + if charSet == "UTF-8" { + continue + } + path := filepath.Join(dir, file.Filename) + err = changeFileCharset2UTF8(path, charSet) + if err != nil { + zap.L().Warn("upload file error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + } if err != nil { zap.L().Warn("upload file error", zap.Error(err)) return base.Response{ @@ -115,10 +130,53 @@ func FilesUpload(ctx iris.Context) base.Result { Message: err.Error(), } } - fmt.Println(len(files)) + zap.L().Info(fmt.Sprintf("upload %d files", len(files))) ctx.StatusCode(http.StatusOK) return base.Response{ Code: base.Success, } +} +func checkCharset(file *multipart.FileHeader) (string, error) { + open, err := file.Open() + if err != nil { + return "", err + } + bytes := make([]byte, 1024) + _, err = open.Read(bytes) + if err != nil { + return "", err + } + detector := chardet.NewTextDetector() + best, err := detector.DetectBest(bytes) + if err != nil { + return "", err + } + return best.Charset, nil +} + +func changeFileCharset2UTF8(path string, charSet string) error { + file, err := ioutil.ReadFile(path) + if err != nil { + return err + } + reader := bytes.NewReader(file) + if err != nil { + return err + } + decoder := mahonia.NewDecoder(charSet) + otherBytes, err := ioutil.ReadAll(reader) + s := string(otherBytes) + utf8String := decoder.ConvertString(s) + if err != nil { + return err + } + if err != nil { + return err + } + err = ioutil.WriteFile(path, []byte(utf8String), 0666) + if err != nil { + return err + } + return nil } diff --git a/server/pkg/webserver/controller/gateway.go b/server/pkg/webserver/controller/gateway.go index edcc67ea..c1ba21d7 100644 --- a/server/pkg/webserver/controller/gateway.go +++ b/server/pkg/webserver/controller/gateway.go @@ -1,12 +1,18 @@ package controller import ( + "encoding/base64" + "errors" "fmt" + "path/filepath" + "strings" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/gateway/dao" "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-studio/server/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/pkg/utils" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/base" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/service/importer" @@ -14,16 +20,18 @@ import ( "go.uber.org/zap" ) +const ( + ErrDir = "err" +) + type execNGQLParams struct { Gql string `json:"gql"` ParamList types.ParameterList `json:"paramList"` } type connectDBParams struct { - Address string `json:"address"` - Port int `json:"port"` - Username string `json:"username"` - Password string `json:"password"` + Address string `json:"address"` + Port int `json:"port"` } type disConnectDBParams struct { @@ -33,6 +41,11 @@ type disConnectDBParams struct { type importDataParams struct { ConfigPath string `json:"configPath"` ConfigBody *importconfig.YAMLConfig `json:"configBody"` + Name string `json:"name"` +} + +type queryImportStatsParams struct { + TaskId string `json:"taskId"` } type handleImportActionParams struct { @@ -66,8 +79,27 @@ func ExecNGQL(ctx iris.Context) base.Result { } func ConnectDB(ctx iris.Context) base.Result { + token := ctx.GetHeader("Authorization") + tokenSlice := strings.Split(token, " ") + if len(tokenSlice) != 2 { + return base.Response{ + Code: base.AuthorizationError, + Message: "Not get token", + } + } + + decode, err := base64.StdEncoding.DecodeString(tokenSlice[1]) + if err != nil { + return base.Response{ + Code: base.AuthorizationError, + Message: err.Error(), + } + } + account := strings.Split(string(decode), ":") + username, password := account[0], account[1] + params := new(connectDBParams) - err := ctx.ReadJSON(params) + err = ctx.ReadJSON(params) if err != nil { zap.L().Warn("connectDBParams get fail", zap.Error(err)) return base.Response{ @@ -75,7 +107,7 @@ func ConnectDB(ctx iris.Context) base.Result { Message: err.Error(), } } - clientInfo, err := dao.Connect(params.Address, params.Port, params.Username, params.Password) + clientInfo, err := dao.Connect(params.Address, params.Port, username, password) if err != nil { return nil } @@ -115,32 +147,95 @@ func DisconnectDB(ctx iris.Context) base.Result { } func ImportData(ctx iris.Context) base.Result { - taskId := importer.NewTaskID() - task := importer.NewTask(taskId) - importer.GetTaskMgr().PutTask(taskId, &task) params := new(importDataParams) err := ctx.ReadJSON(params) if err != nil { zap.L().Warn("importDataParams get fail", zap.Error(err)) err = importerErrors.Wrap(importerErrors.InvalidConfigPathOrFormat, err) - } else { - err = importer.Import(taskId, params.ConfigPath, params.ConfigBody) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } } - + err = validImportDataParams(params) + if err != nil { + zap.L().Warn("importDataParams get fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + // create config file + taskDir := importer.GetNewTaskDir() + err = importer.CreateConfigFile(taskDir, *params.ConfigBody) + if err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + // create err dir + taskErrDir := filepath.Join(taskDir, ErrDir) + err = utils.CreateDir(taskErrDir) + if err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + // import + nebulaAddress := *params.ConfigBody.NebulaClientSettings.Connection.Address + user := *params.ConfigBody.NebulaClientSettings.Connection.User + name := params.Name + space := *params.ConfigBody.NebulaClientSettings.Space + task, taskID, err := importer.GetTaskMgr().NewTask(nebulaAddress, user, name, space) + if err != nil { + zap.L().Warn("init task fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + err = importer.Import(taskID, params.ConfigPath, params.ConfigBody) if err != nil { // task err: import task not start err handle - task.TaskStatus = importer.StatusAborted.String() - zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskId, err)) + task.TaskInfo.TaskStatus = importer.StatusAborted.String() + importer.GetTaskMgr().FinishTask(taskID) + zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskID, err)) return base.Response{ Code: base.Error, Message: err.Error(), } } + return base.Response{ + Code: base.Success, + Data: []string{taskID}, + Message: fmt.Sprintf("Import task %s submit successfully", taskID), + } +} +func QueryImportStats(ctx iris.Context) base.Result { + params := new(queryImportStatsParams) + err := ctx.ReadJSON(params) + if err != nil { + zap.L().Warn("queryImportStatsParams get fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + taskInfo, err := importer.ImportStatus(params.TaskId) + if err != nil { + zap.L().Warn("queryImportStats fail", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } return base.Response{ Code: base.Success, - Data: []string{taskId}, - Message: fmt.Sprintf("Import task %s submit successfully", taskId), + Message: "Processing a task action successfully", + Data: taskInfo, } } @@ -168,3 +263,45 @@ func HandleImportAction(ctx iris.Context) base.Result { Data: data, } } + +func DownloadConfigFile(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + if id == "" { + return base.Response{ + Code: base.Error, + Message: "id parse failed", + } + } + configPath := filepath.Join(config.Cfg.Web.TasksDir, id, "config.yaml") + ctx.SendFile(configPath, "config.yaml") + return base.Response{ + Code: base.Success, + } +} + +func DownloadLog(ctx iris.Context) base.Result { + path := ctx.URLParam("pathName") + if path == "" { + return base.Response{ + Code: base.Error, + Message: "path parse failed", + } + } + index := strings.LastIndex(path, "/") + if index == -1 { + index = strings.LastIndex(path, "\\") + } + filename := path[index:] + ctx.SendFile(path, filename) + return base.Response{ + Code: base.Success, + } +} + +func validImportDataParams(params *importDataParams) error { + if params.ConfigBody.NebulaClientSettings.Connection.Address == nil || params.ConfigBody.NebulaClientSettings. + Connection.User == nil || params.ConfigBody.NebulaClientSettings.Space == nil { + return errors.New("importDataParams is wrong") + } + return nil +} diff --git a/server/pkg/webserver/controller/import.go b/server/pkg/webserver/controller/import.go index ff6cf81f..15c6181f 100644 --- a/server/pkg/webserver/controller/import.go +++ b/server/pkg/webserver/controller/import.go @@ -1,21 +1,20 @@ package controller import ( + "bufio" "encoding/json" "io/ioutil" "os" "path/filepath" "strconv" - "strings" "sync" - importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" "github.com/vesoft-inc/nebula-studio/server/pkg/config" "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/base" + "github.com/vesoft-inc/nebula-studio/server/pkg/webserver/service/importer" "github.com/kataras/iris/v12" "go.uber.org/zap" - "gopkg.in/yaml.v2" ) type dirResponse struct { @@ -23,16 +22,39 @@ type dirResponse struct { UploadDir string `json:"uploadDir,omitempty"` } +type log struct { + Name string `json:"name"` + Path string `json:"path"` +} + var muTaskId sync.RWMutex -func ReadLog(ctx iris.Context) base.Result { - startByte, _ := strconv.ParseInt(ctx.URLParam("startByte"), 10, 64) - endByte, _ := strconv.ParseInt(ctx.URLParam("endByte"), 10, 64) - dir := ctx.URLParam("dir") +func ReadImportLog(ctx iris.Context) base.Result { + offset, err := strconv.ParseInt(ctx.URLParam("offset"), 10, 64) + if err != nil { + zap.L().Warn("offset parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + limitStr := ctx.URLParam("limit") + var limit int64 + if limitStr == "" { + limit = -1 + } else { + limit, err = strconv.ParseInt(limitStr, 10, 64) + if err != nil { + zap.L().Warn("limit parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + } + path := ctx.URLParam("path") taskId := ctx.URLParam("ReadJSON") - - path := filepath.Join(dir, "import.log") - bytes, err := readFile(path, startByte, endByte) + lines, err := readFile(path, offset, limit) if err != nil { return base.Response{ Code: base.Error, @@ -62,90 +84,136 @@ func ReadLog(ctx iris.Context) base.Result { } } } - if len(bytes) == 0 && taskIdJSON[taskId]{ + if len(lines) == 0 && taskIdJSON[taskId] { return base.Response{ Code: base.Success, Data: "", } } - if len(bytes) == 0 { + if len(lines) == 0 { return base.Response{ Code: base.Error, } } - - log := string(bytes) - log = strings.Replace(log, "\n", "
", -1) return base.Response{ Code: base.Success, - Data: log, + Data: lines, } } -func readFile(path string, startByte, endByte int64) ([]byte, error) { - file, err := os.Open(path) - defer file.Close() +func ReadErrLog(ctx iris.Context) base.Result { + offset, err := strconv.ParseInt(ctx.URLParam("offset"), 10, 64) if err != nil { - zap.L().Warn("open file error", zap.Error(err)) - return nil, err - } - _, err = file.Seek(startByte, 0) - if err != nil { - zap.L().Warn("file seek error", zap.Error(err)) - return nil, err - } - stat, _ := file.Stat() - if stat.Size() < endByte { - endByte = stat.Size() - } - if endByte < startByte { - bytes := make([]byte, 0) - return bytes, nil + zap.L().Warn("offset parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } } - bytes := make([]byte, endByte-startByte) - _, err = file.Read(bytes) - if err != nil { - zap.L().Warn("read file error", zap.Error(err)) - return nil, err + limitStr := ctx.URLParam("limit") + var limit int64 + if limitStr == "" { + limit = -1 + } else { + limit, err = strconv.ParseInt(limitStr, 10, 64) + if err != nil { + zap.L().Warn("limit parse error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } } - return bytes, nil -} + taskId := ctx.URLParam("ReadJSON") + path := ctx.URLParam("path") -func CreateConfigFile(ctx iris.Context) base.Result { - type Params struct { - MountPath string `json:"mountPath"` - Config importconfig.YAMLConfig `json:"config"` - } - params := new(Params) - err := ctx.ReadJSON(params) + lines, err := readFile(path, offset, limit) if err != nil { - zap.L().Warn("config change to json wrong", zap.Error(err)) return base.Response{ Code: base.Error, Message: err.Error(), } } - fileName := "config.yaml" - dir := params.MountPath - _, err = os.Stat(dir) - if os.IsNotExist(err) { - os.MkdirAll(dir, os.ModePerm) - } - path := filepath.Join(dir, fileName) - outYaml, err := yaml.Marshal(params.Config) - err = os.WriteFile(path, outYaml, 0644) + muTaskId.RLock() + taskIdBytes, err := ioutil.ReadFile(config.Cfg.Web.TaskIdPath) + muTaskId.RUnlock() if err != nil { - zap.L().Warn("write"+path+"file error", zap.Error(err)) + zap.L().Warn("read taskId file error", zap.Error(err)) return base.Response{ Code: base.Error, Message: err.Error(), } } - + taskIdJSON := make(map[string]bool) + if len(taskIdBytes) != 0 { + err = json.Unmarshal(taskIdBytes, &taskIdJSON) + if err != nil { + zap.L().Warn("parse taskId file error", zap.Error(err)) + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + } + if len(lines) == 0 && taskIdJSON[taskId] { + return base.Response{ + Code: base.Success, + Data: "", + } + } + if len(lines) == 0 { + return base.Response{ + Code: base.Error, + } + } return base.Response{ Code: base.Success, + Data: lines, + } +} + +func readFile(path string, offset int64, limit int64) ([]string, error) { + file, err := os.Open(path) + defer file.Close() + if err != nil { + zap.L().Warn("open file error", zap.Error(err)) + return nil, err + } + scanner := bufio.NewScanner(file) + lineCount := int64(0) + startLine := 1 + offset + var endLine int64 + if limit != -1 { + endLine = startLine + limit - 1 + } else { + endLine = -1 + } + // start + if startLine != 1 { + for scanner.Scan() { + lineCount++ + if lineCount == offset { + break + } + } } + // end + res := make([]string, 0) + if endLine == -1 { + for scanner.Scan() { + res = append(res, scanner.Text()) + } + } else { + for scanner.Scan() { + lineCount++ + res = append(res, scanner.Text()) + if lineCount == endLine { + break + } + } + } + return res, nil } func Callback(ctx iris.Context) base.Result { @@ -215,7 +283,6 @@ func Callback(ctx iris.Context) base.Result { func GetWorkingDir(ctx iris.Context) base.Result { data := dirResponse{ - TaskDir: config.Cfg.Web.TasksDir, UploadDir: config.Cfg.Web.UploadDir, } return base.Response{ @@ -223,3 +290,45 @@ func GetWorkingDir(ctx iris.Context) base.Result { Data: data, } } + +func GetTaskDir(ctx iris.Context) base.Result { + taskDir := importer.GetNewTaskDir() + data := dirResponse{ + TaskDir: taskDir, + } + return base.Response{ + Code: base.Success, + Data: data, + } +} + +func GetTaskLogPaths(ctx iris.Context) base.Result { + id := ctx.Params().GetString("id") + errLogDir := filepath.Join(config.Cfg.Web.TasksDir, id, "err") + fileInfos, err := ioutil.ReadDir(errLogDir) + if err != nil { + return base.Response{ + Code: base.Error, + Message: err.Error(), + } + } + paths := make([]log, 0) + importLog := log{ + Name: "import.log", + Path: filepath.Join(config.Cfg.Web.TasksDir, id, "import.log"), + } + paths = append(paths, importLog) + for _, fileInfo := range fileInfos { + name := fileInfo.Name() + path := filepath.Join(errLogDir, name) + l := log{ + Name: name, + Path: path, + } + paths = append(paths, l) + } + return base.Response{ + Code: base.Success, + Data: paths, + } +} diff --git a/server/pkg/webserver/service/importer/importer.go b/server/pkg/webserver/service/importer/importer.go index 48dfb220..15978db2 100644 --- a/server/pkg/webserver/service/importer/importer.go +++ b/server/pkg/webserver/service/importer/importer.go @@ -3,10 +3,16 @@ package importer import ( "errors" "fmt" + "gopkg.in/yaml.v2" + "os" + "path/filepath" + "sort" "time" importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-studio/server/pkg/config" + "github.com/vesoft-inc/nebula-studio/server/pkg/utils" "go.uber.org/zap" ) @@ -22,15 +28,46 @@ type ImportResult struct { } type ActionResult struct { - Results []Task `json:"results"` - Msg string `json:"msg"` + Results []TaskInfo `json:"results"` + Msg string `json:"msg"` +} + +func GetNewTaskDir() string { + taskID := GetTaskMgr().NewTaskID() + taskDir := filepath.Join(config.Cfg.Web.TasksDir, taskID) + return taskDir +} + +func CreateConfigFile(dir string, config importconfig.YAMLConfig) error { + fileName := "config.yaml" + err := utils.CreateDir(dir) + if err != nil { + return err + } + path := filepath.Join(dir, fileName) + // erase user information + address := *config.NebulaClientSettings.Connection.Address + user := *config.NebulaClientSettings.Connection.User + password := *config.NebulaClientSettings.Connection.Password + *config.NebulaClientSettings.Connection.Address = "" + *config.NebulaClientSettings.Connection.User = "" + *config.NebulaClientSettings.Connection.Password = "" + outYaml, err := yaml.Marshal(config) + err = os.WriteFile(path, outYaml, 0644) + *config.NebulaClientSettings.Connection.Address = address + *config.NebulaClientSettings.Connection.User = user + *config.NebulaClientSettings.Connection.Password = password + if err != nil { + zap.L().Warn("write"+path+"file error", zap.Error(err)) + return err + } + return nil } func Import(taskID string, configPath string, configBody *importconfig.YAMLConfig) (err error) { zap.L().Debug(fmt.Sprintf("Start a import task: `%s`", taskID)) var conf *importconfig.YAMLConfig - if configPath != "" { conf, err = importconfig.Parse( configPath, @@ -42,43 +79,59 @@ func Import(taskID string, configPath string, configBody *importconfig.YAMLConfi } else { conf = configBody } - if err := conf.ValidateAndReset(""); err != nil { return err } task, _ := GetTaskMgr().GetTask(taskID) - go func() { result := ImportResult{} - now := time.Now() task.GetRunner().Run(conf) timeCost := time.Since(now).Milliseconds() - result.TaskId = taskID result.TimeCost = fmt.Sprintf("%dms", timeCost) - - if rerr := task.GetRunner().Error(); rerr != nil { - // task err: import task not finished err handle - task.TaskStatus = StatusAborted.String() - - err, _ := rerr.(importerErrors.ImporterError) + if rerrs := task.GetRunner().Errors(); len(rerrs) != 0 { + if len(rerrs) == 1 { + err, _ := rerrs[0].(importerErrors.ImporterError) + if err.ErrCode == importerErrors.NotCompleteError { + task.TaskInfo.TaskStatus = StatusFinished.String() + result.FailedRows = task.GetRunner().NumFailed + GetTaskMgr().FinishTask(taskID) + zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + return + } + } + // TODO task err: only return first error + task.TaskInfo.TaskStatus = StatusAborted.String() + err, _ := rerrs[0].(importerErrors.ImporterError) result.ErrorResult.ErrorCode = err.ErrCode result.ErrorResult.ErrorMsg = err.ErrMsg.Error() - task.TaskMessage = err.ErrMsg.Error() + task.TaskInfo.TaskMessage = err.ErrMsg.Error() + GetTaskMgr().FinishTask(taskID) zap.L().Warn(fmt.Sprintf("Failed to finish a import task: `%s`, task result: `%v`", taskID, result)) } else { - task.TaskStatus = StatusFinished.String() - + task.TaskInfo.TaskStatus = StatusFinished.String() result.FailedRows = task.GetRunner().NumFailed - GetTaskMgr().DelTask(taskID) + GetTaskMgr().FinishTask(taskID) zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) } }() return nil } +func ImportStatus(taskID string) (result *TaskInfo, err error) { + if t, ok := GetTaskMgr().GetTask(taskID); ok { + if t.GetRunner() != nil { + GetTaskMgr().UpdateTaskInfo(taskID) + } + result = t.TaskInfo + return result, nil + } else { + return nil, errors.New("task is not exist") + } +} + func ImportAction(taskID string, taskAction TaskAction) (result ActionResult, err error) { zap.L().Debug(fmt.Sprintf("Start a import task action: `%s` for task: `%s`", taskAction.String(), taskID)) @@ -93,6 +146,8 @@ func ImportAction(taskID string, taskAction TaskAction) (result ActionResult, er actionStop(taskID, &result) case ActionStopAll: actionStopAll(&result) + case ActionDel: + actionDel(taskID, &result) default: err = errors.New("unknown task action") } @@ -102,19 +157,24 @@ func ImportAction(taskID string, taskAction TaskAction) (result ActionResult, er return result, err } +func actionDel(taskID string, result *ActionResult) { + err := GetTaskMgr().DelTask(taskID) + if err != nil { + result.Msg = fmt.Sprintf("Task del fail, %s", err.Error()) + return + } + result.Msg = fmt.Sprintf("Task del successfully, taskID : %s", taskID) +} + func actionQuery(taskID string, result *ActionResult) { // a temp task obj for response task := Task{} + GetTaskMgr().UpdateTaskInfo(taskID) if t, ok := GetTaskMgr().GetTask(taskID); ok { - task.TaskID = t.TaskID - task.TaskStatus = t.TaskStatus - task.TaskMessage = t.TaskMessage - result.Results = append(result.Results, task) + task = *t + result.Results = append(result.Results, *task.TaskInfo) result.Msg = "Task query successfully" } else { - task.TaskID = taskID - task.TaskStatus = StatusNotExisted.String() - result.Results = append(result.Results, task) result.Msg = "Task not existed" } } @@ -127,19 +187,18 @@ func actionQueryAll(result *ActionResult) { for _, taskID := range taskIDs { actionQuery(taskID, result) } - + sort.Slice(result.Results, func(i, j int) bool { + return result.Results[i].CreatedTime > result.Results[j].CreatedTime + }) result.Msg = "Tasks query successfully" } func actionStop(taskID string, result *ActionResult) { ok := GetTaskMgr().StopTask(taskID) - actionQuery(taskID, result) - if !ok { result.Msg = "Task has stopped or finished" } - result.Msg = "Task stop successfully" } @@ -149,7 +208,7 @@ func actionStop(taskID string, result *ActionResult) { func actionStopAll(result *ActionResult) { taskIDs := GetTaskMgr().GetAllTaskIDs() for _, taskID := range taskIDs { - if _task, _ := GetTaskMgr().GetTask(taskID); _task.TaskStatus == StatusProcessing.String() { + if _task, ok := GetTaskMgr().GetTask(taskID); ok && _task.TaskInfo.TaskStatus == StatusProcessing.String() { actionStop(taskID, result) } } diff --git a/server/pkg/webserver/service/importer/task.go b/server/pkg/webserver/service/importer/task.go new file mode 100644 index 00000000..face8135 --- /dev/null +++ b/server/pkg/webserver/service/importer/task.go @@ -0,0 +1,21 @@ +package importer + +import ( + "github.com/vesoft-inc/nebula-importer/pkg/cmd" + + "go.uber.org/zap" +) + +type Task struct { + runner *cmd.Runner `json:"runner,omitempty"` + TaskInfo *TaskInfo `json:"task_info,omitempty"` +} + +func (t *Task) UpdateQueryStats() { + stats, err := t.runner.QueryStats() + if err != nil { + zap.L().Warn("query import stats fail", zap.Error(err)) + return + } + t.TaskInfo.Stats = *stats +} diff --git a/server/pkg/webserver/service/importer/taskInfo.go b/server/pkg/webserver/service/importer/taskInfo.go new file mode 100644 index 00000000..dd393131 --- /dev/null +++ b/server/pkg/webserver/service/importer/taskInfo.go @@ -0,0 +1,16 @@ +package importer + +import "github.com/vesoft-inc/nebula-importer/pkg/stats" + +type TaskInfo struct { + ID int `json:"taskID" gorm:"primaryKey;autoIncrement"` + Name string `json:"name"` + Space string `json:"space"` + NebulaAddress string `json:"nebulaAddress"` + CreatedTime int64 `json:"createdTime"` + UpdatedTime int64 `json:"updatedTime"` + User string `json:"user"` + TaskStatus string `json:"taskStatus"` + TaskMessage string `json:"taskMessage"` + Stats stats.Stats `json:"stats" gorm:"embedded"` +} diff --git a/server/pkg/webserver/service/importer/taskdb.go b/server/pkg/webserver/service/importer/taskdb.go new file mode 100644 index 00000000..84161c56 --- /dev/null +++ b/server/pkg/webserver/service/importer/taskdb.go @@ -0,0 +1,56 @@ +package importer + +import ( + "fmt" + + "github.com/vesoft-inc/nebula-studio/server/pkg/config" + + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +type TaskDb struct { + *gorm.DB +} + +/* + `InitDB` initialize local sql by open sql and create task_infos table +*/ +func InitDB() { + dbFilePath := config.Cfg.Web.SqlitedbFilePath + //os.Remove(dbFilePath) + db, err := gorm.Open(sqlite.Open(dbFilePath), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info)}) + if err != nil { + zap.L().Fatal(fmt.Sprintf("init db fail: %s", err)) + } + err = db.AutoMigrate(&TaskInfo{}) + if err != nil { + zap.L().Fatal(fmt.Sprintf("init taskInfo table fail: %s", err)) + } + GetTaskMgr().db = &TaskDb{ + DB: db, + } +} + +func (t *TaskDb) InsertTaskInfo(info *TaskInfo) { + t.Create(info) +} + +func (t *TaskDb) UpdateTaskInfo(info *TaskInfo) { + t.Model(&TaskInfo{}).Where("id = ?", info.ID).Updates(info) +} + +func (t *TaskDb) DelTaskInfo(ID int) { + t.Delete(&TaskInfo{}, ID) +} + +func (t *TaskDb) LastId() int { + var id int + if t.Raw("SELECT MAX(id) FROM task_infos").Scan(&id).Error != nil { + return 0 + } + return id +} diff --git a/server/pkg/webserver/service/importer/taskmgr.go b/server/pkg/webserver/service/importer/taskmgr.go index c5870bfd..4cf49847 100644 --- a/server/pkg/webserver/service/importer/taskmgr.go +++ b/server/pkg/webserver/service/importer/taskmgr.go @@ -1,48 +1,47 @@ package importer import ( - "database/sql" + "errors" "fmt" "os" + "path/filepath" "strconv" "sync" + "time" "github.com/vesoft-inc/nebula-importer/pkg/cmd" "github.com/vesoft-inc/nebula-studio/server/pkg/config" _ "github.com/mattn/go-sqlite3" - "go.uber.org/zap" ) var ( taskmgr *TaskMgr = &TaskMgr{ tasks: sync.Map{}, - db: &sql.DB{}, + db: &TaskDb{}, } - tid uint64 = 0 - mux sync.Mutex ) type TaskMgr struct { tasks sync.Map - db *sql.DB + db *TaskDb } -type Task struct { - runner *cmd.Runner - - TaskID string `json:"taskID"` - TaskStatus string `json:"taskStatus"` - TaskMessage string `json:"taskMessage"` -} - -func NewTask(taskID string) Task { - return Task{ - runner: &cmd.Runner{}, - TaskID: taskID, - TaskStatus: StatusProcessing.String(), +func newTask(nebulaAddress string, user string, name string, space string) *Task { + timeUnix := time.Now().Unix() + return &Task{ + runner: &cmd.Runner{}, + TaskInfo: &TaskInfo{ + Name: name, + Space: space, + CreatedTime: timeUnix, + UpdatedTime: timeUnix, + TaskStatus: StatusProcessing.String(), + NebulaAddress: nebulaAddress, + User: user, + }, } } @@ -50,19 +49,22 @@ func (task *Task) GetRunner() *cmd.Runner { return task.runner } -func GetTaskID() (_tid uint64) { - mux.Lock() - defer mux.Unlock() - _tid = tid - return _tid +func (mgr *TaskMgr) NewTaskID() string { + tid := mgr.db.LastId() + taskID := fmt.Sprintf("%v", tid+1) + return taskID } -func NewTaskID() (taskID string) { +func (mgr *TaskMgr) NewTask(nebulaAddress string, user string, name string, space string) (*Task, string, error) { mux.Lock() defer mux.Unlock() - tid++ - taskID = fmt.Sprintf("%d", tid) - return taskID + task := newTask(nebulaAddress, user, name, space) + mgr.db.InsertTaskInfo(task.TaskInfo) + tid := mgr.db.LastId() + task.TaskInfo.ID = tid + taskID := fmt.Sprintf("%v", tid) + mgr.PutTask(taskID, task) + return task, taskID, nil } func GetTaskMgr() *TaskMgr { @@ -70,61 +72,89 @@ func GetTaskMgr() *TaskMgr { } /* - `GetTask` get task from map and local sql + GetTask get task from map and local sql */ func (mgr *TaskMgr) GetTask(taskID string) (*Task, bool) { - _tid, _ := strconv.ParseUint(taskID, 0, 64) - - if _tid > GetTaskID() || tid <= 0 { - return nil, false - } - if task, ok := mgr.getTaskFromMap(taskID); ok { return task, true } - - return mgr.getTaskFromSQL(taskID), true + task := mgr.getTaskFromSQL(taskID) + // did not find task + if task.TaskInfo.ID == 0 { + return nil, false + } + return task, true } /* - `PutTask` put task into tasks map + PutTask put task into tasks map */ func (mgr *TaskMgr) PutTask(taskID string, task *Task) { mgr.tasks.Store(taskID, task) } /* - `DelTask` will delete task in the map, - and put the task into local sql + FinishTask will query task stats, delete task in the map + and update the taskInfo in local sql */ -func (mgr *TaskMgr) DelTask(taskID string) { +func (mgr *TaskMgr) FinishTask(taskID string) { task, ok := mgr.getTaskFromMap(taskID) - if !ok { return } - + task.UpdateQueryStats() + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + mgr.db.UpdateTaskInfo(task.TaskInfo) mgr.tasks.Delete(taskID) - mgr.putTaskIntoSQL(taskID, task) +} + +func (mgr *TaskMgr) DelTask(taskID string) error { + _, ok := mgr.getTaskFromMap(taskID) + if ok { + mgr.tasks.Delete(taskID) + } + id, err := strconv.Atoi(taskID) + if err != nil { + return errors.New("taskID is wrong") + } + mgr.db.DelTaskInfo(id) + taskDir := filepath.Join(config.Cfg.Web.TasksDir, taskID) + err = os.RemoveAll(taskDir) + if err != nil { + return err + } + return nil +} + +/* + UpdateTaskInfo will query task stats, update task in the map + and update the taskInfo in local sql +*/ +func (mgr *TaskMgr) UpdateTaskInfo(taskID string) { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return + } + task.UpdateQueryStats() + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + mgr.db.UpdateTaskInfo(task.TaskInfo) } /* - `StopTask` will change the task status to `StatusStoped`, - and then call `DelTask` + StopTask will change the task status to `StatusStoped`, + and then call FinishTask */ func (mgr *TaskMgr) StopTask(taskID string) bool { if task, ok := mgr.getTaskFromMap(taskID); ok { for _, r := range task.GetRunner().Readers { r.Stop() } - - task.TaskStatus = StatusStoped.String() - - mgr.DelTask(taskID) - + task.TaskInfo.TaskStatus = StatusStoped.String() + mgr.FinishTask(taskID) return true } - return false } @@ -133,69 +163,25 @@ func (mgr *TaskMgr) StopTask(taskID string) bool { */ func (mgr *TaskMgr) GetAllTaskIDs() []string { ids := make([]string, 0) - mgr.tasks.Range(func(k, v interface{}) bool { - ids = append(ids, k.(string)) - return true - }) - - return ids -} - -/* - `InitDB` initialize local sql by open sql and create tasks table -*/ -func InitDB() { - dbFilePath := config.Cfg.Web.SqlitedbFilePath - - os.Remove(dbFilePath) - - _db, err := sql.Open("sqlite3", dbFilePath) - - if err != nil { - zap.L().Fatal(err.Error()) - } - - GetTaskMgr().db = _db - - sqlStmt := ` - create table tasks (taskID integer not null primary key, taskStatus text, taskMessage text); - delete from tasks; - ` - _, err = GetTaskMgr().db.Exec(sqlStmt) - - if err != nil { - zap.L().Fatal(fmt.Sprintf("%q: %s\n", err, sqlStmt)) + for i := 1; i <= mgr.db.LastId(); i++ { + ids = append(ids, strconv.Itoa(i)) } - + return ids } func (mgr *TaskMgr) getTaskFromMap(taskID string) (*Task, bool) { if task, ok := mgr.tasks.Load(taskID); ok { return task.(*Task), true } - return nil, false } func (mgr *TaskMgr) getTaskFromSQL(taskID string) *Task { - var taskStatus string - - rows, _ := mgr.db.Query(fmt.Sprintf("SELECT taskStatus FROM tasks WHERE taskID=%s", taskID)) - - for rows.Next() { - _ = rows.Scan(&taskStatus) - } - - return &Task{ - TaskID: taskID, - TaskStatus: taskStatus, - } -} - -func (mgr *TaskMgr) putTaskIntoSQL(taskID string, task *Task) { - stmt, _ := mgr.db.Prepare("INSERT INTO tasks(taskID, taskStatus) values(?,?)") - - _, _ = stmt.Exec(taskID, task.TaskStatus) + taskInfo := new(TaskInfo) + mgr.db.First(taskInfo, taskID) + task := new(Task) + task.TaskInfo = taskInfo + return task } type TaskAction int @@ -206,6 +192,7 @@ const ( ActionQueryAll ActionStop ActionStopAll + ActionDel ) var taskActionMap = map[TaskAction]string{ @@ -213,6 +200,7 @@ var taskActionMap = map[TaskAction]string{ ActionQueryAll: "actionQueryAll", ActionStop: "actionStop", ActionStopAll: "actionStopAll", + ActionDel: "actionDel", } var taskActionRevMap = map[string]TaskAction{ @@ -220,6 +208,7 @@ var taskActionRevMap = map[string]TaskAction{ "actionQueryAll": ActionQueryAll, "actionStop": ActionStop, "actionStopAll": ActionStopAll, + "actionDel": ActionDel, } func NewTaskAction(action string) TaskAction {