Skip to content

Commit

Permalink
Add client
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Oct 16, 2019
1 parent 2033203 commit 8188be8
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 1 deletion.
12 changes: 12 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
root = true

[*]
insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true
indent_style = space
indent_size = 2

[{Makefile,go.mod,go.sum,*.go}]
indent_style = tab
indent_size = 2
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/yixinglu/nebula-importer

require (
github.com/vesoft-inc/nebula-go v0.0.0-20190929150249-5af3ab30c240 // indirect
github.com/vesoft-inc/nebula-go v0.0.0-20190929150249-5af3ab30c240
gopkg.in/yaml.v2 v2.2.4
)
108 changes: 108 additions & 0 deletions importer/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package nebula_importer

import (
"fmt"
"log"
"strconv"
"strings"

nebula "github.com/vesoft-inc/nebula-go"
graph "github.com/vesoft-inc/nebula-go/graph"
)

func NewNebulaClientPool(config *YAMLConfig, nth int, recordCh *<-chan []string, errDataCh *chan<- []string, errLogCh *chan<- string) {
for i := 0; i < config.Settings.Concurrency; i++ {
go func() {
client, err := nebula.NewClient(config.Settings.Connection.Address)
if err != nil {
log.Println(err)
return
}

if err = client.Connect(config.Settings.Connection.User, config.Settings.Connection.Password); err != nil {
log.Println(err)
return
}
defer client.Disconnect()

for {
record := <-*recordCh

stmt, err := makeStmt(config, nth, record)
if err != nil {
*errLogCh <- fmt.Sprintf("Fail to make nGQL statement, %s", err.Error())
*errDataCh <- record
continue
}

// TODO: Add some metrics for response latency, succeededCount, failedCount
resp, err := client.Execute(stmt)
if err != nil {
*errLogCh <- fmt.Sprintf("Client execute error: %s", err.Error())
*errDataCh <- record
continue
}

if resp.GetErrorCode() != graph.ErrorCode_SUCCEEDED {
*errLogCh <- fmt.Sprintf("Fail to execute: %s, error: %s", stmt, resp.GetErrorMsg())
*errDataCh <- record
continue
}
}

}()
}
}

func makeStmt(config *YAMLConfig, nth int, record []string) (string, error) {
file := config.Files[nth]
schemaType := strings.ToUpper(file.Schema.Type)

var builder strings.Builder
builder.WriteString(fmt.Sprintf("INSERT %s %s(", schemaType, file.Schema.Name))

for idx, prop := range file.Schema.Props {
builder.WriteString(prop.Name)
if idx < len(file.Schema.Props)-1 {
builder.WriteString(",")
}
}
builder.WriteString(") VALUES ")

isEdge := schemaType == "EDGE"
fromIndex := 1
if isEdge {
fromIndex = 2
}

if err := writeVID(isEdge, record, &builder); err != nil {
return "", err
}

builder.WriteString(":(")
for idx, val := range record[fromIndex:] {
builder.WriteString(val)
if idx < len(record)-1 {
builder.WriteString(",")
}
}
builder.WriteString(");")
return builder.String(), nil
}

func writeVID(isEdge bool, record []string, builder *strings.Builder) error {
vid, err := strconv.ParseInt(record[0], 10, 64)
if err != nil {
return err
}
builder.WriteString(fmt.Sprintf("%d", vid))

if isEdge {
dstVID, err := strconv.ParseInt(record[1], 10, 64)
if err != nil {
return err
}
builder.WriteString(fmt.Sprintf(" -> %d", dstVID))
}
return nil
}

0 comments on commit 8188be8

Please sign in to comment.