Skip to content

Commit

Permalink
Fix MERGE visibility in chained commands, SET specifically.
Browse files Browse the repository at this point in the history
Background:

The MERGE command searches for a pattern to return. If that pattern
isn't found, it will create it and return the newly created pattern.
In both cases, MERGE will return one or more tuples.

When MERGE is chained with other commands, the tuples generated
(either found or created) by MERGE are passed up to the parent
commands. Those parent commands can then make modifications to
those tuples, if necessary.

Issue:

The issue here was that the newly created tuples were not visible,
meaning the currentCommandId used to create them was not strictly
less than the currentCommandId used to update them further in the
chain.

On the surface (the return value), the processed tuples appeared to be
correct. On inspection, however, the tuple had not actually been modified
by any of the chained commands. Note that this was only the case for newly
created tuples.

merge (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n;
                                        n
----------------------------------------------------------------------------------
 {"id": 844424930131970, "label": "node", "properties": {"name": "Lisa"}}::vertex
(1 row)

match (n) return n;
                                        n
----------------------------------------------------------------------------------
 {"id": 844424930131970, "label": "node", "properties": {"name": "Jason"}}::vertex
(1 row)

To fix this, the currentCommandId needed to be incremented after creating
a tuple in MERGE. This allows the chained commands that follow to see it.
However, the currentCommandId used by the MERGE still needed to remain the
same. This made it necessary to create a field in the custom scan node to
hold the original currentCommandId for the MERGE instance's updates. The
effect is that the newly created MERGE tuples are always written with a
currentCommandId that is 1 less than the current currentCommandId.

Doing this corrected the issue.

merge (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n;
                                        n
----------------------------------------------------------------------------------
 {"id": 844424930131971, "label": "node", "properties": {"name": "Lisa"}}::vertex
(1 row)

match (n) return n;
                                        n
----------------------------------------------------------------------------------
 {"id": 844424930131971, "label": "node", "properties": {"name": "Lisa"}}::vertex
(1 row)

Added regression tests.
  • Loading branch information
jrgemignani committed Jul 7, 2022
1 parent 6fbdfa8 commit 99e7c62
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 24 deletions.
49 changes: 48 additions & 1 deletion regress/expected/cypher_merge.out
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,54 @@ SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype
(1 row)

-- Node doesn't exist, is created, then set
-- to be added -jrg
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) DELETE n $$) AS (n agtype);
n
---
(0 rows)

SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
n
---
(0 rows)

SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n $$) AS (n agtype);
n
-----------------------------------------------------------------------------------
{"id": 2533274790395906, "label": "node", "properties": {"name": "Lisa"}}::vertex
(1 row)

SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
n
-----------------------------------------------------------------------------------
{"id": 2533274790395906, "label": "node", "properties": {"name": "Lisa"}}::vertex
(1 row)

-- Multiple SETs
SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Lisa'}) SET n.age = 23, n.gender = "Female" RETURN n $$) AS (n agtype);
n
------------------------------------------------------------------------------------------------------------------
{"id": 2533274790395906, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
(1 row)

SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
n
------------------------------------------------------------------------------------------------------------------
{"id": 2533274790395906, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
(1 row)

SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa', n.age = 23, n.gender = 'Female' RETURN n $$) AS (n agtype);
n
------------------------------------------------------------------------------------------------------------------
{"id": 2533274790395907, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
(1 row)

SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
n
------------------------------------------------------------------------------------------------------------------
{"id": 2533274790395906, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
{"id": 2533274790395907, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
(2 rows)

--clean up
SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype);
a
Expand Down
10 changes: 9 additions & 1 deletion regress/sql/cypher_merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,15 @@ SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.nam
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);

-- Node doesn't exist, is created, then set
-- to be added -jrg
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) DELETE n $$) AS (n agtype);
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n $$) AS (n agtype);
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
-- Multiple SETs
SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Lisa'}) SET n.age = 23, n.gender = "Female" RETURN n $$) AS (n agtype);
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa', n.age = 23, n.gender = 'Female' RETURN n $$) AS (n agtype);
SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);

--clean up
SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype);
Expand Down
70 changes: 56 additions & 14 deletions src/backend/executor/cypher_merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate,
cypher_node->prop_expr_state =
ExecInitExpr(cypher_node->prop_expr, (PlanState *)node);
}


}

/*
Expand All @@ -166,6 +164,9 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate,
if (estate->es_output_cid == 0)
estate->es_output_cid = estate->es_snapshot->curcid;

/* store the currentCommandId for this instance */
css->base_currentCommandId = GetCurrentCommandId(false);

Increment_Estate_CommandId(estate);
}

Expand Down Expand Up @@ -656,18 +657,57 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,

ExecClearTuple(elemTupleSlot);

// get the next graphid for this vertex.
/* get the next graphid for this vertex */
id = ExecEvalExpr(node->id_expr_state, econtext, &isNull);
elemTupleSlot->tts_values[vertex_tuple_id] = id;
elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull;

// get the properties for this vertex
/* get the properties for this vertex */
prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull);
elemTupleSlot->tts_values[vertex_tuple_properties] = prop;
elemTupleSlot->tts_isnull[vertex_tuple_properties] = isNull;

// Insert the new vertex
insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);
/*
* Insert the new vertex.
*
* Depending on the currentCommandId, we need to do this one of two
* different ways -
*
* 1) If they are equal, the currentCommandId hasn't been used for an
* update, or it hasn't been incremented after being used. In either
* case, we need to use the current one and then increment it so that
* the following commands will have visibility of this update. Note,
* it isn't our job to update the currentCommandId first and then do
* this check.
*
* 2) If they are not equal, the currentCommandId has been used and/or
* updated. In this case, we can't use it. Otherwise our update won't
* be visible to anything that follows, until the currentCommandId is
* updated again. Remember, visibility is, greater than but not equal
* to, the currentCommandID used for the update. So, in this case we
* need to use the original currentCommandId when begin_cypher_merge
* was initiated as everything under this instance of merge needs to
* be based off of that initial currentCommandId. This allows the
* following command to see the updates generated by this instance of
* merge.
*/
if (css->base_currentCommandId == GetCurrentCommandId(false))
{
insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);

/*
* Increment the currentCommandId since we processed an update. We
* don't want to do this outside of this block because we don't want
* to inadvertently or unnecessarily update the commandCounterId of
* another command.
*/
CommandCounterIncrement();
}
else
{
insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate,
css->base_currentCommandId);
}

/* restore the old result relation info */
estate->es_result_relation_info = old_estate_es_result_relation_info;
Expand All @@ -682,11 +722,11 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
{
Datum result;

// make the vertex agtype
/* make the vertex agtype */
result = make_vertex(
id, CStringGetDatum(node->label_name), prop);

// append to the path list
/* append to the path list */
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
css->path_values = lappend(css->path_values,
Expand Down Expand Up @@ -724,22 +764,22 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
node->variable_name)));
}

// get the vertex agtype in the scanTupleSlot
/* get the vertex agtype in the scanTupleSlot */
d = scantuple->tts_values[node->tuple_position - 1];
a = DATUM_GET_AGTYPE_P(d);

// Convert to an agtype value
/* Convert to an agtype value */
v = get_ith_agtype_value_from_container(&a->root, 0);

if (v->type != AGTV_VERTEX)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("agtype must resolve to a vertex")));

// extract the id agtype field
/* extract the id agtype field */
id_value = GET_AGTYPE_VALUE_OBJECT_VALUE(v, "id");

// extract the graphid and cast to a Datum
/* extract the graphid and cast to a Datum */
id = GRAPHID_GET_DATUM(id_value->val.int_value);

/*
Expand All @@ -764,7 +804,9 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
}
}

// add the Datum to the list of entities for creating the path variable
/*
* Add the Datum to the list of entities for creating the path variable
*/
if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
{
Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
Expand All @@ -773,7 +815,7 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
}
}

// If the path continues, create the next edge, passing the vertex's id.
/* If the path continues, create the next edge, passing the vertex's id. */
if (next != NULL)
{
merge_edge(css, lfirst(next), id, lnext(next));
Expand Down
33 changes: 25 additions & 8 deletions src/backend/executor/cypher_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,30 +207,47 @@ bool entity_exists(EState *estate, Oid graph_oid, graphid id)
}

/*
 * Insert the edge/vertex tuple into the table and indices. Check that the
 * table's constraints have not been violated.
 *
 * This function defaults to, and flags for update, the currentCommandId. If
 * you need to pass a specific cid and avoid using the currentCommandId, use
 * insert_entity_tuple_cid instead.
 *
 * Returns the HeapTuple handed back by insert_entity_tuple_cid.
 */
HeapTuple insert_entity_tuple(ResultRelInfo *resultRelInfo,
                              TupleTableSlot *elemTupleSlot,
                              EState *estate)
{
    /* delegate the insert, stamping the tuple with the currentCommandId */
    return insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate,
                                   GetCurrentCommandId(true));
}

/*
* Insert the edge/vertex tuple into the table and indices. Check that the
* table's constraints have not been violated.
*
* This function uses the passed cid for updates.
*/
HeapTuple insert_entity_tuple_cid(ResultRelInfo *resultRelInfo,
TupleTableSlot *elemTupleSlot,
EState *estate, CommandId cid)
{
HeapTuple tuple = NULL;

ExecStoreVirtualTuple(elemTupleSlot);
tuple = ExecMaterializeSlot(elemTupleSlot);

// Check the constraints of the tuple
/* Check the constraints of the tuple */
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
if (resultRelInfo->ri_RelationDesc->rd_att->constr != NULL)
{
ExecConstraints(resultRelInfo, elemTupleSlot, estate);
}

// Insert the tuple normally
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
GetCurrentCommandId(true), 0, NULL);
/* Insert the tuple using the passed in cid */
heap_insert(resultRelInfo->ri_RelationDesc, tuple, cid, 0, NULL);

// Insert index entries for the tuple
/* Insert index entries for the tuple */
if (resultRelInfo->ri_NumIndices > 0)
{
ExecInsertIndexTuples(elemTupleSlot, &(tuple->t_self), estate, false,
Expand Down
4 changes: 4 additions & 0 deletions src/include/executor/cypher_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ typedef struct cypher_merge_custom_scan_state
AttrNumber merge_function_attr;
bool created_new_path;
bool found_a_path;
CommandId base_currentCommandId;
} cypher_merge_custom_scan_state;

TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot,
Expand All @@ -103,5 +104,8 @@ bool entity_exists(EState *estate, Oid graph_oid, graphid id);
HeapTuple insert_entity_tuple(ResultRelInfo *resultRelInfo,
TupleTableSlot *elemTupleSlot,
EState *estate);
HeapTuple insert_entity_tuple_cid(ResultRelInfo *resultRelInfo,
TupleTableSlot *elemTupleSlot,
EState *estate, CommandId cid);

#endif

0 comments on commit 99e7c62

Please sign in to comment.