From 450eb80eba9935caf67e80eee193ed564d238d15 Mon Sep 17 00:00:00 2001 From: Muhammad Taha Naveed Date: Wed, 21 Aug 2024 21:42:31 +0500 Subject: [PATCH] Revamp age csv loader (#2044) * Allow 0 as entry_id - No regression test were impacted by this change. * Use batch inserts to improve performance - Changed heap_insert to heap_multi_insert since it is faster than calling heap_insert() in a loop. When multiple tuples can be inserted on a single page, just a single WAL record covering all of them, and only need to lock/unlock the page once. - BATCH_SIZE is set to 1000, which is the number of tuples to insert in a single batch. This number was chosen after some experimentation. - Change some of the field names to avoid confusion. * Use sequence for generating ids for edge and vertex - Sequence is not used if the id_field_exists is true in load_labels_from_file function, since the entry id is present in the csv. * Add function to create temporary table for ids - Created a temporary table and populate it with already generated vertex ids. A unique index is created on id column to ensure that new ids generated (using entry id from csv) are unique. * Insert generated ids in the temporary table to enforce uniqueness - Insert ids in the temporary table and also update index to enforce uniqueness. - If the entry id provided in the CSV is greater than the current sequence value, the sequence value is updated to match the entry ID. For example: Suppose the current sequence value is 1, and the CSV entry ID is 2. If we use 2 but not update the sequence to 2, next time the CREATE clause is used, 2 will be returned by sequence as an entry id, resulting in duplicate. - Update batch functions * Add functions to create graph and label automatically - These functions will check existence of graph and label, and create them if they don't exist. * Add regression tests --- regress/expected/age_load.out | 139 ++++++++---- regress/sql/age_load.sql | 74 ++++++- src/backend/catalog/ag_graph.c | 4 +- src/backend/catalog/ag_label.c | 5 + src/backend/commands/graph_commands.c | 34 ++- src/backend/utils/load/ag_load_edges.c | 125 ++++++++--- src/backend/utils/load/ag_load_labels.c | 283 +++++++++++++++++++++--- src/backend/utils/load/age_load.c | 137 +++++++++++- src/include/catalog/ag_graph.h | 1 + src/include/catalog/ag_label.h | 2 + src/include/commands/graph_commands.h | 1 + src/include/utils/graphid.h | 5 +- src/include/utils/load/ag_load_edges.h | 16 +- src/include/utils/load/ag_load_labels.h | 15 +- src/include/utils/load/age_load.h | 19 ++ 15 files changed, 724 insertions(+), 136 deletions(-) diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out index 8635a499b..8f216341b 100644 --- a/regress/expected/age_load.out +++ b/regress/expected/age_load.out @@ -19,6 +19,7 @@ \! cp -r regress/age_load/data regress/instance/data/age_load LOAD 'age'; SET search_path TO ag_catalog; +-- Create a country using CREATE clause SELECT create_graph('agload_test_graph'); NOTICE: graph "agload_test_graph" has been created create_graph @@ -26,34 +27,79 @@ NOTICE: graph "agload_test_graph" has been created (1 row) -SELECT create_vlabel('agload_test_graph','Country'); -NOTICE: VLabel "Country" has been created - create_vlabel ---------------- - +SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) RETURN n$$) as (n agtype); + n +---------------------------------------------------------------------------------- + {"id": 844424930131969, "label": "Country", "properties": {"__id__": 1}}::vertex (1 row) +-- +-- Load countries with id +-- SELECT load_labels_from_file('agload_test_graph', 'Country', - 'age_load/countries.csv'); + 'age_load/countries.csv', true); load_labels_from_file ----------------------- (1 row) -SELECT create_vlabel('agload_test_graph','City'); -NOTICE: VLabel "City" has been created - create_vlabel ---------------- - +-- A temporary table should have been created with 54 ids; 1 from CREATE and 53 from file +SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids"; + ?column? +---------- + t +(1 row) + +-- Sequence should be equal to max entry id i.e. 248 +SELECT currval('agload_test_graph."Country_id_seq"')=248; + ?column? +---------- + t (1 row) +-- Should error out on loading the same file again due to duplicate id +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', true); +ERROR: Cannot insert duplicate vertex id: 844424930131970 +HINT: Entry id 2 is already used +-- +-- Load cities with id +-- +-- Should create City label automatically and load cities SELECT load_labels_from_file('agload_test_graph', 'City', - 'age_load/cities.csv'); + 'age_load/cities.csv', true); +NOTICE: VLabel "City" has been created load_labels_from_file ----------------------- (1 row) +-- Temporary table should have 54+72485 rows now +SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids"; + ?column? +---------- + t +(1 row) + +-- Sequence should be equal to max entry id i.e. 146941 +SELECT currval('agload_test_graph."City_id_seq"')=146941; + ?column? +---------- + t +(1 row) + +-- Should error out on loading the same file again due to duplicate id +SELECT load_labels_from_file('agload_test_graph', 'City', + 'age_load/cities.csv', true); +ERROR: Cannot insert duplicate vertex id: 1125899906842777 +HINT: Entry id 153 is already used +-- +-- Load edges -- Connects cities to countries +-- +-- Should error out for using vertex label +SELECT load_edges_from_file('agload_test_graph', 'Country', + 'age_load/edges.csv'); +ERROR: label "Country" already exists as edge label SELECT create_elabel('agload_test_graph','has_city'); NOTICE: ELabel "has_city" has been created create_elabel @@ -68,6 +114,17 @@ SELECT load_edges_from_file('agload_test_graph', 'has_city', (1 row) +-- Sequence should be equal to number of edges loaded i.e. 72485 +SELECT currval('agload_test_graph."has_city_id_seq"')=72485; + ?column? +---------- + t +(1 row) + +-- Should error out for using edge label +SELECT load_labels_from_file('agload_test_graph', 'has_city', + 'age_load/cities.csv'); +ERROR: label "has_city" already exists as vertex label SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type FROM information_schema.tables WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC; @@ -83,7 +140,7 @@ WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC; SELECT COUNT(*) FROM agload_test_graph."Country"; count ------- - 53 + 54 (1 row) SELECT COUNT(*) FROM agload_test_graph."City"; @@ -101,7 +158,7 @@ SELECT COUNT(*) FROM agload_test_graph."has_city"; SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n agtype); count ------- - 72538 + 72539 (1 row) SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN e$$) as (n agtype); @@ -110,6 +167,17 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN e$$ 72485 (1 row) +-- +-- Load countries and cities without id +-- +-- Should load countries in Country label without error since it should use sequence now +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', false); + load_labels_from_file +----------------------- + +(1 row) + SELECT create_vlabel('agload_test_graph','Country2'); NOTICE: VLabel "Country2" has been created create_vlabel @@ -153,6 +221,7 @@ SELECT COUNT(*) FROM agload_test_graph."City2"; SELECT id FROM agload_test_graph."Country" LIMIT 10; id ----------------- + 844424930131969 844424930131970 844424930131971 844424930131974 @@ -162,7 +231,6 @@ SELECT id FROM agload_test_graph."Country" LIMIT 10; 844424930131996 844424930132002 844424930132023 - 844424930132025 (10 rows) SELECT id FROM agload_test_graph."Country2" LIMIT 10; @@ -180,13 +248,16 @@ SELECT id FROM agload_test_graph."Country2" LIMIT 10; 1688849860263946 (10 rows) +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 -----------------+-----------+-------- 844424930131990 | "Belgium" | "BE" -(1 row) + 844424930132223 | "Belgium" | "BE" +(2 rows) +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 @@ -194,13 +265,16 @@ SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'}) 1688849860263942 | "Belgium" | "BE" (1 row) +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 -----------------+-----------+-------- 844424930131983 | "Austria" | "AT" -(1 row) + 844424930132221 | "Austria" | "AT" +(2 rows) +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 @@ -208,15 +282,17 @@ SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'}) 1688849860263940 | "Austria" | "AT" (1 row) +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$ MATCH (u:Country {region : "Europe"}) WHERE u.name =~ 'Cro.*' - RETURN u.name, u.region -$$) AS (result_1 agtype, result_2 agtype); - result_1 | result_2 ------------+---------- - "Croatia" | "Europe" -(1 row) + RETURN id(u), u.name, u.region +$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype); + id(u) | result_1 | result_2 +-----------------+-----------+---------- + 844424930132023 | "Croatia" | "Europe" + 844424930132226 | "Croatia" | "Europe" +(2 rows) SELECT drop_graph('agload_test_graph', true); NOTICE: drop cascades to 7 other objects @@ -236,22 +312,11 @@ NOTICE: graph "agload_test_graph" has been dropped -- -- Test property type conversion -- -SELECT create_graph('agload_conversion'); -NOTICE: graph "agload_conversion" has been created - create_graph --------------- - -(1 row) - -- vertex: load as agtype -SELECT create_vlabel('agload_conversion','Person1'); -NOTICE: VLabel "Person1" has been created - create_vlabel ---------------- - -(1 row) - +-- Should create graph and label automatically SELECT load_labels_from_file('agload_conversion', 'Person1', 'age_load/conversion_vertices.csv', true, true); +NOTICE: graph "agload_conversion" has been created +NOTICE: VLabel "Person1" has been created load_labels_from_file ----------------------- diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql index cee34f59c..0d76654f9 100644 --- a/regress/sql/age_load.sql +++ b/regress/sql/age_load.sql @@ -22,20 +22,65 @@ LOAD 'age'; SET search_path TO ag_catalog; + +-- Create a country using CREATE clause SELECT create_graph('agload_test_graph'); -SELECT create_vlabel('agload_test_graph','Country'); +SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) RETURN n$$) as (n agtype); + +-- +-- Load countries with id +-- +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', true); + +-- A temporary table should have been created with 54 ids; 1 from CREATE and 53 from file +SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids"; + +-- Sequence should be equal to max entry id i.e. 248 +SELECT currval('agload_test_graph."Country_id_seq"')=248; + +-- Should error out on loading the same file again due to duplicate id SELECT load_labels_from_file('agload_test_graph', 'Country', - 'age_load/countries.csv'); + 'age_load/countries.csv', true); + +-- +-- Load cities with id +-- -SELECT create_vlabel('agload_test_graph','City'); +-- Should create City label automatically and load cities SELECT load_labels_from_file('agload_test_graph', 'City', - 'age_load/cities.csv'); + 'age_load/cities.csv', true); + +-- Temporary table should have 54+72485 rows now +SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids"; + +-- Sequence should be equal to max entry id i.e. 146941 +SELECT currval('agload_test_graph."City_id_seq"')=146941; + +-- Should error out on loading the same file again due to duplicate id +SELECT load_labels_from_file('agload_test_graph', 'City', + 'age_load/cities.csv', true); + +-- +-- Load edges -- Connects cities to countries +-- + +-- Should error out for using vertex label +SELECT load_edges_from_file('agload_test_graph', 'Country', + 'age_load/edges.csv'); SELECT create_elabel('agload_test_graph','has_city'); SELECT load_edges_from_file('agload_test_graph', 'has_city', 'age_load/edges.csv'); +-- Sequence should be equal to number of edges loaded i.e. 72485 +SELECT currval('agload_test_graph."has_city_id_seq"')=72485; + +-- Should error out for using edge label +SELECT load_labels_from_file('agload_test_graph', 'has_city', + 'age_load/cities.csv'); + SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type FROM information_schema.tables WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC; @@ -48,6 +93,14 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n ag SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN e$$) as (n agtype); +-- +-- Load countries and cities without id +-- + +-- Should load countries in Country label without error since it should use sequence now +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', false); + SELECT create_vlabel('agload_test_graph','Country2'); SELECT load_labels_from_file('agload_test_graph', 'Country2', 'age_load/countries.csv', false); @@ -62,31 +115,36 @@ SELECT COUNT(*) FROM agload_test_graph."City2"; SELECT id FROM agload_test_graph."Country" LIMIT 10; SELECT id FROM agload_test_graph."Country2" LIMIT 10; +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$ MATCH (u:Country {region : "Europe"}) WHERE u.name =~ 'Cro.*' - RETURN u.name, u.region -$$) AS (result_1 agtype, result_2 agtype); + RETURN id(u), u.name, u.region +$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype); SELECT drop_graph('agload_test_graph', true); -- -- Test property type conversion -- -SELECT create_graph('agload_conversion'); -- vertex: load as agtype -SELECT create_vlabel('agload_conversion','Person1'); + +-- Should create graph and label automatically SELECT load_labels_from_file('agload_conversion', 'Person1', 'age_load/conversion_vertices.csv', true, true); SELECT * FROM cypher('agload_conversion', $$ MATCH (n:Person1) RETURN properties(n) $$) as (a agtype); diff --git a/src/backend/catalog/ag_graph.c b/src/backend/catalog/ag_graph.c index c1e53d6ab..7000d80b7 100644 --- a/src/backend/catalog/ag_graph.c +++ b/src/backend/catalog/ag_graph.c @@ -36,8 +36,6 @@ #include "catalog/ag_graph.h" #include "utils/ag_cache.h" -static Oid get_graph_namespace(const char *graph_name); - // INSERT INTO ag_catalog.ag_graph VALUES (graph_name, nsp_id) Oid insert_graph(const Name graph_name, const Oid nsp_id) { @@ -160,7 +158,7 @@ Oid get_graph_oid(const char *graph_name) return InvalidOid; } -static Oid get_graph_namespace(const char *graph_name) +Oid get_graph_namespace(const char *graph_name) { graph_cache_data *cache_data; diff --git a/src/backend/catalog/ag_label.c b/src/backend/catalog/ag_label.c index 41a11abef..e60a1d564 100644 --- a/src/backend/catalog/ag_label.c +++ b/src/backend/catalog/ag_label.c @@ -185,6 +185,11 @@ char get_label_kind(const char *label_name, Oid label_graph) } } +char *get_label_seq_relation_name(const char *label_name) +{ + return psprintf("%s_id_seq", label_name); +} + PG_FUNCTION_INFO_V1(_label_name); /* diff --git a/src/backend/commands/graph_commands.c b/src/backend/commands/graph_commands.c index 4216652bc..e912518a5 100644 --- a/src/backend/commands/graph_commands.c +++ b/src/backend/commands/graph_commands.c @@ -43,6 +43,7 @@ #include "catalog/ag_label.h" #include "commands/label_commands.h" #include "utils/graphid.h" +#include "commands/graph_commands.h" #include "utils/name_validation.h" /* @@ -61,10 +62,7 @@ PG_FUNCTION_INFO_V1(create_graph); /* function that is evoked for creating a graph */ Datum create_graph(PG_FUNCTION_ARGS) { - char *graph; Name graph_name; - char *graph_name_str; - Oid nsp_id; //if no argument is passed with the function, graph name cannot be null if (PG_ARGISNULL(0)) @@ -76,6 +74,23 @@ Datum create_graph(PG_FUNCTION_ARGS) //gets graph name as function argument graph_name = PG_GETARG_NAME(0); + create_graph_internal(graph_name); + + ereport(NOTICE, + (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); + + /* + * According to postgres specification of c-language functions + * if function returns void this is the syntax. + */ + PG_RETURN_VOID(); +} + +Oid create_graph_internal(const Name graph_name) +{ + Oid nsp_id; + char *graph_name_str; + graph_name_str = NameStr(*graph_name); //checking if the name of the graph falls under the pre-decided graph naming conventions(regex) @@ -101,16 +116,11 @@ Datum create_graph(PG_FUNCTION_ARGS) //Increment the Command counter before create the generic labels. CommandCounterIncrement(); - //Create the default label tables - graph = graph_name->data; - create_label(graph, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL); - create_label(graph, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL); + /* Create the default label tables */ + create_label(graph_name_str, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL); + create_label(graph_name_str, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL); - ereport(NOTICE, - (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); - - //according to postgres specification of c-language functions if function returns void this is the syntax - PG_RETURN_VOID(); + return nsp_id; } static Oid create_schema_for_graph(const Name graph_name) diff --git a/src/backend/utils/load/ag_load_edges.c b/src/backend/utils/load/ag_load_edges.c index d6ae29ff0..bd65d69df 100644 --- a/src/backend/utils/load/ag_load_edges.c +++ b/src/backend/utils/load/ag_load_edges.c @@ -16,21 +16,23 @@ * specific language governing permissions and limitations * under the License. */ +#include "postgres.h" +#include "executor/spi.h" +#include "catalog/namespace.h" +#include "executor/executor.h" +#include "access/xact.h" +#include "executor/tuptable.h" -#include -#include -#include -#include -#include - -#include "utils/load/csv.h" #include "utils/load/ag_load_edges.h" -#include "utils/load/age_load.h" +#include "utils/load/csv.h" +static void init_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid); +static void finish_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid); void edge_field_cb(void *field, size_t field_len, void *data) { - csv_edge_reader *cr = (csv_edge_reader*)data; if (cr->error) { @@ -61,8 +63,8 @@ void edge_field_cb(void *field, size_t field_len, void *data) // Parser calls this function when it detects end of a row void edge_row_cb(int delim __attribute__((unused)), void *data) { - csv_edge_reader *cr = (csv_edge_reader*)data; + batch_insert_state *batch_state = cr->batch_state; size_t i, n_fields; int64 start_id_int; @@ -73,9 +75,12 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) graphid end_vertex_graph_id; int end_vertex_type_id; - graphid object_graph_id; + graphid edge_id; + int64 entry_id; - agtype* props = NULL; + Datum values[4]; + bool isnull[4] = {false, false, false, false}; + HeapTuple tuple; n_fields = cr->cur_field; @@ -94,24 +99,38 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) } else { - object_graph_id = make_graphid(cr->object_id, (int64)cr->row); + entry_id = nextval_internal(cr->label_seq_relid, true); + edge_id = make_graphid(cr->label_id, entry_id); start_id_int = strtol(cr->fields[0], NULL, 10); - start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_id); + start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid); end_id_int = strtol(cr->fields[2], NULL, 10); - end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_id); + end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid); start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int); end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int); - props = create_agtype_from_list_i(cr->header, cr->fields, - n_fields, 4, cr->load_as_agtype); + /* Fill the values */ + values[0] = GRAPHID_GET_DATUM(edge_id); + values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id); + values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id); + values[3] = AGTYPE_P_GET_DATUM( + create_agtype_from_list_i( + cr->header, cr->fields, + n_fields, 4, cr->load_as_agtype)); + + /* Create and insert the tuple into the batch state */ + tuple = heap_form_tuple(batch_state->desc, values, isnull); + batch_state->buffered_tuples[batch_state->num_tuples] = tuple; - insert_edge_simple(cr->graph_id, cr->object_name, - object_graph_id, start_vertex_graph_id, - end_vertex_graph_id, props); + batch_state->num_tuples++; - pfree(props); + if (batch_state->num_tuples >= batch_state->max_tuples) + { + /* Insert the batch when it is full (i.e. BATCH_SIZE) */ + insert_batch(batch_state, cr->label_name, cr->graph_oid); + batch_state->num_tuples = 0; + } } for (i = 0; i < n_fields; ++i) @@ -124,7 +143,6 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) ereport(NOTICE,(errmsg("THere is some error"))); } - cr->cur_field = 0; cr->curr_row_length = 0; cr->row += 1; @@ -156,9 +174,9 @@ static int is_term(unsigned char c) int create_edges_from_csv_file(char *file_path, char *graph_name, - Oid graph_id, - char *object_name, - int object_id, + Oid graph_oid, + char *label_name, + int label_id, bool load_as_agtype) { @@ -168,6 +186,8 @@ int create_edges_from_csv_file(char *file_path, size_t bytes_read; unsigned char options = 0; csv_edge_reader cr; + char *label_seq_name; + Oid nsp_id; if (csv_init(&p, options) != 0) { @@ -185,6 +205,8 @@ int create_edges_from_csv_file(char *file_path, (errmsg("Failed to open %s\n", file_path))); } + nsp_id = get_graph_namespace(graph_name); + label_seq_name = get_label_seq_relation_name(label_name); memset((void*)&cr, 0, sizeof(csv_edge_reader)); cr.alloc = 128; @@ -193,11 +215,15 @@ int create_edges_from_csv_file(char *file_path, cr.header_row_length = 0; cr.curr_row_length = 0; cr.graph_name = graph_name; - cr.graph_id = graph_id; - cr.object_name = object_name; - cr.object_id = object_id; + cr.graph_oid = graph_oid; + cr.label_name = label_name; + cr.label_id = label_id; + cr.label_seq_relid = get_relname_relid(label_seq_name, nsp_id); cr.load_as_agtype = load_as_agtype; + /* Initialize the batch insert state */ + init_edge_batch_insert(&cr.batch_state, label_name, graph_oid); + while ((bytes_read=fread(buf, 1, 1024, fp)) > 0) { if (csv_parse(&p, buf, bytes_read, edge_field_cb, @@ -210,6 +236,9 @@ int create_edges_from_csv_file(char *file_path, csv_fini(&p, edge_field_cb, edge_row_cb, &cr); + /* Finish any remaining batch inserts */ + finish_edge_batch_insert(&cr.batch_state, label_name, graph_oid); + if (ferror(fp)) { ereport(ERROR, (errmsg("Error while reading file %s\n", file_path))); @@ -221,3 +250,43 @@ int create_edges_from_csv_file(char *file_path, csv_free(&p); return EXIT_SUCCESS; } + +/* + * Initialize the batch insert state for edges. + */ +static void init_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid) +{ + Relation relation; + + // Open a temporary relation to get the tuple descriptor + relation = heap_open(get_label_relation(label_name, graph_oid), AccessShareLock); + + // Initialize the batch insert state + *batch_state = palloc(sizeof(batch_insert_state)); + (*batch_state)->max_tuples = BATCH_SIZE; + (*batch_state)->buffered_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple)); + (*batch_state)->desc = CreateTupleDescCopy(RelationGetDescr(relation)); + (*batch_state)->num_tuples = 0; + + heap_close(relation, AccessShareLock); +} + +/* + * Finish the batch insert for edges. Insert the + * remaining tuples in the batch state and clean up. + */ +static void finish_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid) +{ + if ((*batch_state)->num_tuples > 0) + { + insert_batch(*batch_state, label_name, graph_oid); + (*batch_state)->num_tuples = 0; + } + + // Clean up batch state + pfree((*batch_state)->buffered_tuples); + pfree(*batch_state); + *batch_state = NULL; +} \ No newline at end of file diff --git a/src/backend/utils/load/ag_load_labels.c b/src/backend/utils/load/ag_load_labels.c index 6f79071df..194a6dc82 100644 --- a/src/backend/utils/load/ag_load_labels.c +++ b/src/backend/utils/load/ag_load_labels.c @@ -17,11 +17,28 @@ * under the License. */ #include "postgres.h" +#include "executor/spi.h" +#include "catalog/namespace.h" +#include "executor/executor.h" +#include "access/xact.h" +#include "executor/tuptable.h" +#include "utils/rel.h" #include "utils/load/ag_load_labels.h" -#include "utils/load/age_load.h" #include "utils/load/csv.h" +static void setup_temp_table_for_vertex_ids(char *graph_name); +static void insert_batch_in_temp_table(batch_insert_state *batch_state, + Oid graph_oid, Oid relid); +static void init_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid); +static void finish_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid); +static void insert_vertex_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid, Oid temp_table_relid); + void vertex_field_cb(void *field, size_t field_len, void *data) { @@ -55,16 +72,20 @@ void vertex_field_cb(void *field, size_t field_len, void *data) void vertex_row_cb(int delim __attribute__((unused)), void *data) { - csv_vertex_reader *cr = (csv_vertex_reader*)data; - agtype *props = NULL; + batch_insert_state *batch_state = cr->batch_state; size_t i, n_fields; - graphid object_graph_id; - int64 label_id_int; + graphid vertex_id; + int64 entry_id; + Datum values[2]; + bool nulls[2] = {false, false}; + Datum temp_table_values[1]; + bool temp_table_nulls[1] = {false}; + HeapTuple tuple; + HeapTuple temp_table_tuple; n_fields = cr->cur_field; - if (cr->row == 0) { cr->header_num = cr->cur_field; @@ -82,36 +103,61 @@ void vertex_row_cb(int delim __attribute__((unused)), void *data) { if (cr->id_field_exists) { - label_id_int = strtol(cr->fields[0], NULL, 10); + entry_id = strtol(cr->fields[0], NULL, 10); + if (entry_id > cr->curr_seq_num) + { + DirectFunctionCall2(setval_oid, + ObjectIdGetDatum(cr->label_seq_relid), + Int64GetDatum(entry_id)); + cr->curr_seq_num = entry_id; + } } else { - label_id_int = (int64)cr->row; + entry_id = nextval_internal(cr->label_seq_relid, true); } - object_graph_id = make_graphid(cr->object_id, label_id_int); + vertex_id = make_graphid(cr->label_id, entry_id); - props = create_agtype_from_list(cr->header, cr->fields, - n_fields, label_id_int, - cr->load_as_agtype); - insert_vertex_simple(cr->graph_id, cr->object_name, - object_graph_id, props); - pfree(props); - } + /* Fill the values */ + values[0] = GRAPHID_GET_DATUM(vertex_id); + values[1] = AGTYPE_P_GET_DATUM( + create_agtype_from_list(cr->header, cr->fields, + n_fields, entry_id, + cr->load_as_agtype)); + + temp_table_values[0] = GRAPHID_GET_DATUM(vertex_id); + + /* Create the tuple */ + tuple = heap_form_tuple(batch_state->desc, values, nulls); + temp_table_tuple = heap_form_tuple(batch_state->id_desc, temp_table_values, + temp_table_nulls); + + /* Store the tuple in the batch state */ + batch_state->buffered_tuples[batch_state->num_tuples] = tuple; + batch_state->buffered_id_tuples[batch_state->num_tuples] = temp_table_tuple; + + batch_state->num_tuples++; + if (batch_state->num_tuples >= batch_state->max_tuples) + { + /* Insert the batch when it is full (i.e. BATCH_SIZE) */ + insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid, + cr->temp_table_relid); + batch_state->num_tuples = 0; + } + } for (i = 0; i < n_fields; ++i) { free(cr->fields[i]); } - if (cr->error) { ereport(NOTICE,(errmsg("THere is some error"))); } - cr->cur_field = 0; cr->curr_row_length = 0; cr->row += 1; @@ -143,9 +189,9 @@ static int is_term(unsigned char c) int create_labels_from_csv_file(char *file_path, char *graph_name, - Oid graph_id, - char *object_name, - int object_id, + Oid graph_oid, + char *label_name, + int label_id, bool id_field_exists, bool load_as_agtype) { @@ -156,6 +202,9 @@ int create_labels_from_csv_file(char *file_path, size_t bytes_read; unsigned char options = 0; csv_vertex_reader cr; + char *label_seq_name; + Oid temp_table_relid; + Oid nsp_id; if (csv_init(&p, options) != 0) { @@ -163,6 +212,13 @@ int create_labels_from_csv_file(char *file_path, (errmsg("Failed to initialize csv parser\n"))); } + temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name)); + if (!OidIsValid(temp_table_relid)) + { + setup_temp_table_for_vertex_ids(graph_name); + temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name)); + } + csv_set_space_func(&p, is_space); csv_set_term_func(&p, is_term); @@ -173,6 +229,8 @@ int create_labels_from_csv_file(char *file_path, (errmsg("Failed to open %s\n", file_path))); } + nsp_id = get_graph_namespace(graph_name); + label_seq_name = get_label_seq_relation_name(label_name); memset((void*)&cr, 0, sizeof(csv_vertex_reader)); @@ -182,13 +240,29 @@ int create_labels_from_csv_file(char *file_path, cr.header_row_length = 0; cr.curr_row_length = 0; cr.graph_name = graph_name; - cr.graph_id = graph_id; - cr.object_name = object_name; - cr.object_id = object_id; + cr.graph_oid = graph_oid; + cr.label_name = label_name; + cr.label_id = label_id; cr.id_field_exists = id_field_exists; + cr.label_seq_relid = get_relname_relid(label_seq_name, nsp_id); cr.load_as_agtype = load_as_agtype; + cr.temp_table_relid = temp_table_relid; + + if (cr.id_field_exists) + { + /* + * Set the curr_seq_num since we will need it to compare with + * incoming entry_id. + * + * We cant use currval because it will error out if nextval was + * not called before in the session. + */ + cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true); + } - + /* Initialize the batch insert state */ + init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid, + cr.temp_table_relid); while ((bytes_read=fread(buf, 1, 1024, fp)) > 0) { @@ -202,6 +276,10 @@ int create_labels_from_csv_file(char *file_path, csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr); + /* Finish any remaining batch inserts */ + finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid, + cr.temp_table_relid); + if (ferror(fp)) { ereport(ERROR, (errmsg("Error while reading file %s\n", @@ -214,3 +292,158 @@ int create_labels_from_csv_file(char *file_path, csv_free(&p); return EXIT_SUCCESS; } + +static void insert_vertex_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid, Oid temp_table_relid) +{ + insert_batch_in_temp_table(batch_state, graph_oid, temp_table_relid); + insert_batch(batch_state, label_name, graph_oid); +} + +/* + * Create and populate a temporary table with vertex ids that are already + * present in the graph. This table will be used to check if the new vertex + * id generated by loader is a duplicate. + * Unique index is created to enforce uniqueness of the ids. + * + * We dont need this for loading edges since the ids are generated using + * sequence and are unique. + */ +static void setup_temp_table_for_vertex_ids(char *graph_name) +{ + char *create_as_query; + char *index_query; + + create_as_query = psprintf("CREATE TEMP TABLE IF NOT EXISTS %s AS " + "SELECT DISTINCT id FROM \"%s\".%s", + GET_TEMP_VERTEX_ID_TABLE(graph_name), graph_name, + AG_DEFAULT_LABEL_VERTEX); + + index_query = psprintf("CREATE UNIQUE INDEX ON %s (id)", + GET_TEMP_VERTEX_ID_TABLE(graph_name)); + SPI_connect(); + SPI_execute(create_as_query, false, 0); + SPI_execute(index_query, false, 0); + + SPI_finish(); + + pfree(create_as_query); + pfree(index_query); +} + +/* + * Inserts batch of tuples into the temporary table. + * This function also updates the index to check for + * uniqueness of the ids. + */ +static void insert_batch_in_temp_table(batch_insert_state *batch_state, + Oid graph_oid, Oid relid) +{ + int i; + EState *estate; + ResultRelInfo *resultRelInfo; + Relation rel; + List *result; + + rel = heap_open(relid, RowExclusiveLock); + + /* Initialize executor state */ + estate = CreateExecutorState(); + + /* Initialize result relation information */ + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, rel, 1, NULL, estate->es_instrument); + estate->es_result_relation_info = resultRelInfo; + + /* Open the indices */ + ExecOpenIndices(resultRelInfo, false); + + /* Insert the batch into the temporary table */ + heap_multi_insert(rel, batch_state->buffered_id_tuples, + batch_state->num_tuples, GetCurrentCommandId(true), + false, NULL); + + for (i = 0; i < batch_state->num_tuples; i++) + { + TupleTableSlot *slot; + + slot = MakeSingleTupleTableSlot(batch_state->id_desc); + ExecStoreTuple(batch_state->buffered_id_tuples[i], + slot, InvalidBuffer, false); + result = ExecInsertIndexTuples(slot, &(batch_state->buffered_id_tuples[i]->t_self), + estate, true, NULL, NIL); + /* Check if the unique cnstraint is violated */ + if (list_length(result) != 0) + { + Datum id; + bool isnull; + + id = slot_getattr(slot, 1, &isnull); + pfree(slot); + ereport(ERROR, (errmsg("Cannot insert duplicate vertex id: %ld", + DATUM_GET_GRAPHID(id)), + errhint("Entry id %ld is already used", + get_graphid_entry_id(id)))); + } + + pfree(slot); + } + /* Clean up and close the indices */ + ExecCloseIndices(resultRelInfo); + + FreeExecutorState(estate); + heap_close(rel, RowExclusiveLock); + + CommandCounterIncrement(); +} + +/* + * Initialize the batch insert state for vertices. + */ +static void init_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid) +{ + Relation relation; + Oid relid; + Relation temp_table_relation; + + /* Open a temporary relation to get the tuple descriptor */ + relid = get_label_relation(label_name, graph_oid); + relation = heap_open(relid, AccessShareLock); + + temp_table_relation = heap_open(temp_table_relid, AccessShareLock); + + /* Initialize the batch insert state */ + *batch_state = palloc(sizeof(batch_insert_state)); + (*batch_state)->max_tuples = BATCH_SIZE; + (*batch_state)->buffered_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple)); + (*batch_state)->desc = CreateTupleDescCopy(RelationGetDescr(relation)); + (*batch_state)->id_desc = CreateTupleDescCopy(RelationGetDescr(temp_table_relation)); + (*batch_state)->buffered_id_tuples = palloc(BATCH_SIZE * sizeof(HeapTuple)); + (*batch_state)->num_tuples = 0; + + heap_close(relation, AccessShareLock); + heap_close(temp_table_relation, AccessShareLock); +} + +/* + * Finish the batch insert for vertices. Insert the + * remaining tuples in the batch state and clean up. + */ +static void finish_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid) +{ + if ((*batch_state)->num_tuples > 0) + { + insert_vertex_batch(*batch_state, label_name, graph_oid, temp_table_relid); + (*batch_state)->num_tuples = 0; + } + + /* Clean up batch state */ + pfree((*batch_state)->buffered_tuples); + pfree((*batch_state)->buffered_id_tuples); + pfree(*batch_state); + *batch_state = NULL; +} \ No newline at end of file diff --git a/src/backend/utils/load/age_load.c b/src/backend/utils/load/age_load.c index 5fa637b6e..fd90b6045 100644 --- a/src/backend/utils/load/age_load.c +++ b/src/backend/utils/load/age_load.c @@ -19,6 +19,7 @@ #include "postgres.h" #include "utils/jsonapi.h" +#include "nodes/makefuncs.h" #include "utils/load/age_load.h" #include "utils/load/ag_load_labels.h" @@ -26,6 +27,9 @@ static agtype_value *csv_value_to_agtype_value(char *csv_val); static bool json_validate(text *json); +static Oid get_or_create_graph(const Name graph_name); +static int32 get_or_create_label(Oid graph_oid, char *graph_name, + char *label_name, char label_kind); agtype *create_empty_agtype(void) { @@ -307,6 +311,33 @@ void insert_vertex_simple(Oid graph_id, char* label_name, CommandCounterIncrement(); } +void insert_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid) +{ + Relation label_relation; + BulkInsertState bistate; + Oid relid; + + // Get the relation OID + relid = get_label_relation(label_name, graph_oid); + + // Open the relation + label_relation = heap_open(relid, RowExclusiveLock); + + // Prepare the BulkInsertState + bistate = GetBulkInsertState(); + + // Perform the bulk insert + heap_multi_insert(label_relation, batch_state->buffered_tuples, + batch_state->num_tuples, GetCurrentCommandId(true), + false, bistate); + + // Clean up + FreeBulkInsertState(bistate); + heap_close(label_relation, RowExclusiveLock); + + CommandCounterIncrement(); +} PG_FUNCTION_INFO_V1(load_labels_from_file); Datum load_labels_from_file(PG_FUNCTION_ARGS) @@ -318,7 +349,7 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) char* graph_name_str; char* label_name_str; char* file_path_str; - Oid graph_id; + Oid graph_oid; int32 label_id; bool id_field_exists; bool load_as_agtype; @@ -347,19 +378,24 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) id_field_exists = PG_GETARG_BOOL(3); load_as_agtype = PG_GETARG_BOOL(4); - graph_name_str = NameStr(*graph_name); label_name_str = NameStr(*label_name); + + if (strcmp(label_name_str, "") == 0) + { + label_name_str = AG_DEFAULT_LABEL_VERTEX; + } + file_path_str = text_to_cstring(file_path); - graph_id = get_graph_oid(graph_name_str); - label_id = get_label_id(label_name_str, graph_id); + graph_oid = get_or_create_graph(graph_name); + label_id = get_or_create_label(graph_oid, graph_name_str, + label_name_str, LABEL_KIND_VERTEX); - create_labels_from_csv_file(file_path_str, graph_name_str, graph_id, + create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid, label_name_str, label_id, id_field_exists, load_as_agtype); PG_RETURN_VOID(); - } PG_FUNCTION_INFO_V1(load_edges_from_file); @@ -372,7 +408,7 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS) char* graph_name_str; char* label_name_str; char* file_path_str; - Oid graph_id; + Oid graph_oid; int32 label_id; bool load_as_agtype; @@ -401,13 +437,94 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS) graph_name_str = NameStr(*graph_name); label_name_str = NameStr(*label_name); + + if (strcmp(label_name_str, "") == 0) + { + label_name_str = AG_DEFAULT_LABEL_EDGE; + } + file_path_str = text_to_cstring(file_path); - graph_id = get_graph_oid(graph_name_str); - label_id = get_label_id(label_name_str, graph_id); + graph_oid = get_or_create_graph(graph_name); + label_id = get_or_create_label(graph_oid, graph_name_str, + label_name_str, LABEL_KIND_EDGE); - create_edges_from_csv_file(file_path_str, graph_name_str, graph_id, + create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid, label_name_str, label_id, load_as_agtype); PG_RETURN_VOID(); +} + +/* + * Helper function to create a graph if it does not exist. + * Just returns Oid of the graph if it already exists. + */ +static Oid get_or_create_graph(const Name graph_name) +{ + Oid graph_oid; + char *graph_name_str; + + graph_name_str = NameStr(*graph_name); + graph_oid = get_graph_oid(graph_name_str); + + if (OidIsValid(graph_oid)) + { + return graph_oid; + } + + create_graph_internal(graph_name); + graph_oid = get_graph_oid(graph_name_str); + + ereport(NOTICE, + (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); + + return graph_oid; } + +/* + * Helper function to create a label if it does not exist. + * Just returns label_id of the label if it already exists. + */ +static int32 get_or_create_label(Oid graph_oid, char *graph_name, + char *label_name, char label_kind) +{ + int32 label_id; + + label_id = get_label_id(label_name, graph_oid); + + /* Check if label exists */ + if (label_id_is_valid(label_id)) + { + char *label_kind_full = (label_kind == LABEL_KIND_VERTEX) + ? "vertex" : "edge"; + char opposite_label_kind = (label_kind == LABEL_KIND_VERTEX) + ? LABEL_KIND_EDGE : LABEL_KIND_VERTEX; + + /* If it exists, but as another label_kind, throw an error */ + if (get_label_kind(label_name, graph_oid) == opposite_label_kind) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("label \"%s\" already exists as %s label", + label_name, label_kind_full))); + } + } + else + { + /* Create a label */ + RangeVar *rv; + List *parent; + char *default_label = (label_kind == LABEL_KIND_VERTEX) + ? AG_DEFAULT_LABEL_VERTEX : AG_DEFAULT_LABEL_EDGE; + rv = get_label_range_var(graph_name, graph_oid, default_label); + parent = list_make1(rv); + + create_label(graph_name, label_name, label_kind, parent); + label_id = get_label_id(label_name, graph_oid); + + ereport(NOTICE, + (errmsg("VLabel \"%s\" has been created", label_name))); + } + + return label_id; +} \ No newline at end of file diff --git a/src/include/catalog/ag_graph.h b/src/include/catalog/ag_graph.h index b1b8f8e42..9b97aaf39 100644 --- a/src/include/catalog/ag_graph.h +++ b/src/include/catalog/ag_graph.h @@ -38,6 +38,7 @@ void update_graph_name(const Name graph_name, const Name new_name); Oid get_graph_oid(const char *graph_name); char *get_graph_namespace_name(const char *graph_name); +Oid get_graph_namespace(const char *graph_name); List *get_graphnames(void); void drop_graphs(List *graphnames); diff --git a/src/include/catalog/ag_label.h b/src/include/catalog/ag_label.h index 7dfa2226a..f742fb19f 100644 --- a/src/include/catalog/ag_label.h +++ b/src/include/catalog/ag_label.h @@ -75,6 +75,8 @@ int32 get_label_id(const char *label_name, Oid label_graph); Oid get_label_relation(const char *label_name, Oid label_graph); char *get_label_relation_name(const char *label_name, Oid label_graph); char get_label_kind(const char *label_name, Oid label_graph); +char *get_label_seq_relation_name(const char *label_name); + bool label_id_exists(Oid label_graph, int32 label_id); RangeVar *get_label_range_var(char *graph_name, Oid graph_oid, char *label_name); diff --git a/src/include/commands/graph_commands.h b/src/include/commands/graph_commands.h index e4d93fc1c..d456ef8c4 100644 --- a/src/include/commands/graph_commands.h +++ b/src/include/commands/graph_commands.h @@ -21,5 +21,6 @@ #define AG_GRAPH_COMMANDS_H Datum create_graph(PG_FUNCTION_ARGS); +Oid create_graph_internal(const Name graph_name); #endif diff --git a/src/include/utils/graphid.h b/src/include/utils/graphid.h index 844ce6525..fd9504dac 100644 --- a/src/include/utils/graphid.h +++ b/src/include/utils/graphid.h @@ -35,8 +35,9 @@ typedef int64 graphid; #define label_id_is_valid(id) (id >= LABEL_ID_MIN && id <= LABEL_ID_MAX) -#define ENTRY_ID_MIN INT64CONST(1) -#define ENTRY_ID_MAX INT64CONST(281474976710655) // 0x0000ffffffffffff +#define ENTRY_ID_MIN INT64CONST(0) +/* 0x0000ffffffffffff */ +#define ENTRY_ID_MAX INT64CONST(281474976710655) #define INVALID_ENTRY_ID INT64CONST(0) #define entry_id_is_valid(id) (id >= ENTRY_ID_MIN && id <= ENTRY_ID_MAX) diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h index 57940d459..03ab89e17 100644 --- a/src/include/utils/load/ag_load_edges.h +++ b/src/include/utils/load/ag_load_edges.h @@ -17,6 +17,9 @@ * under the License. */ +#include "access/heapam.h" +#include "utils/load/age_load.h" + #ifndef AG_LOAD_EDGES_H #define AG_LOAD_EDGES_H @@ -38,21 +41,22 @@ typedef struct { size_t header_row_length; size_t curr_row_length; char *graph_name; - Oid graph_id; - char *object_name; - int object_id; + Oid graph_oid; + char *label_name; + int label_id; + Oid label_seq_relid; char *start_vertex; char *end_vertex; bool load_as_agtype; - + batch_insert_state *batch_state; } csv_edge_reader; void edge_field_cb(void *field, size_t field_len, void *data); void edge_row_cb(int delim __attribute__((unused)), void *data); -int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_id, - char *object_name, int object_id, +int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, + char *label_name, int label_id, bool load_as_agtype); #endif //AG_LOAD_EDGES_H diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h index 71fcf97dc..5b24719f0 100644 --- a/src/include/utils/load/ag_load_labels.h +++ b/src/include/utils/load/ag_load_labels.h @@ -22,6 +22,7 @@ #define AG_LOAD_LABELS_H #include "access/heapam.h" +#include "utils/load/age_load.h" #define AGE_VERTIX 1 #define AGE_EDGE 2 @@ -45,19 +46,23 @@ typedef struct { size_t header_row_length; size_t curr_row_length; char *graph_name; - Oid graph_id; - char *object_name; - int object_id; + Oid graph_oid; + char *label_name; + int label_id; + Oid label_seq_relid; + Oid temp_table_relid; bool id_field_exists; bool load_as_agtype; + int curr_seq_num; + batch_insert_state *batch_state; } csv_vertex_reader; void vertex_field_cb(void *field, size_t field_len, void *data); void vertex_row_cb(int delim __attribute__((unused)), void *data); -int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_id, - char *object_name, int object_id, +int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, + char *label_name, int label_id, bool id_field_exists, bool load_as_agtype); #endif //AG_LOAD_LABELS_H diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h index 1c77645d9..f8a18edd9 100644 --- a/src/include/utils/load/age_load.h +++ b/src/include/utils/load/age_load.h @@ -24,11 +24,28 @@ #include "catalog/ag_graph.h" #include "catalog/ag_label.h" #include "commands/label_commands.h" +#include "commands/graph_commands.h" #include "utils/ag_cache.h" #ifndef AGE_ENTITY_CREATOR_H #define AGE_ENTITY_CREATOR_H +#define TEMP_VERTEX_ID_TABLE_SUFFIX "_ag_vertex_ids" +#define GET_TEMP_VERTEX_ID_TABLE(graph_name) \ + psprintf("_%s%s", graph_name, TEMP_VERTEX_ID_TABLE_SUFFIX) + +#define BATCH_SIZE 1000 + +typedef struct +{ + HeapTuple *buffered_tuples; + HeapTuple *buffered_id_tuples; + TupleDesc desc; + TupleDesc id_desc; + int num_tuples; + int max_tuples; +} batch_insert_state; + agtype* create_empty_agtype(void); agtype* create_agtype_from_list(char **header, char **fields, @@ -42,5 +59,7 @@ void insert_vertex_simple(Oid graph_id, char *label_name, graphid vertex_id, void insert_edge_simple(Oid graph_id, char *label_name, graphid edge_id, graphid start_id, graphid end_id, agtype* end_properties); +void insert_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid); #endif //AGE_ENTITY_CREATOR_H