From 150403c0ec703d3dbe17ea0b212036def0a3dbb9 Mon Sep 17 00:00:00 2001 From: Xin Hao Date: Tue, 2 Apr 2024 16:53:31 +0800 Subject: [PATCH] AddTag and AddEdge support TTL (#328) Supports build QL with TTL wip fix docs fix --- label.go | 82 ++++++++++++++++++++-------- label_test.go | 29 ++++++++++ schema_manager.go | 50 ++++++++++------- session_pool.go | 24 +++++++++ session_pool_test.go | 124 +++++++++++++++++++++++++++++++++++++++++++ util.go | 54 +++++++++++++++++++ util_test.go | 14 +++++ 7 files changed, 336 insertions(+), 41 deletions(-) diff --git a/label.go b/label.go index 496d6635..5c7aec07 100644 --- a/label.go +++ b/label.go @@ -8,9 +8,18 @@ package nebula_go import ( + "fmt" "strings" ) +type LabelName struct { + Name string `nebula:"Name"` +} + +type SpaceName struct { + Name string `nebula:"Name"` +} + type Label struct { Field string `nebula:"Field"` Type string `nebula:"Type"` @@ -19,21 +28,23 @@ type Label struct { Comment string `nebula:"Comment"` } +type LabelSchema struct { + Name string + Fields []LabelFieldSchema + TTLDuration uint + TTLCol string +} + type LabelFieldSchema struct { Field string Type string Nullable bool } -type LabelSchema struct { - Name string - Fields []LabelFieldSchema -} - func (tag LabelSchema) BuildCreateTagQL() string { q := "CREATE TAG IF NOT EXISTS " + tag.Name + " (" - fs := []string{} + fields := []string{} for _, field := range tag.Fields { t := field.Type if t == "" { @@ -43,10 +54,16 @@ func (tag LabelSchema) BuildCreateTagQL() string { if !field.Nullable { n = "NOT NULL" } - fs = append(fs, field.Field+" "+t+" "+n) + fields = append(fields, field.Field+" "+t+" "+n) } - q += strings.Join(fs, ", ") + ");" + ttl := tag.buildTTL_QL() + + if ttl != "" { + q += strings.Join(fields, ", ") + ") " + ttl + ";" + } else { + q += strings.Join(fields, ", ") + ");" + } return q } @@ -59,7 +76,7 @@ func (tag LabelSchema) BuildDropTagQL() string { func (edge LabelSchema) BuildCreateEdgeQL() string { q := "CREATE EDGE IF NOT EXISTS " + edge.Name + " (" - fs := []string{} + fields := []string{} for _, field := range edge.Fields { t := field.Type if t == "" { @@ -69,14 +86,18 @@ func (edge LabelSchema) BuildCreateEdgeQL() string { if !field.Nullable { n = "NOT NULL" } - fs = append(fs, field.Field+" "+t+" "+n) + fields = append(fields, field.Field+" "+t+" "+n) } - if len(fs) > 0 { - q += strings.Join(fs, ", ") + ttl := edge.buildTTL_QL() + + if ttl != "" { + q += strings.Join(fields, ", ") + ") " + ttl + ";" + } else { + q += strings.Join(fields, ", ") + ");" } - return q + ");" + return q } func (edge LabelSchema) BuildDropEdgeQL() string { @@ -84,6 +105,33 @@ func (edge LabelSchema) BuildDropEdgeQL() string { return q } +func (label LabelSchema) buildTTL_QL() string { + ttl := "" + if label.TTLCol != "" { + if !label.isTTLColValid() { + panic(fmt.Errorf("TTL column %s does not exist in the fields", label.TTLCol)) + } + ttl = fmt.Sprintf(`TTL_DURATION = %d, TTL_COL = "%s"`, label.TTLDuration, label.TTLCol) + } + + return ttl +} + +func (label LabelSchema) isTTLColValid() bool { + if label.TTLCol == "" { + // no ttl column is valid + return true + } + + for _, field := range label.Fields { + if field.Field == label.TTLCol { + return true + } + } + + return false +} + func (field LabelFieldSchema) BuildAddTagFieldQL(labelName string) string { q := "ALTER TAG " + labelName + " ADD (" + field.Field + " " + field.Type if !field.Nullable { @@ -107,11 +155,3 @@ func (field Label) BuildDropTagFieldQL(labelName string) string { func (field Label) BuildDropEdgeFieldQL(labelName string) string { return "ALTER EDGE " + labelName + " DROP (" + field.Field + ");" } - -type LabelName struct { - Name string `nebula:"Name"` -} - -type SpaceName struct { - Name string `nebula:"Name"` -} diff --git a/label_test.go b/label_test.go index 2e85b8ce..42ed8895 100644 --- a/label_test.go +++ b/label_test.go @@ -33,6 +33,20 @@ func TestBuildCreateTagQL(t *testing.T) { } assert.Equal(t, "CREATE TAG IF NOT EXISTS account (name string NOT NULL, email string NULL, phone string NULL);", tag.BuildCreateTagQL()) assert.Equal(t, "DROP TAG IF EXISTS account;", tag.BuildDropTagQL()) + + tag.TTLDuration = 100 + tag.TTLCol = "created_at" + assert.PanicsWithError(t, "TTL column created_at does not exist in the fields", func() { + tag.BuildCreateTagQL() + }) + + tag.Fields = append(tag.Fields, LabelFieldSchema{ + Field: "created_at", + Type: "timestamp", + Nullable: true, + }) + + assert.Equal(t, `CREATE TAG IF NOT EXISTS account (name string NOT NULL, email string NULL, phone string NULL, created_at timestamp NULL) TTL_DURATION = 100, TTL_COL = "created_at";`, tag.BuildCreateTagQL()) } func TestBuildCreateEdgeQL(t *testing.T) { @@ -47,6 +61,21 @@ func TestBuildCreateEdgeQL(t *testing.T) { } assert.Equal(t, "CREATE EDGE IF NOT EXISTS account_email (email string NOT NULL);", edge.BuildCreateEdgeQL()) assert.Equal(t, "DROP EDGE IF EXISTS account_email;", edge.BuildDropEdgeQL()) + + edge.TTLDuration = 100 + edge.TTLCol = "created_at" + + assert.PanicsWithError(t, "TTL column created_at does not exist in the fields", func() { + edge.BuildCreateEdgeQL() + }) + + edge.Fields = append(edge.Fields, LabelFieldSchema{ + Field: "created_at", + Type: "timestamp", + Nullable: true, + }) + + assert.Equal(t, `CREATE EDGE IF NOT EXISTS account_email (email string NOT NULL, created_at timestamp NULL) TTL_DURATION = 100, TTL_COL = "created_at";`, edge.BuildCreateEdgeQL()) } func TestBuildAddFieldQL(t *testing.T) { diff --git a/schema_manager.go b/schema_manager.go index 63c72b5a..226561e5 100644 --- a/schema_manager.go +++ b/schema_manager.go @@ -35,6 +35,8 @@ func (mgr *SchemaManager) WithVerbose(verbose bool) *SchemaManager { // 2.2 If the field type is different, it will return an error. // 2.3 If a field exists in the graph but not in the given tag, // it will be removed. +// 3. If the tag exists and the fields are the same, +// it will be checked if the TTL is set as expected. // // Notice: // We won't change the field type because it has @@ -43,7 +45,7 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { // 1. Make sure the tag exists fields, err := mgr.pool.DescTag(tag.Name) if err != nil { - // 2. If the tag does not exist, create it + // If the tag does not exist, create it if strings.Contains(err.Error(), ErrorTagNotFound) { if mgr.verbose { log.Printf("ApplyTag: create the not existing tag. name=%s\n", tag.Name) @@ -53,15 +55,15 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { return nil, err } - // 3. The tag exists, add new fields if needed - // 3.1 Prepare the new fields + // 2. The tag exists, add new fields if needed + // 2.1 Prepare the new fields addFieldQLs := []string{} for _, expected := range tag.Fields { found := false for _, actual := range fields { if expected.Field == actual.Field { found = true - // 3.2 Check if the field type is different + // 2.2 Check if the field type is different if expected.Type != actual.Type { return nil, fmt.Errorf("field type is different. "+ "Expected: %s, Actual: %s", expected.Type, actual.Type) @@ -70,12 +72,12 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { } } if !found { - // 3.3 Add the not exists field QL + // 2.3 Add the not exists field QL q := expected.BuildAddTagFieldQL(tag.Name) addFieldQLs = append(addFieldQLs, q) } } - // 3.4 Execute the add field QLs if needed + // 2.4 Execute the add field QLs if needed if len(addFieldQLs) > 0 { queries := strings.Join(addFieldQLs, " ") if mgr.verbose { @@ -87,8 +89,8 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { } } - // 4. Remove the not expected field if needed - // 4.1 Prepare the not expected fields + // 3. Remove the not expected field if needed + // 3.1 Prepare the not expected fields dropFieldQLs := []string{} for _, actual := range fields { redundant := true @@ -99,12 +101,12 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { } } if redundant { - // 4.2 Remove the not expected field + // 3.2 Remove the not expected field q := actual.BuildDropTagFieldQL(tag.Name) dropFieldQLs = append(dropFieldQLs, q) } } - // 4.3 Execute the drop field QLs if needed + // 3.3 Execute the drop field QLs if needed if len(dropFieldQLs) > 0 { queries := strings.Join(dropFieldQLs, " ") if mgr.verbose { @@ -116,6 +118,9 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { } } + // 4. Check if the TTL is set as expected. + // @TODO + return nil, nil } @@ -126,6 +131,8 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) { // 2.2 If the field type is different, it will return an error. // 2.3 If a field exists in the graph but not in the given edge, // it will be removed. +// 3. If the edge exists and the fields are the same, +// it will be checked if the TTL is set as expected. // // Notice: // We won't change the field type because it has @@ -134,7 +141,7 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) { // 1. Make sure the edge exists fields, err := mgr.pool.DescEdge(edge.Name) if err != nil { - // 2. If the edge does not exist, create it + // If the edge does not exist, create it if strings.Contains(err.Error(), ErrorEdgeNotFound) { if mgr.verbose { log.Printf("ApplyEdge: create the not existing edge. name=%s\n", edge.Name) @@ -144,15 +151,15 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) { return nil, err } - // 3. The edge exists now, add new fields if needed - // 3.1 Prepare the new fields + // 2. The edge exists now, add new fields if needed + // 2.1 Prepare the new fields addFieldQLs := []string{} for _, expected := range edge.Fields { found := false for _, actual := range fields { if expected.Field == actual.Field { found = true - // 3.2 Check if the field type is different + // 2.2 Check if the field type is different if expected.Type != actual.Type { return nil, fmt.Errorf("field type is different. "+ "Expected: %s, Actual: %s", expected.Type, actual.Type) @@ -161,12 +168,12 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) { } } if !found { - // 3.3 Add the not exists field QL + // 2.3 Add the not exists field QL q := expected.BuildAddEdgeFieldQL(edge.Name) addFieldQLs = append(addFieldQLs, q) } } - // 3.4 Execute the add field QLs if needed + // 2.4 Execute the add field QLs if needed if len(addFieldQLs) > 0 { queries := strings.Join(addFieldQLs, " ") if mgr.verbose { @@ -178,8 +185,8 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) { } } - // 4. Remove the not expected field if needed - // 4.1 Prepare the not expected fields + // 3. Remove the not expected field if needed + // 3.1 Prepare the not expected fields dropFieldQLs := []string{} for _, actual := range fields { redundant := true @@ -190,12 +197,12 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) { } } if redundant { - // 4.2 Remove the not expected field + // 3.2 Remove the not expected field q := actual.BuildDropEdgeFieldQL(edge.Name) dropFieldQLs = append(dropFieldQLs, q) } } - // 4.3 Execute the drop field QLs if needed + // 3.3 Execute the drop field QLs if needed if len(dropFieldQLs) > 0 { queries := strings.Join(dropFieldQLs, "") if mgr.verbose { @@ -207,5 +214,8 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) { } } + // 4. Check if the TTL is set as expected. + // @TODO + return nil, nil } diff --git a/session_pool.go b/session_pool.go index c699c8bc..97cd3265 100644 --- a/session_pool.go +++ b/session_pool.go @@ -329,6 +329,18 @@ func (pool *SessionPool) AddTagTTL(tagName string, colName string, duration uint return rs, nil } +func (pool *SessionPool) GetTagTTL(tagName string) (string, uint, error) { + q := fmt.Sprintf("SHOW CREATE TAG %s;", tagName) + rs, err := pool.ExecuteAndCheck(q) + if err != nil { + return "", 0, err + } + + s := string(rs.GetRows()[0].Values[1].GetSVal()) + + return parseTTL(s) +} + func (pool *SessionPool) DescTag(tagName string) ([]Label, error) { q := fmt.Sprintf("DESC TAG %s;", tagName) rs, err := pool.ExecuteAndCheck(q) @@ -373,6 +385,18 @@ func (pool *SessionPool) AddEdgeTTL(tagName string, colName string, duration uin return rs, nil } +func (pool *SessionPool) GetEdgeTTL(edgeName string) (string, uint, error) { + q := fmt.Sprintf("SHOW CREATE EDGE %s;", edgeName) + rs, err := pool.ExecuteAndCheck(q) + if err != nil { + return "", 0, err + } + + s := string(rs.GetRows()[0].Values[1].GetSVal()) + + return parseTTL(s) +} + func (pool *SessionPool) DescEdge(edgeName string) ([]Label, error) { q := fmt.Sprintf("DESC EDGE %s;", edgeName) rs, err := pool.ExecuteAndCheck(q) diff --git a/session_pool_test.go b/session_pool_test.go index ac6f5b42..2a18f716 100644 --- a/session_pool_test.go +++ b/session_pool_test.go @@ -469,6 +469,130 @@ func TestSessionPoolCreateTagAndEdge(t *testing.T) { assert.Equal(t, "string", labels[0].Type) } +func TestSessionPoolCreateTagAndEdgeWithTTL(t *testing.T) { + spaceName := "test_space_schema_ttl" + err := prepareSpace(spaceName) + if err != nil { + t.Fatal(err) + } + defer dropSpace(spaceName) + + hostAddress := HostAddress{Host: address, Port: port} + config, err := NewSessionPoolConf( + "root", + "nebula", + []HostAddress{hostAddress}, + spaceName) + if err != nil { + t.Errorf("failed to create session pool config, %s", err.Error()) + } + + // allow only one session in the pool so it is easier to test + config.maxSize = 1 + + // create session pool + sessionPool, err := NewSessionPool(*config, DefaultLogger{}) + if err != nil { + t.Fatal(err) + } + defer sessionPool.Close() + + spaces, err := sessionPool.ShowSpaces() + if err != nil { + t.Fatal(err) + } + assert.LessOrEqual(t, 1, len(spaces), "should have at least 1 space") + var spaceNames []string + for _, space := range spaces { + spaceNames = append(spaceNames, space.Name) + } + assert.Contains(t, spaceNames, spaceName) + + tagSchema := LabelSchema{ + Name: "user", + Fields: []LabelFieldSchema{ + { + Field: "name", + Nullable: false, + }, + { + Field: "created_at", + Type: "int64", + Nullable: true, + }, + }, + TTLCol: "created_at", + TTLDuration: 5, + } + + _, err = sessionPool.CreateTag(tagSchema) + if err != nil { + t.Fatal(err) + } + + tags, err := sessionPool.ShowTags() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 1, len(tags)) + assert.Equal(t, "user", tags[0].Name) + labels, err := sessionPool.DescTag("user") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 2, len(labels)) + assert.Equal(t, "name", labels[0].Field) + assert.Equal(t, "string", labels[0].Type) + assert.Equal(t, "created_at", labels[1].Field) + assert.Equal(t, "int64", labels[1].Type) + + col, duration, err := sessionPool.GetTagTTL("user") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "created_at", col) + assert.Equal(t, uint(5), duration) + + edgeSchema := LabelSchema{ + Name: "friend", + Fields: []LabelFieldSchema{ + { + Field: "created_at", + Type: "int64", + Nullable: false, + }, + }, + TTLCol: "created_at", + TTLDuration: 5, + } + + _, err = sessionPool.CreateEdge(edgeSchema) + if err != nil { + t.Fatal(err) + } + + edges, err := sessionPool.ShowEdges() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 1, len(edges)) + assert.Equal(t, "friend", edges[0].Name) + labels, err = sessionPool.DescEdge("friend") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 1, len(labels)) + assert.Equal(t, "created_at", labels[0].Field) + assert.Equal(t, "int64", labels[0].Type) + + col, duration, err = sessionPool.GetEdgeTTL("friend") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "created_at", col) + assert.Equal(t, uint(5), duration) +} + func TestSessionPoolAddTTL(t *testing.T) { spaceName := "test_space_ttl" err := prepareSpace(spaceName) diff --git a/util.go b/util.go index df469692..a816c000 100644 --- a/util.go +++ b/util.go @@ -1,5 +1,10 @@ package nebula_go +import ( + "regexp" + "strconv" +) + func IndexOf(collection []string, element string) int { for i, item := range collection { if item == element { @@ -9,3 +14,52 @@ func IndexOf(collection []string, element string) int { return -1 } + +func parseTTL(s string) (string, uint, error) { + col, err := parseTTLCol(s) + if err != nil { + return "", 0, err + } + + duration, err := parseTTLDuration(s) + if err != nil { + return "", 0, err + } + + return col, duration, nil +} + +func parseTTLCol(s string) (string, error) { + reg, err := regexp.Compile(`ttl_col = "(\w+)"`) + ss := reg.FindStringSubmatch(s) + + if err != nil { + return "", err + } + + if len(ss) == 2 { + return ss[1], nil + } + + return "", nil +} + +func parseTTLDuration(s string) (uint, error) { + reg, err := regexp.Compile(`ttl_duration = (\d+)`) + + if err != nil { + return 0, err + } + + ss := reg.FindStringSubmatch(s) + + if len(ss) == 2 { + ttl, err := strconv.Atoi(ss[1]) + if err != nil { + return 0, err + } + return uint(ttl), nil + } + + return 0, nil +} diff --git a/util_test.go b/util_test.go index 8c640012..769caaff 100644 --- a/util_test.go +++ b/util_test.go @@ -13,3 +13,17 @@ func TestUtil_IndexOf(t *testing.T) { assert.Equal(t, IndexOf(collection, "c"), 2) assert.Equal(t, IndexOf(collection, "d"), -1) } + +func TestUtil_parseTTL(t *testing.T) { + s := "CREATE TAG `user` (\n\t\t`name` string NOT NULL,\n\t\t`created_at` int64 NULL\n\t) ttl_duration = 5, ttl_col = \"created_at\"" + col, duration, err := parseTTL(s) + assert.Nil(t, err) + assert.Equal(t, col, "created_at") + assert.Equal(t, duration, uint(5)) + + s = "CREATE TAG `user` (\n\t\t`name` string NOT NULL,\n\t\t`created_at` int64 NULL\n\t) ttl_duration = 0, ttl_col = \"\"" + col, duration, err = parseTTL(s) + assert.Nil(t, err) + assert.Equal(t, col, "") + assert.Equal(t, duration, uint(0)) +}