Skip to content

Commit

Permalink
Fix csv file importing with label (#93)
Browse files Browse the repository at this point in the history
* Test string vid

* Fix delete failure for label csv file

* Fix error message

* Fix vid format checker param
  • Loading branch information
yixinglu authored Oct 15, 2020
1 parent 2c2fb1c commit 5d1476d
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 18 deletions.
51 changes: 51 additions & 0 deletions examples/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,32 @@ files:
type: double
index: 1

- path: ./follow-with-label-and-str-vid.csv
failDataPath: ./err/follow-with-label-and-str-vid.csv
batchSize: 2
inOrder: true
type: csv
csv:
withHeader: false
withLabel: true
schema:
type: edge
edge:
name: follow
withRanking: true
srcVID:
index: 0
function: hash
dstVID:
index: 2
function: hash
rank:
index: 3
props:
- name: likeness
type: double
index: 1

- path: ./follow.csv
failDataPath: ./err/follow.csv
batchSize: 2
Expand Down Expand Up @@ -180,6 +206,31 @@ files:
- name: gender
type: string

- path: ./student-with-label-and-str-vid.csv
failDataPath: ./err/student_label_str_vid.csv
batchSize: 2
type: csv
csv:
withHeader: false
withLabel: true
schema:
type: vertex
vertex:
vid:
index: 1
function: uuid
tags:
- name: student
props:
- name: age
type: int
index: 2
- name: name
type: string
index: 1
- name: gender
type: string

- path: ./follow.csv
failDataPath: ./err/follow_index.csv
batchSize: 2
Expand Down
5 changes: 5 additions & 0 deletions examples/follow-with-label-and-str-vid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
+,一201,92.5,200,0
+,一200,85.6,201,1
+,一202,93.2,201,2
-,一201,92.5,200,0
-,一200,85.6,201,1
4 changes: 4 additions & 0 deletions examples/student-with-label-and-str-vid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
+,200,Monica,16,female
+,201,Mike,18,male
+,202,Jane,17,female
-,201,Mike,18,male
30 changes: 30 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/url"
"os"
"path/filepath"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -427,6 +428,17 @@ func (v *VID) String(vid string) string {
}
}

func (v *VID) FormatValue(record base.Record) (string, error) {
if len(record) <= *v.Index {
return "", fmt.Errorf("vid index(%d) out of range record length(%d)", *v.Index, len(record))
}
if v.Function == nil || *v.Function == "" {
return record[*v.Index], nil
} else {
return fmt.Sprintf("%s(%q)", *v.Function, record[*v.Index]), nil
}
}

func (v *VID) checkFunction(prefix string) error {
if v.Function != nil {
switch strings.ToLower(*v.Function) {
Expand Down Expand Up @@ -461,6 +473,15 @@ func (r *Rank) validateAndReset(prefix string, defaultVal int) error {
return nil
}

var re = regexp.MustCompile(`^([+-]?\d+|hash\("(.+)"\)|uuid\("(.+)"\))$`)

func checkVidFormat(vid string) error {
if !re.MatchString(vid) {
return fmt.Errorf("Invalid vid format: %s", vid)
}
return nil
}

func (e *Edge) FormatValues(record base.Record) (string, error) {
var cells []string
for i, prop := range e.Props {
Expand All @@ -480,12 +501,18 @@ func (e *Edge) FormatValues(record base.Record) (string, error) {
srcVID = fmt.Sprintf("%s(%q)", *e.SrcVID.Function, record[*e.SrcVID.Index])
} else {
srcVID = record[*e.SrcVID.Index]
if err := checkVidFormat(srcVID); err != nil {
return "", err
}
}
var dstVID string
if e.DstVID.Function != nil {
dstVID = fmt.Sprintf("%s(%q)", *e.DstVID.Function, record[*e.DstVID.Index])
} else {
dstVID = record[*e.DstVID.Index]
if err := checkVidFormat(dstVID); err != nil {
return "", err
}
}
return fmt.Sprintf(" %s->%s%s:(%s) ", srcVID, dstVID, rank, strings.Join(cells, ",")), nil
}
Expand Down Expand Up @@ -606,6 +633,9 @@ func (v *Vertex) FormatValues(record base.Record) (string, error) {
vid = fmt.Sprintf("%s(%q)", *v.VID.Function, record[*v.VID.Index])
} else {
vid = record[*v.VID.Index]
if err := checkVidFormat(vid); err != nil {
return "", err
}
}
return fmt.Sprintf(" %s: (%s)", vid, strings.Join(cells, ",")), nil
}
Expand Down
34 changes: 16 additions & 18 deletions pkg/reader/batchmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"hash/fnv"
"regexp"
"strings"

"github.com/vesoft-inc/nebula-importer/pkg/base"
Expand Down Expand Up @@ -201,20 +200,13 @@ func (bm *BatchMgr) parseProperty(r string) (columnName, columnType string) {
}
}

var re = regexp.MustCompile(`^([+-]?\d+|hash\("(.+)"\)|uuid\("(.+)"\))$`)

func (bm *BatchMgr) Add(data base.Data) error {
var vid string
if bm.Schema.IsVertex() {
vid = data.Record[*bm.Schema.Vertex.VID.Index]
} else {
vid = data.Record[*bm.Schema.Edge.SrcVID.Index]
}
if !re.MatchString(vid) {
err := fmt.Errorf("Invalid vid format: %s", vid)
bm.Batches[0].SendErrorData(data, err)
return err
}
batchIdx := getBatchId(vid, len(bm.Batches))
bm.Batches[batchIdx].Add(data)
return nil
Expand Down Expand Up @@ -298,7 +290,11 @@ func (m *BatchMgr) makeVertexInsertStmt(data []base.Data) (string, error) {
func (m *BatchMgr) makeVertexDeleteStmt(data []base.Data) (string, error) {
var idList []string
for _, d := range data {
idList = append(idList, d.Record[*m.Schema.Vertex.VID.Index])
vid, err := m.Schema.Vertex.VID.FormatValue(d.Record)
if err != nil {
return "", err
}
idList = append(idList, vid)
}
return fmt.Sprintf("DELETE VERTEX %s;", strings.Join(idList, ",")), nil
}
Expand Down Expand Up @@ -342,17 +338,19 @@ func (m *BatchMgr) makeEdgeDeleteStmt(batch []base.Data) (string, error) {
var idList []string
for _, d := range batch {
var id string
srcVid, err := m.Schema.Edge.SrcVID.FormatValue(d.Record)
if err != nil {
return "", err
}
dstVid, err := m.Schema.Edge.DstVID.FormatValue(d.Record)
if err != nil {
return "", err
}
if m.Schema.Edge.Rank != nil {
id = fmt.Sprintf("%s->%s@%s",
d.Record[*m.Schema.Edge.SrcVID.Index],
d.Record[*m.Schema.Edge.DstVID.Index],
d.Record[*m.Schema.Edge.Rank.Index],
)
rank := d.Record[*m.Schema.Edge.Rank.Index]
id = fmt.Sprintf("%s->%s@%s", srcVid, dstVid, rank)
} else {
id = fmt.Sprintf("%s->%s",
d.Record[*m.Schema.Edge.SrcVID.Index],
d.Record[*m.Schema.Edge.DstVID.Index],
)
id = fmt.Sprintf("%s->%s", srcVid, dstVid)
}
idList = append(idList, id)
}
Expand Down

0 comments on commit 5d1476d

Please sign in to comment.