diff --git a/examples/basic/basic.int.v3.yaml b/examples/basic/basic.int.v3.yaml index 4857ec5b..684d9424 100644 --- a/examples/basic/basic.int.v3.yaml +++ b/examples/basic/basic.int.v3.yaml @@ -71,6 +71,8 @@ sources: alternativeIndices: - 6 - name: Person + filter: + expr: (Record[1] == "Mahinda" or Record[1] == "Michael") and Record[3] == "male" id: type: "INT" index: 0 @@ -125,6 +127,8 @@ sources: nullValue: _NULL_ defaultValue: 0000-00-00T00:00:00 - name: KNOWS # person_knows_person + filter: + expr: Record[1] == "XXX" src: id: type: "INT" diff --git a/examples/basic/basic.string.v3.yaml b/examples/basic/basic.string.v3.yaml index 6c6623af..60fc11b0 100644 --- a/examples/basic/basic.string.v3.yaml +++ b/examples/basic/basic.string.v3.yaml @@ -69,7 +69,73 @@ sources: - name: "browserUsed" type: "STRING" index: 7 + - path: ./person.csv + filter: + expr: Record[1] != "933" + csv: + delimiter: "|" + tags: + - name: Person + id: + type: "STRING" + concatItems: + - person_ + - 0 + - _id + props: + - name: "firstName" + type: "STRING" + index: 1 + - name: "lastName" + type: "STRING" + index: 2 + - name: "gender" + type: "STRING" + index: 3 + nullable: true + defaultValue: female + - name: "birthday" + type: "DATE" + index: 4 + nullable: true + nullValue: _NULL_ + - name: "creationDate" + type: "DATETIME" + index: 5 + - name: "locationIP" + type: "STRING" + index: 6 + - name: "browserUsed" + type: "STRING" + index: 7 + - path: ./knows.csv + batch: 256 + edges: + - name: KNOWS # person_knows_person + src: + id: + type: "STRING" + concatItems: + - person_ + - 0 + - _id + dst: + id: + type: "STRING" + concatItems: + - person_ + - 1 + - _id + props: + - name: "creationDate" + type: "DATETIME" + index: 2 + nullable: true + nullValue: _NULL_ + defaultValue: 0000-00-00T00:00:00 - path: ./knows.csv + filter: + expr: Record[1] == "XXX" batch: 256 edges: - name: KNOWS # person_knows_person diff --git a/go.mod b/go.mod index c2dbe7ad..96e813b0 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/agiledragon/gomonkey/v2 v2.9.0 github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible + github.com/antonmedv/expr v1.12.5 github.com/aws/aws-sdk-go v1.44.178 github.com/cenkalti/backoff/v4 v4.1.3 github.com/colinmarc/hdfs/v2 v2.3.0 diff --git a/go.sum b/go.sum index 810dc876..d3bd1626 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible h1:KXeJoM1wo9I/6xPTyt6qCxoSZnmASiAjlrr0dyTUKt8= github.com/aliyun/aliyun-oss-go-sdk v2.2.6+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= +github.com/antonmedv/expr v1.12.5 h1:Fq4okale9swwL3OeLLs9WD9H6GbgBLJyN/NUHRv+n0E= +github.com/antonmedv/expr v1.12.5/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= github.com/aws/aws-sdk-go v1.44.178 h1:4igreoWPEA7xVLnOeSXLhDXTsTSPKQONZcQ3llWAJw0= github.com/aws/aws-sdk-go v1.44.178/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= @@ -218,11 +220,14 @@ github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUq github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= diff --git a/integration-testing/testdata/basic/basic.int.v3.yaml b/integration-testing/testdata/basic/basic.int.v3.yaml index c2c30879..bc952cee 100644 --- a/integration-testing/testdata/basic/basic.int.v3.yaml +++ b/integration-testing/testdata/basic/basic.int.v3.yaml @@ -71,6 +71,8 @@ sources: alternativeIndices: - 6 - name: Person + filter: + expr: (Record[1] == "Mahinda" or Record[1] == "Michael") and Record[3] == "male" id: type: "INT" index: 0 @@ -125,6 +127,8 @@ sources: nullValue: _NULL_ defaultValue: 0000-00-00T00:00:00 - name: KNOWS # person_knows_person + filter: + expr: Record[1] == "XXX" src: id: type: "INT" diff --git a/integration-testing/testdata/basic/basic.string.v3.yaml b/integration-testing/testdata/basic/basic.string.v3.yaml index 431e3d0e..708a9956 100644 --- a/integration-testing/testdata/basic/basic.string.v3.yaml +++ b/integration-testing/testdata/basic/basic.string.v3.yaml @@ -69,7 +69,73 @@ sources: - name: "browserUsed" type: "STRING" index: 7 + - path: ./person.csv + filter: + expr: Record[1] != "933" + csv: + delimiter: "|" + tags: + - name: Person + id: + type: "STRING" + concatItems: + - person_ + - 0 + - _id + props: + - name: "firstName" + type: "STRING" + index: 1 + - name: "lastName" + type: "STRING" + index: 2 + - name: "gender" + type: "STRING" + index: 3 + nullable: true + defaultValue: female + - name: "birthday" + type: "DATE" + index: 4 + nullable: true + nullValue: _NULL_ + - name: "creationDate" + type: "DATETIME" + index: 5 + - name: "locationIP" + type: "STRING" + index: 6 + - name: "browserUsed" + type: "STRING" + index: 7 + - path: ./knows.csv + batch: 256 + edges: + - name: KNOWS # person_knows_person + src: + id: + type: "STRING" + concatItems: + - person_ + - 0 + - _id + dst: + id: + type: "STRING" + concatItems: + - person_ + - 1 + - _id + props: + - name: "creationDate" + type: "DATETIME" + index: 2 + nullable: true + nullValue: _NULL_ + defaultValue: 0000-00-00T00:00:00 - path: ./knows.csv + filter: + expr: Record[1] == "XXX" batch: 256 edges: - name: KNOWS # person_knows_person diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index dcc5cb81..4e8f127f 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -23,4 +23,5 @@ var ( ErrNoIndicesOrConcatItems = stderrors.New("no indices or concat items") ErrUnsupportedConcatItemType = stderrors.New("unsupported concat item type") ErrUnsupportedFunction = stderrors.New("unsupported function") + ErrFilterSyntax = stderrors.New("filter syntax") ) diff --git a/pkg/errors/import.go b/pkg/errors/import.go index 40b87787..831e9dfb 100644 --- a/pkg/errors/import.go +++ b/pkg/errors/import.go @@ -195,7 +195,7 @@ func (e *ImportError) Error() string { fields = append(fields, fmt.Sprintf("%s(%s)", fieldStatement, statement)) } if len(e.Messages) > 0 { - fields = append(fields, fmt.Sprintf("%s%s", fieldMessages, strings.Join(e.Messages, ", "))) + fields = append(fields, fmt.Sprintf("%s: %s", fieldMessages, strings.Join(e.Messages, ", "))) } if e.Err != nil { fields = append(fields, e.Err.Error()) diff --git a/pkg/errors/import_test.go b/pkg/errors/import_test.go index e2e0f5e4..372bba7f 100644 --- a/pkg/errors/import_test.go +++ b/pkg/errors/import_test.go @@ -177,7 +177,7 @@ var _ = Describe("ImportError", func() { "record": []string{"record1", "record2"}, "statement": "test statement", })) - Expect(importError.Error()).To(Equal("graph(graphName): node(nodeName): edge(edgeName): nodeID(nodeIDName): prop(propName): record([record1 record2]): statement(test statement): messagestest message, test message 1: test error")) + Expect(importError.Error()).To(Equal("graph(graphName): node(nodeName): edge(edgeName): nodeID(nodeIDName): prop(propName): record([record1 record2]): statement(test statement): messages: test message, test message 1: test error")) }) It("withField", func() { diff --git a/pkg/importer/importer.go b/pkg/importer/importer.go index 3288e944..f6565e07 100644 --- a/pkg/importer/importer.go +++ b/pkg/importer/importer.go @@ -20,8 +20,9 @@ type ( } ImportResp struct { - Latency time.Duration - RespTime time.Duration + RecordNum int + Latency time.Duration + RespTime time.Duration } ImportResult struct { @@ -96,11 +97,15 @@ func WithWaitFunc(fn func()) Option { } func (i *defaultImporter) Import(records ...spec.Record) (*ImportResp, error) { - statement, err := i.builder.Build(records...) + statement, nRecord, err := i.builder.Build(records...) if err != nil { return nil, err } + if nRecord == 0 { + return &ImportResp{}, nil + } + resp, err := i.pool.Execute(statement) if err != nil { return nil, errors.NewImportError(err). @@ -112,8 +117,9 @@ func (i *defaultImporter) Import(records ...spec.Record) (*ImportResp, error) { } return &ImportResp{ - RespTime: resp.GetRespTime(), - Latency: resp.GetLatency(), + RecordNum: nRecord, + RespTime: resp.GetRespTime(), + Latency: resp.GetLatency(), }, nil } diff --git a/pkg/importer/importer_test.go b/pkg/importer/importer_test.go index 52219bb0..dd91a150 100644 --- a/pkg/importer/importer_test.go +++ b/pkg/importer/importer_test.go @@ -35,7 +35,7 @@ var _ = Describe("Importer", func() { Describe("New", func() { It("build failed", func() { - mockBuilder.EXPECT().Build(gomock.Any()).Return("", errors.ErrNoRecord) + mockBuilder.EXPECT().Build(gomock.Any()).Return("", 0, errors.ErrNoRecord) i := New(mockBuilder, mockClientPool) resp, err := i.Import(spec.Record{}) @@ -45,7 +45,7 @@ var _ = Describe("Importer", func() { }) It("execute failed", func() { - mockBuilder.EXPECT().Build(gomock.Any()).Return("statement", nil) + mockBuilder.EXPECT().Build(gomock.Any()).Return("statement", 1, nil) mockClientPool.EXPECT().Execute(gomock.Any()).Return(nil, stderrors.New("test error")) i := New(mockBuilder, mockClientPool) @@ -58,7 +58,7 @@ var _ = Describe("Importer", func() { }) It("execute IsSucceed false", func() { - mockBuilder.EXPECT().Build(gomock.Any()).Return("statement", nil) + mockBuilder.EXPECT().Build(gomock.Any()).Return("statement", 1, nil) mockClientPool.EXPECT().Execute(gomock.Any()).Times(1).Return(mockResponse, nil) mockResponse.EXPECT().IsSucceed().Times(1).Return(false) mockResponse.EXPECT().GetError().Times(1).Return(stderrors.New("status failed")) @@ -74,7 +74,7 @@ var _ = Describe("Importer", func() { }) It("execute successfully", func() { - mockBuilder.EXPECT().Build(gomock.Any()).Times(1).Return("statement", nil) + mockBuilder.EXPECT().Build(gomock.Any()).Times(1).Return("statement", 1, nil) mockClientPool.EXPECT().Execute(gomock.Any()).Times(1).Return(mockResponse, nil) mockResponse.EXPECT().IsSucceed().Times(1).Return(true) mockResponse.EXPECT().GetLatency().Times(1).Return(time.Microsecond * 10) @@ -91,7 +91,7 @@ var _ = Describe("Importer", func() { }) It("execute successfully with Add, Wait and Done", func() { - mockBuilder.EXPECT().Build(gomock.Any()).Times(2).Return("statement", nil) + mockBuilder.EXPECT().Build(gomock.Any()).Times(2).Return("statement", 1, nil) mockClientPool.EXPECT().Execute(gomock.Any()).Times(2).Return(mockResponse, nil) mockResponse.EXPECT().IsSucceed().Times(2).Return(true) mockResponse.EXPECT().GetLatency().Times(2).Return(time.Microsecond * 10) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 08152e83..760a049f 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -379,8 +379,8 @@ func (m *defaultManager) submitImporterTask(nBytes int, records spec.Records, im m.onRequestFailed(records) isFailed = true // do not return, continue the subsequent importer. - } else { - m.onRequestSucceeded(records, result) + } else if result.RecordNum > 0 { + m.onRequestSucceeded(result) } } } @@ -428,8 +428,8 @@ func (m *defaultManager) onRequestFailed(records spec.Records) { m.stats.RequestFailed(int64(len(records))) } -func (m *defaultManager) onRequestSucceeded(records spec.Records, result *importer.ImportResp) { - m.stats.RequestSucceeded(int64(len(records)), result.Latency, result.RespTime) +func (m *defaultManager) onRequestSucceeded(result *importer.ImportResp) { + m.stats.RequestSucceeded(int64(result.RecordNum), result.Latency, result.RespTime) } func (m *defaultManager) logError(err error, msg string, fields ...logger.Field) { diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 69bfbd7f..b09d7c85 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -172,8 +172,9 @@ var _ = Describe("Manager", func() { return nil, stderrors.New("import failed") } return &importer.ImportResp{ - Latency: 2 * time.Microsecond, - RespTime: 3 * time.Microsecond, + RecordNum: len(records), + Latency: 2 * time.Microsecond, + RespTime: 3 * time.Microsecond, }, nil } diff --git a/pkg/spec/base/builder.go b/pkg/spec/base/builder.go index f326c638..a1fbe581 100644 --- a/pkg/spec/base/builder.go +++ b/pkg/spec/base/builder.go @@ -4,12 +4,12 @@ package specbase type ( // StatementBuilder is the interface to build statement StatementBuilder interface { - Build(records ...Record) (string, error) + Build(records ...Record) (statement string, nRecord int, err error) } - StatementBuilderFunc func(records ...Record) (string, error) + StatementBuilderFunc func(records ...Record) (statement string, nRecord int, err error) ) -func (f StatementBuilderFunc) Build(records ...Record) (string, error) { +func (f StatementBuilderFunc) Build(records ...Record) (statement string, nRecord int, err error) { return f(records...) } diff --git a/pkg/spec/base/builder_mock.go b/pkg/spec/base/builder_mock.go index 5bb51fa7..d02ef819 100644 --- a/pkg/spec/base/builder_mock.go +++ b/pkg/spec/base/builder_mock.go @@ -34,7 +34,7 @@ func (m *MockStatementBuilder) EXPECT() *MockStatementBuilderMockRecorder { } // Build mocks base method. -func (m *MockStatementBuilder) Build(records ...Record) (string, error) { +func (m *MockStatementBuilder) Build(records ...Record) (string, int, error) { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range records { @@ -42,8 +42,9 @@ func (m *MockStatementBuilder) Build(records ...Record) (string, error) { } ret := m.ctrl.Call(m, "Build", varargs...) ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // Build indicates an expected call of Build. diff --git a/pkg/spec/base/builder_test.go b/pkg/spec/base/builder_test.go index fe3b5279..581c8572 100644 --- a/pkg/spec/base/builder_test.go +++ b/pkg/spec/base/builder_test.go @@ -7,11 +7,12 @@ import ( var _ = Describe("StatementBuilderFunc", func() { It("", func() { - var b StatementBuilder = StatementBuilderFunc(func(records ...Record) (string, error) { - return "test statement", nil + var b StatementBuilder = StatementBuilderFunc(func(records ...Record) (string, int, error) { + return "test statement", 1, nil }) - statement, err := b.Build() + statement, nRecord, err := b.Build() Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("test statement")) }) }) diff --git a/pkg/spec/base/filter.go b/pkg/spec/base/filter.go new file mode 100644 index 00000000..23c193b1 --- /dev/null +++ b/pkg/spec/base/filter.go @@ -0,0 +1,34 @@ +package specbase + +import ( + "github.com/antonmedv/expr" + "github.com/antonmedv/expr/vm" +) + +type Filter struct { + Expr string `yaml:"expr,omitempty"` + program *vm.Program +} + +func (f *Filter) Build() error { + var env = map[string]any{ + "Record": Record{}, + } + program, err := expr.Compile(f.Expr, expr.Env(env), expr.AsBool()) + if err != nil { + return err + } + f.program = program + return nil +} + +func (f *Filter) Filter(record Record) (bool, error) { + var env = map[string]any{ + "Record": record, + } + out, err := expr.Run(f.program, env) + if err != nil { + return false, err + } + return out.(bool), nil +} diff --git a/pkg/spec/base/filter_test.go b/pkg/spec/base/filter_test.go new file mode 100644 index 00000000..85828477 --- /dev/null +++ b/pkg/spec/base/filter_test.go @@ -0,0 +1,38 @@ +package specbase + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Filter", func() { + It("build failed", func() { + f := Filter{ + Expr: "", + } + err := f.Build() + Expect(err).To(HaveOccurred()) + Expect(f.program).To(BeNil()) + }) + + It("successfully", func() { + f := Filter{ + Expr: `(Record[0] == "A" or Record[0] == "B") and Record[1] != "C"`, + } + err := f.Build() + Expect(err).NotTo(HaveOccurred()) + Expect(f.program).NotTo(BeNil()) + + ok, err := f.Filter(Record{}) + Expect(err).To(HaveOccurred()) + Expect(ok).To(BeFalse()) + + ok, err = f.Filter(Record{"A", "C"}) + Expect(err).NotTo(HaveOccurred()) + Expect(ok).To(BeFalse()) + + ok, err = f.Filter(Record{"B", "D"}) + Expect(err).NotTo(HaveOccurred()) + Expect(ok).To(BeTrue()) + }) +}) diff --git a/pkg/spec/v3/edge.go b/pkg/spec/v3/edge.go index b528891e..6aafb435 100644 --- a/pkg/spec/v3/edge.go +++ b/pkg/spec/v3/edge.go @@ -6,6 +6,7 @@ import ( "github.com/vesoft-inc/nebula-importer/v4/pkg/bytebufferpool" "github.com/vesoft-inc/nebula-importer/v4/pkg/errors" + specbase "github.com/vesoft-inc/nebula-importer/v4/pkg/spec/base" "github.com/vesoft-inc/nebula-importer/v4/pkg/utils" ) @@ -19,7 +20,9 @@ type ( IgnoreExistedIndex *bool `yaml:"ignoreExistedIndex,omitempty"` - fnInsertStatement func(records ...Record) (string, error) + Filter *specbase.Filter `yaml:"filter,omitempty"` + + fnInsertStatement func(records ...Record) (string, int, error) insertPrefix string // "INSERT EDGE name(prop_name, ..., prop_name) VALUES " } @@ -72,6 +75,12 @@ func WithEdgeIgnoreExistedIndex(ignore bool) EdgeOption { } } +func WithEdgeFilter(f *specbase.Filter) EdgeOption { + return func(e *Edge) { + e.Filter = f + } +} + func (e *Edge) Options(opts ...EdgeOption) *Edge { for _, opt := range opts { opt(e) @@ -147,34 +156,49 @@ func (e *Edge) Validate() error { return e.importError(err) } + if e.Filter != nil { + if err := e.Filter.Build(); err != nil { + return e.importError(errors.ErrFilterSyntax, "%s", err) + } + } + return nil } -func (e *Edge) InsertStatement(records ...Record) (string, error) { +func (e *Edge) InsertStatement(records ...Record) (statement string, nRecord int, err error) { return e.fnInsertStatement(records...) } -func (e *Edge) insertStatementWithoutRank(records ...Record) (string, error) { +func (e *Edge) insertStatementWithoutRank(records ...Record) (statement string, nRecord int, err error) { buff := bytebufferpool.Get() defer bytebufferpool.Put(buff) buff.SetString(e.insertPrefix) - for i, record := range records { + for _, record := range records { + if e.Filter != nil { + ok, err := e.Filter.Filter(record) + if err != nil { + return "", 0, e.importError(err) + } + if !ok { // skipping those return false by Filter + continue + } + } srcIDValue, err := e.Src.IDValue(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } dstIDValue, err := e.Dst.IDValue(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } propsValueList, err := e.Props.ValueList(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } - if i > 0 { + if nRecord > 0 { _, _ = buff.WriteString(", ") } @@ -185,35 +209,51 @@ func (e *Edge) insertStatementWithoutRank(records ...Record) (string, error) { _, _ = buff.WriteString(":(") _, _ = buff.WriteStringSlice(propsValueList, ", ") _, _ = buff.WriteString(")") + + nRecord++ + } + + if nRecord == 0 { + return "", 0, nil } - return buff.String(), nil + + return buff.String(), nRecord, nil } -func (e *Edge) insertStatementWithRank(records ...Record) (string, error) { +func (e *Edge) insertStatementWithRank(records ...Record) (statement string, nRecord int, err error) { buff := bytebufferpool.Get() defer bytebufferpool.Put(buff) buff.SetString(e.insertPrefix) - for i, record := range records { + for _, record := range records { + if e.Filter != nil { + ok, err := e.Filter.Filter(record) + if err != nil { + return "", 0, e.importError(err) + } + if !ok { // skipping those return false by Filter + continue + } + } srcIDValue, err := e.Src.IDValue(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } dstIDValue, err := e.Dst.IDValue(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } rankValue, err := e.Rank.Value(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } propsValueList, err := e.Props.ValueList(record) if err != nil { - return "", e.importError(err) + return "", 0, e.importError(err) } - if i > 0 { + if nRecord > 0 { _, _ = buff.WriteString(", ") } @@ -226,11 +266,18 @@ func (e *Edge) insertStatementWithRank(records ...Record) (string, error) { _, _ = buff.WriteString(":(") _, _ = buff.WriteStringSlice(propsValueList, ", ") _, _ = buff.WriteString(")") + + nRecord++ + } + + if nRecord == 0 { + return "", 0, nil } - return buff.String(), nil + + return buff.String(), nRecord, nil } -func (e *Edge) importError(err error, formatWithArgs ...any) *errors.ImportError { //nolint:unparam +func (e *Edge) importError(err error, formatWithArgs ...any) *errors.ImportError { return errors.AsOrNewImportError(err, formatWithArgs...).SetEdgeName(e.Name) } diff --git a/pkg/spec/v3/edge_test.go b/pkg/spec/v3/edge_test.go index 411ef040..6bd1217a 100644 --- a/pkg/spec/v3/edge_test.go +++ b/pkg/spec/v3/edge_test.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/vesoft-inc/nebula-importer/v4/pkg/errors" + specbase "github.com/vesoft-inc/nebula-importer/v4/pkg/spec/base" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -143,6 +144,19 @@ var _ = Describe("Edge", func() { Expect(stderrors.Is(err, errors.ErrUnsupportedValueType)).To(BeTrue()) }) + It("filter validate failed", func() { + node := NewNode( + "name", + WithNodeID(&NodeID{Name: "id", Type: ValueTypeInt}), + WithNodeFilter(&specbase.Filter{ + Expr: "", + }), + ) + err := node.Validate() + Expect(err).To(HaveOccurred()) + Expect(stderrors.Is(err, errors.ErrFilterSyntax)).To(BeTrue()) + }) + It("success without props", func() { edge := NewEdge( "name", @@ -265,28 +279,32 @@ var _ = Describe("Edge", func() { }) It("one record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 1->\"id1\":()")) }) It("two record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}, []string{"2", "id2", "2.2", "str2"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}, []string{"2", "id2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 1->\"id1\":(), 2->\"id2\":()")) }) It("src failed", func() { - statement, err := edge.InsertStatement([]string{}) + statement, nRecord, err := edge.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("dst failed", func() { - statement, err := edge.InsertStatement([]string{"1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -322,35 +340,40 @@ var _ = Describe("Edge", func() { }) It("one record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`(`prop1`) VALUES 1->\"id1\":(\"str1\")")) }) It("two record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}, []string{"2", "id2", "2.2", "str2"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}, []string{"2", "id2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`(`prop1`) VALUES 1->\"id1\":(\"str1\"), 2->\"id2\":(\"str2\")")) }) It("src failed", func() { - statement, err := edge.InsertStatement([]string{}) + statement, nRecord, err := edge.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("dst failed", func() { - statement, err := edge.InsertStatement([]string{"1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("props failed", func() { - statement, err := edge.InsertStatement([]string{"1", "id1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -387,35 +410,40 @@ var _ = Describe("Edge", func() { }) It("one record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`(`prop1`, `prop2`) VALUES 1->\"id1\":(\"str1\", 1.1)")) }) It("two record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}, []string{"2", "id2", "2.2", "str2"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1.1", "str1"}, []string{"2", "id2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`(`prop1`, `prop2`) VALUES 1->\"id1\":(\"str1\", 1.1), 2->\"id2\":(\"str2\", 2.2)")) }) It("src failed", func() { - statement, err := edge.InsertStatement([]string{}) + statement, nRecord, err := edge.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("dst failed", func() { - statement, err := edge.InsertStatement([]string{"1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("props failed", func() { - statement, err := edge.InsertStatement([]string{"1", "id1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -453,42 +481,48 @@ var _ = Describe("Edge", func() { }) It("one record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1", "1.1", "str1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`(`prop1`, `prop2`) VALUES 1->\"id1\"@1:(\"str1\", 1.1)")) }) It("two record", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1", "1.1", "str1"}, []string{"2", "id2", "2", "2.2", "str2"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1", "1.1", "str1"}, []string{"2", "id2", "2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`(`prop1`, `prop2`) VALUES 1->\"id1\"@1:(\"str1\", 1.1), 2->\"id2\"@2:(\"str2\", 2.2)")) }) It("src failed", func() { - statement, err := edge.InsertStatement([]string{}) + statement, nRecord, err := edge.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("dst failed", func() { - statement, err := edge.InsertStatement([]string{"1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("rank failed", func() { - statement, err := edge.InsertStatement([]string{"1", "id1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("props failed", func() { - statement, err := edge.InsertStatement([]string{"1", "id1", "1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1", "1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -519,8 +553,9 @@ var _ = Describe("Edge", func() { err := edge.Validate() Expect(err).NotTo(HaveOccurred()) - statement, err := edge.InsertStatement([]string{"1", "id1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE `name`() VALUES 1->\"id1\":()")) }) It("WithEdgeIgnoreExistedIndex true", func() { @@ -548,12 +583,149 @@ var _ = Describe("Edge", func() { err := edge.Validate() Expect(err).NotTo(HaveOccurred()) - statement, err := edge.InsertStatement([]string{"1", "id1"}) + statement, nRecord, err := edge.InsertStatement([]string{"1", "id1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 1->\"id1\":()")) }) }) }) + + When("WithEdgeFilter", func() { + It("WithEdgeFilter error", func() { + edge := NewEdge( + "name", + WithEdgeSrc(&EdgeNodeRef{ + Name: "srcNodeName", + ID: &NodeID{ + Name: "id", + Type: ValueTypeInt, + Index: 0, + }, + }), + WithEdgeDst(&EdgeNodeRef{ + Name: "dstNodeName", + ID: &NodeID{ + Name: "id", + Type: ValueTypeString, + Index: 1, + }, + }), + WithEdgeFilter(&specbase.Filter{ + Expr: "", + }), + ) + edge.Complete() + err := edge.Validate() + Expect(err).To(HaveOccurred()) + Expect(stderrors.Is(err, errors.ErrFilterSyntax)).To(BeTrue()) + }) + It("WithEdgeFilter successfully", func() { + edge := NewEdge( + "name", + WithEdgeSrc(&EdgeNodeRef{ + Name: "srcNodeName", + ID: &NodeID{ + Name: "id", + Type: ValueTypeInt, + Index: 0, + }, + }), + WithEdgeDst(&EdgeNodeRef{ + Name: "dstNodeName", + ID: &NodeID{ + Name: "id", + Type: ValueTypeString, + Index: 1, + }, + }), + WithEdgeFilter(&specbase.Filter{ + Expr: `(Record[0] == "1" or Record[0] == "2" or Record[0] == "3") and Record[1] != "A"`, + }), + ) + edge.Complete() + err := edge.Validate() + Expect(err).NotTo(HaveOccurred()) + + // all true + statement, nRecord, err := edge.InsertStatement([]string{"1", "B"}, []string{"2", "C"}, []string{"3", "D"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(3)) + Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 1->\"B\":(), 2->\"C\":(), 3->\"D\":()")) + + // partially true + statement, nRecord, err = edge.InsertStatement([]string{"2", "A"}, []string{"3", "D"}, []string{"4", "E"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) + Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 3->\"D\":()")) + + // all false + statement, nRecord, err = edge.InsertStatement([]string{"1", "A"}, []string{"2", "A"}, []string{"4", "E"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(0)) + Expect(statement).To(Equal("")) + + // filter failed + statement, nRecord, err = edge.InsertStatement([]string{"1"}) + Expect(err).To(HaveOccurred()) + Expect(nRecord).To(Equal(0)) + Expect(statement).To(Equal("")) + }) + It("WithEdgeFilter rank successfully", func() { + edge := NewEdge( + "name", + WithEdgeSrc(&EdgeNodeRef{ + Name: "srcNodeName", + ID: &NodeID{ + Name: "id", + Type: ValueTypeInt, + Index: 0, + }, + }), + WithEdgeDst(&EdgeNodeRef{ + Name: "dstNodeName", + ID: &NodeID{ + Name: "id", + Type: ValueTypeString, + Index: 1, + }, + }), + WithRank(&Rank{ + Index: 0, + }), + WithEdgeFilter(&specbase.Filter{ + Expr: `(Record[0] == "1" or Record[0] == "2" or Record[0] == "3") and Record[1] != "A"`, + }), + ) + edge.Complete() + err := edge.Validate() + Expect(err).NotTo(HaveOccurred()) + + // all true + statement, nRecord, err := edge.InsertStatement([]string{"1", "B"}, []string{"2", "C"}, []string{"3", "D"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(3)) + Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 1->\"B\"@1:(), 2->\"C\"@2:(), 3->\"D\"@3:()")) + + // partially true + statement, nRecord, err = edge.InsertStatement([]string{"2", "A"}, []string{"3", "D"}, []string{"4", "E"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) + Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `name`() VALUES 3->\"D\"@3:()")) + + // all false + statement, nRecord, err = edge.InsertStatement([]string{"1", "A"}, []string{"2", "A"}, []string{"4", "E"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(0)) + Expect(statement).To(Equal("")) + + // filter failed + statement, nRecord, err = edge.InsertStatement([]string{"1"}) + Expect(err).To(HaveOccurred()) + Expect(nRecord).To(Equal(0)) + Expect(statement).To(Equal("")) + }) + }) }) var _ = Describe("Edges", func() { diff --git a/pkg/spec/v3/graph.go b/pkg/spec/v3/graph.go index 6c96c024..22f27605 100644 --- a/pkg/spec/v3/graph.go +++ b/pkg/spec/v3/graph.go @@ -71,30 +71,30 @@ func (g *Graph) Validate() error { return nil } -func (g *Graph) InsertNodeStatement(n *Node, records ...Record) (string, error) { - statement, err := n.InsertStatement(records...) +func (g *Graph) InsertNodeStatement(n *Node, records ...Record) (statement string, nRecord int, err error) { + statement, nRecord, err = n.InsertStatement(records...) if err != nil { - return "", g.importError(err).SetGraphName(g.Name).SetNodeName(n.Name) + return "", 0, g.importError(err).SetGraphName(g.Name).SetNodeName(n.Name) } - return statement, nil + return statement, nRecord, nil } func (g *Graph) InsertNodeBuilder(n *Node) specbase.StatementBuilder { - return specbase.StatementBuilderFunc(func(records ...specbase.Record) (string, error) { + return specbase.StatementBuilderFunc(func(records ...specbase.Record) (string, int, error) { return g.InsertNodeStatement(n, records...) }) } -func (g *Graph) InsertEdgeStatement(e *Edge, records ...Record) (string, error) { - statement, err := e.InsertStatement(records...) +func (g *Graph) InsertEdgeStatement(e *Edge, records ...Record) (statement string, nRecord int, err error) { + statement, nRecord, err = e.InsertStatement(records...) if err != nil { - return "", g.importError(err).SetGraphName(g.Name).SetEdgeName(e.Name) + return "", 0, g.importError(err).SetGraphName(g.Name).SetEdgeName(e.Name) } - return statement, nil + return statement, nRecord, nil } func (g *Graph) InsertEdgeBuilder(e *Edge) specbase.StatementBuilder { - return specbase.StatementBuilderFunc(func(records ...specbase.Record) (string, error) { + return specbase.StatementBuilderFunc(func(records ...specbase.Record) (string, int, error) { return g.InsertEdgeStatement(e, records...) }) } diff --git a/pkg/spec/v3/graph_test.go b/pkg/spec/v3/graph_test.go index 7deaa339..62ff97ef 100644 --- a/pkg/spec/v3/graph_test.go +++ b/pkg/spec/v3/graph_test.go @@ -173,27 +173,31 @@ var _ = Describe("Graph", func() { It("success", func() { node := graph.Nodes[0] - statement, err := graph.InsertNodeStatement(node, []string{"1"}) + statement, nRecord, err := graph.InsertNodeStatement(node, []string{"1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `node1`() VALUES 1:()")) b := graph.InsertNodeBuilder(node) - statement, err = b.Build([]string{"1"}) + statement, nRecord, err = b.Build([]string{"1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `node1`() VALUES 1:()")) }) It("failed", func() { node := graph.Nodes[0] - statement, err := graph.InsertNodeStatement(node, []string{}) + statement, nRecord, err := graph.InsertNodeStatement(node, []string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(Equal("")) b := graph.InsertNodeBuilder(node) - statement, err = b.Build([]string{}) + statement, nRecord, err = b.Build([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(Equal("")) }) }) @@ -232,27 +236,31 @@ var _ = Describe("Graph", func() { It("success", func() { edge := graph.Edges[0] - statement, err := graph.InsertEdgeStatement(edge, []string{"1", "2"}) + statement, nRecord, err := graph.InsertEdgeStatement(edge, []string{"1", "2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `edge1`() VALUES 1->2:()")) b := graph.InsertEdgeBuilder(edge) - statement, err = b.Build([]string{"1", "2"}) + statement, nRecord, err = b.Build([]string{"1", "2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT EDGE IGNORE_EXISTED_INDEX `edge1`() VALUES 1->2:()")) }) It("failed", func() { edge := graph.Edges[0] - statement, err := graph.InsertEdgeStatement(edge, []string{}) + statement, nRecord, err := graph.InsertEdgeStatement(edge, []string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(Equal("")) b := graph.InsertEdgeBuilder(edge) - statement, err = b.Build([]string{}) + statement, nRecord, err = b.Build([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(Equal("")) }) }) diff --git a/pkg/spec/v3/node.go b/pkg/spec/v3/node.go index eac24587..8f6c7e67 100644 --- a/pkg/spec/v3/node.go +++ b/pkg/spec/v3/node.go @@ -6,6 +6,7 @@ import ( "github.com/vesoft-inc/nebula-importer/v4/pkg/bytebufferpool" "github.com/vesoft-inc/nebula-importer/v4/pkg/errors" + specbase "github.com/vesoft-inc/nebula-importer/v4/pkg/spec/base" "github.com/vesoft-inc/nebula-importer/v4/pkg/utils" ) @@ -18,6 +19,8 @@ type ( IgnoreExistedIndex *bool `yaml:"ignoreExistedIndex,omitempty"` + Filter *specbase.Filter `yaml:"filter,omitempty"` + insertPrefix string // // "INSERT EDGE name(prop_name, ..., prop_name) VALUES " } @@ -53,6 +56,12 @@ func WithNodeIgnoreExistedIndex(ignore bool) NodeOption { } } +func WithNodeFilter(f *specbase.Filter) NodeOption { + return func(n *Node) { + n.Filter = f + } +} + func (n *Node) Options(opts ...NodeOption) *Node { for _, opt := range opts { opt(n) @@ -96,26 +105,41 @@ func (n *Node) Validate() error { return n.importError(err) } + if n.Filter != nil { + if err := n.Filter.Build(); err != nil { + return n.importError(errors.ErrFilterSyntax, "%s", err) + } + } + return nil } -func (n *Node) InsertStatement(records ...Record) (string, error) { +func (n *Node) InsertStatement(records ...Record) (statement string, nRecord int, err error) { buff := bytebufferpool.Get() defer bytebufferpool.Put(buff) buff.SetString(n.insertPrefix) - for i, record := range records { + for _, record := range records { + if n.Filter != nil { + ok, err := n.Filter.Filter(record) + if err != nil { + return "", 0, n.importError(err) + } + if !ok { // skipping those return false by Filter + continue + } + } idValue, err := n.ID.Value(record) if err != nil { - return "", n.importError(err) + return "", 0, n.importError(err) } propsValueList, err := n.Props.ValueList(record) if err != nil { - return "", n.importError(err) + return "", 0, n.importError(err) } - if i > 0 { + if nRecord > 0 { _, _ = buff.WriteString(", ") } @@ -124,11 +148,18 @@ func (n *Node) InsertStatement(records ...Record) (string, error) { _, _ = buff.WriteString(":(") _, _ = buff.WriteStringSlice(propsValueList, ", ") _, _ = buff.WriteString(")") + + nRecord++ } - return buff.String(), nil + + if nRecord == 0 { + return "", 0, nil + } + + return buff.String(), nRecord, nil } -func (n *Node) importError(err error, formatWithArgs ...any) *errors.ImportError { //nolint:unparam +func (n *Node) importError(err error, formatWithArgs ...any) *errors.ImportError { return errors.AsOrNewImportError(err, formatWithArgs...).SetNodeName(n.Name) } diff --git a/pkg/spec/v3/node_test.go b/pkg/spec/v3/node_test.go index 7e27d4d3..11ed14cc 100644 --- a/pkg/spec/v3/node_test.go +++ b/pkg/spec/v3/node_test.go @@ -4,6 +4,7 @@ import ( stderrors "errors" "github.com/vesoft-inc/nebula-importer/v4/pkg/errors" + specbase "github.com/vesoft-inc/nebula-importer/v4/pkg/spec/base" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -71,6 +72,19 @@ var _ = Describe("Node", func() { Expect(stderrors.Is(err, errors.ErrUnsupportedValueType)).To(BeTrue()) }) + It("filter validate failed", func() { + node := NewNode( + "name", + WithNodeID(&NodeID{Name: "id", Type: ValueTypeInt}), + WithNodeFilter(&specbase.Filter{ + Expr: "", + }), + ) + err := node.Validate() + Expect(err).To(HaveOccurred()) + Expect(stderrors.Is(err, errors.ErrFilterSyntax)).To(BeTrue()) + }) + It("success without props", func() { node := NewNode( "name", @@ -105,21 +119,24 @@ var _ = Describe("Node", func() { }) It("one record", func() { - statement, err := node.InsertStatement([]string{"1", "1.1", "str1"}) + statement, nRecord, err := node.InsertStatement([]string{"1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`() VALUES 1:()")) }) It("two record", func() { - statement, err := node.InsertStatement([]string{"1", "1.1", "str1"}, []string{"2", "2.2", "str2"}) + statement, nRecord, err := node.InsertStatement([]string{"1", "1.1", "str1"}, []string{"2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`() VALUES 1:(), 2:()")) }) It("failed id no record", func() { - statement, err := node.InsertStatement([]string{}) + statement, nRecord, err := node.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -140,28 +157,32 @@ var _ = Describe("Node", func() { }) It("one record", func() { - statement, err := node.InsertStatement([]string{"1", "1.1", "str1"}) + statement, nRecord, err := node.InsertStatement([]string{"1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`(`prop1`) VALUES 1:(\"str1\")")) }) It("two record", func() { - statement, err := node.InsertStatement([]string{"1", "1.1", "str1"}, []string{"2", "2.2", "str2"}) + statement, nRecord, err := node.InsertStatement([]string{"1", "1.1", "str1"}, []string{"2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`(`prop1`) VALUES 1:(\"str1\"), 2:(\"str2\")")) }) It("failed id no record", func() { - statement, err := node.InsertStatement([]string{}) + statement, nRecord, err := node.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("failed prop no record", func() { - statement, err := node.InsertStatement([]string{"1"}) + statement, nRecord, err := node.InsertStatement([]string{"1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -183,28 +204,32 @@ var _ = Describe("Node", func() { }) It("one record", func() { - statement, err := node.InsertStatement([]string{"1", "1.1", "str1"}) + statement, nRecord, err := node.InsertStatement([]string{"1", "1.1", "str1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`(`prop1`, `prop2`) VALUES 1:(\"str1\", 1.1)")) }) It("two record", func() { - statement, err := node.InsertStatement([]string{"1", "1.1", "str1"}, []string{"2", "2.2", "str2"}) + statement, nRecord, err := node.InsertStatement([]string{"1", "1.1", "str1"}, []string{"2", "2.2", "str2"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(2)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`(`prop1`, `prop2`) VALUES 1:(\"str1\", 1.1), 2:(\"str2\", 2.2)")) }) It("failed id no record", func() { - statement, err := node.InsertStatement([]string{}) + statement, nRecord, err := node.InsertStatement([]string{}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) It("failed prop no record", func() { - statement, err := node.InsertStatement([]string{"1"}) + statement, nRecord, err := node.InsertStatement([]string{"1"}) Expect(err).To(HaveOccurred()) Expect(stderrors.Is(err, errors.ErrNoRecord)).To(BeTrue()) + Expect(nRecord).To(Equal(0)) Expect(statement).To(BeEmpty()) }) }) @@ -221,8 +246,9 @@ var _ = Describe("Node", func() { err := node.Validate() Expect(err).NotTo(HaveOccurred()) - statement, err := node.InsertStatement([]string{"1"}) + statement, nRecord, err := node.InsertStatement([]string{"1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX `name`() VALUES 1:()")) }) It("WithNodeIgnoreExistedIndex true", func() { @@ -235,11 +261,64 @@ var _ = Describe("Node", func() { err := node.Validate() Expect(err).NotTo(HaveOccurred()) - statement, err := node.InsertStatement([]string{"1"}) + statement, nRecord, err := node.InsertStatement([]string{"1"}) Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`() VALUES 1:()")) }) }) + + When("WithNodeFilter", func() { + It("WithNodeFilter error", func() { + node := NewNode( + "name", + WithNodeID(&NodeID{Name: "id", Type: ValueTypeInt, Index: 0}), + WithNodeFilter(&specbase.Filter{ + Expr: "", + }), + ) + node.Complete() + err := node.Validate() + Expect(err).To(HaveOccurred()) + Expect(stderrors.Is(err, errors.ErrFilterSyntax)).To(BeTrue()) + }) + It("WithNodeFilter successfully", func() { + node := NewNode( + "name", + WithNodeID(&NodeID{Name: "id", Type: ValueTypeInt, Index: 0}), + WithNodeFilter(&specbase.Filter{ + Expr: `(Record[0] == "1" or Record[0] == "2" or Record[0] == "3") and Record[1] != "A"`, + }), + ) + node.Complete() + err := node.Validate() + Expect(err).NotTo(HaveOccurred()) + + // all true + statement, nRecord, err := node.InsertStatement([]string{"1", "B"}, []string{"2", "C"}, []string{"3", "D"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(3)) + Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`() VALUES 1:(), 2:(), 3:()")) + + // partially true + statement, nRecord, err = node.InsertStatement([]string{"2", "A"}, []string{"3", "D"}, []string{"4", "E"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(1)) + Expect(statement).To(Equal("INSERT VERTEX IGNORE_EXISTED_INDEX `name`() VALUES 3:()")) + + // all false + statement, nRecord, err = node.InsertStatement([]string{"1", "A"}, []string{"2", "A"}, []string{"4", "E"}) + Expect(err).NotTo(HaveOccurred()) + Expect(nRecord).To(Equal(0)) + Expect(statement).To(Equal("")) + + // filter failed + statement, nRecord, err = node.InsertStatement([]string{"1"}) + Expect(err).To(HaveOccurred()) + Expect(nRecord).To(Equal(0)) + Expect(statement).To(Equal("")) + }) + }) }) var _ = Describe("Nodes", func() {