Skip to content

Commit

Permalink
Expand target entries with merged array/jsonb subscripting ops into m…
Browse files Browse the repository at this point in the history
…ultiple ones

When re-writing query tree, postgres combines multiple subscripting
operators referencing to the same attribute into a single `TargetEntry`
by linking `SubscriptingRef` objects to each other via their `refexpr`
fields. (See `rewriteTargetListIU` & `process_matched_tle` functions.)

However, ruleutils function `get_update_query_targetlist_def` doesn't
know how to properly deparse such a `TargetEntry`. As a result, we were
only taking the last set-by-subscript operation into account when
generating the shard query for such an `UPDATE` command.

In postgres, this doesn't cause any problems (e.g.: when generating
definition of a rule based object) since the query-rewrite
transformations aren't performed on the query tree that
`get_update_query_targetlist_def` is expected to process.

For this reason; with this commit, before processing the target entry
list in our ruleutils based deparser, we first expand such target
entries into multiple ones.

To detect such `SubscriptingRef` objects, we also need to investigate
`FieldStore` and `CoerceToDomain` objects as postgres functions
processing `SubscriptingRef` objects do --although they do so for
different purposes. However, given that Citus already doesn't allow
`INSERT/UPDATE` via `FieldStore`, we only do that for `CoerceToDomain`
objects.
  • Loading branch information
onurctirtir committed Feb 9, 2022
1 parent 1e3c8e3 commit dc29396
Show file tree
Hide file tree
Showing 17 changed files with 521 additions and 11 deletions.
127 changes: 127 additions & 0 deletions src/backend/distributed/deparser/citus_ruleutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ static void deparse_index_columns(StringInfo buffer, List *indexParameterList,
static void AppendStorageParametersToString(StringInfo stringBuffer,
List *optionList);
static void simple_quote_literal(StringInfo buf, const char *val);
static SubscriptingRef * TargetEntryExprFindSubsRef(Expr *expr);
static char * flatten_reloptions(Oid relid);
static void AddVacuumParams(ReindexStmt *reindexStmt, StringInfo buffer);

Expand Down Expand Up @@ -1353,3 +1354,129 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier)
}
}
}


/*
* ExpandMergedSubscriptingRefEntries takes a list of target entries and expands
* each one that references a SubscriptingRef node that indicates multiple (field)
* updates on the same attribute, which is applicable for array/json types atm.
*/
List *
ExpandMergedSubscriptingRefEntries(List *targetEntryList)
{
List *newTargetEntryList = NIL;

TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, targetEntryList)
{
List *expandedTargetEntries = NIL;

Expr *expr = targetEntry->expr;
while (expr)
{
SubscriptingRef *subsRef = TargetEntryExprFindSubsRef(expr);
if (!subsRef)
{
break;
}

/*
* Remove refexpr from the SubscriptingRef that we are about to
* wrap in a new TargetEntry and save it for the next one.
*/
Expr *refexpr = subsRef->refexpr;
subsRef->refexpr = NULL;

/*
* Wrap the Expr that holds SubscriptingRef (directly or indirectly)
* in a new TargetEntry; note that it doesn't have a refexpr anymore.
*/
TargetEntry *newTargetEntry = copyObject(targetEntry);
newTargetEntry->expr = expr;
expandedTargetEntries = lappend(expandedTargetEntries, newTargetEntry);

/* now inspect the refexpr that SubscriptingRef at hand were holding */
expr = refexpr;
}

if (expandedTargetEntries == NIL)
{
/* return original entry since it doesn't hold a SubscriptingRef node */
newTargetEntryList = lappend(newTargetEntryList, targetEntry);
}
else
{
/*
* Need to concat expanded target list entries in reverse order
* to preserve ordering of the original target entry list.
*/
newTargetEntryList = list_concat(newTargetEntryList,
list_reverse(expandedTargetEntries));
}
}

return newTargetEntryList;
}


/*
* TargetEntryExprFindSubsRef searches given Expr --assuming that it is part
* of a target list entry-- to see if it directly (i.e.: itself) or indirectly
* (e.g.: behind some level of coercions) holds a SubscriptingRef node.
*
* Returns the original SubscriptingRef node on success or NULL otherwise.
*
* Note that it wouldn't add much value to use expression_tree_walker here
* since we are only interested in a subset of the fields of a few certain
* node types.
*/
static SubscriptingRef *
TargetEntryExprFindSubsRef(Expr *expr)
{
Node *node = (Node *) expr;
while (node)
{
if (IsA(node, FieldStore))
{
/*
* ModifyPartialQuerySupported doesn't allow INSERT/UPDATE via
* FieldStore. If we decide supporting such commands, then we
* should take the first element of "newvals" list into account
* here. This is because, to support such commands, we will need
* to expand merged FieldStore into separate target entries too.
*
* For this reason, this block is not reachable atm and need to
* uncomment the following if we decide supporting such commands.
*
* """
* FieldStore *fieldStore = (FieldStore *) node;
* node = (Node *) linitial(fieldStore->newvals);
* """
*/
ereport(ERROR, (errmsg("unexpectedly got FieldStore object when "
"generating shard query")));
}
else if (IsA(node, CoerceToDomain))
{
CoerceToDomain *coerceToDomain = (CoerceToDomain *) node;
if (coerceToDomain->coercionformat != COERCE_IMPLICIT_CAST)
{
/* not an implicit coercion, cannot reach to a SubscriptingRef */
break;
}

node = (Node *) coerceToDomain->arg;
}
else if (IsA(node, SubscriptingRef))
{
return (SubscriptingRef *) node;
}
else
{
/* got a node that we are not interested in */
break;
}
}

return NULL;
}
2 changes: 2 additions & 0 deletions src/backend/distributed/deparser/ruleutils_12.c
Original file line number Diff line number Diff line change
Expand Up @@ -3324,6 +3324,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
SubLink *cur_ma_sublink;
List *ma_sublinks;

targetList = ExpandMergedSubscriptingRefEntries(targetList);

/*
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
* into a list. We expect them to appear, in ID order, in resjunk tlist
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/deparser/ruleutils_13.c
Original file line number Diff line number Diff line change
Expand Up @@ -3263,6 +3263,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
SubLink *cur_ma_sublink;
List *ma_sublinks;

targetList = ExpandMergedSubscriptingRefEntries(targetList);

/*
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
* into a list. We expect them to appear, in ID order, in resjunk tlist
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/deparser/ruleutils_14.c
Original file line number Diff line number Diff line change
Expand Up @@ -3439,6 +3439,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
SubLink *cur_ma_sublink;
List *ma_sublinks;

targetList = ExpandMergedSubscriptingRefEntries(targetList);

/*
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
* into a list. We expect them to appear, in ID order, in resjunk tlist
Expand Down
7 changes: 6 additions & 1 deletion src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,12 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
if (FindNodeMatchingCheckFunction((Node *) targetEntry->expr,
NodeIsFieldStore))
{
/* DELETE cannot do field indirection already */
/*
* DELETE cannot do field indirection already, so assert here.
*
* NB: See TargetEntryExprFindSubsRef if you decide removing
* this error check.
*/
Assert(commandType == CMD_UPDATE || commandType == CMD_INSERT);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"inserting or modifying composite type fields is not "
Expand Down
17 changes: 17 additions & 0 deletions src/backend/distributed/utils/listutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,20 @@ GenerateListFromElement(void *listElement, int listLength)

return list;
}


/*
* list_reverse returns a new list by reverting order of the elements within
* given list.
*/
List *
list_reverse(const List *list)
{
List *newList = NIL;
for (int i = list_length(list) - 1; i >= 0; i--)
{
newList = lappend(newList, list_nth(list, i));
}

return newList;
}
1 change: 1 addition & 0 deletions src/include/distributed/citus_ruleutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
extern List * ExpandMergedSubscriptingRefEntries(List *targetEntryList);

/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/listutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,6 @@ extern List * ListTake(List *pointerList, int size);
extern void * safe_list_nth(const List *list, int index);
extern List * GeneratePositiveIntSequenceList(int upTo);
extern List * GenerateListFromElement(void *listElement, int listLength);
extern List * list_reverse(const List *list);

#endif /* CITUS_LISTUTILS_H */
4 changes: 0 additions & 4 deletions src/test/regress/expected/multi_deparse_shard_query.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
--
SET citus.next_shard_id TO 13100000;
SET citus.shard_replication_factor TO 1;
CREATE FUNCTION deparse_shard_query_test(text)
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT;
-- create the first table
CREATE TABLE raw_events_1
(tenant_id bigint,
Expand Down
4 changes: 4 additions & 0 deletions src/test/regress/expected/multi_test_helpers.out
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ BEGIN
END LOOP;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION deparse_shard_query_test(text)
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT;
77 changes: 77 additions & 0 deletions src/test/regress/expected/pg14.out
Original file line number Diff line number Diff line change
Expand Up @@ -1270,5 +1270,82 @@ SELECT count(*) FROM
1
(1 row)

CREATE TABLE jsonb_subscript_update (id INT, data JSONB);
SELECT create_distributed_table('jsonb_subscript_update', 'id');
create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}');
UPDATE jsonb_subscript_update
SET data['b'] = updated_vals.b::TEXT::jsonb,
data['c'] = updated_vals.c::TEXT::jsonb,
data['d'] = updated_vals.d::TEXT::jsonb
FROM (
SELECT id,
data['a'] AS a,
data['a']::NUMERIC + 1 AS b,
data['a']::NUMERIC + 2 AS c,
data['a']::NUMERIC + 3 AS d
FROM jsonb_subscript_update
) updated_vals
WHERE jsonb_subscript_update.id = updated_vals.id;
SELECT * FROM jsonb_subscript_update ORDER BY 1,2;
id | data
---------------------------------------------------------------------
1 | {"a": 1, "b": 2, "c": 3, "d": 4}
2 | {"a": 2, "b": 3, "c": 4, "d": 5}
(2 rows)

TRUNCATE jsonb_subscript_update;
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}'), (4, '{"a": 4, "b": 10}');
ALTER TABLE jsonb_subscript_update ADD CONSTRAINT pkey PRIMARY KEY (id, data);
INSERT INTO jsonb_subscript_update VALUES (1, '{"a": 1}'), (2, '{"a": 2}')
ON CONFLICT (id, data)
DO UPDATE SET data['d']=(jsonb_subscript_update.data['a']::INT*100)::TEXT::JSONB,
data['b']=(jsonb_subscript_update.data['a']::INT*-100)::TEXT::JSONB;
SELECT * FROM jsonb_subscript_update ORDER BY 1,2;
id | data
---------------------------------------------------------------------
1 | {"a": 1, "b": -100, "d": 100}
2 | {"a": 2, "b": -200, "d": 200}
4 | {"a": 4, "b": 10}
(3 rows)

CREATE TABLE nested_obj_update(id INT, data JSONB, text_col TEXT);
SELECT create_distributed_table('nested_obj_update', 'id');
create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO nested_obj_update VALUES
(1, '{"a": [1,2,3], "b": [4,5,6], "c": [7,8,9], "d": [1,2,1,2]}', '4'),
(2, '{"a": [10,20,30], "b": [41,51,61], "c": [72,82,92], "d": [11,21,11,21]}', '6');
UPDATE nested_obj_update
SET data['a'][0] = (updated_vals.b * 1)::TEXT::JSONB,
data['b'][2] = (updated_vals.c * 2)::TEXT::JSONB,
data['c'][0] = (updated_vals.d * 3)::TEXT::JSONB,
text_col = (nested_obj_update.id*1000)::TEXT,
data['a'][0] = (text_col::INT * data['a'][0]::INT)::TEXT::JSONB,
data['d'][6] = (nested_obj_update.id*1)::TEXT::JSONB,
data['d'][4] = (nested_obj_update.id*2)::TEXT::JSONB
FROM (
SELECT id,
data['a'][0] AS a,
data['b'][0]::NUMERIC + 1 AS b,
data['c'][0]::NUMERIC + 2 AS c,
data['c'][1]::NUMERIC + 3 AS d
FROM nested_obj_update
) updated_vals
WHERE nested_obj_update.id = updated_vals.id;
SELECT * FROM nested_obj_update ORDER BY 1,2,3;
id | data | text_col
---------------------------------------------------------------------
1 | {"a": [4, 2, 3], "b": [4, 5, 18], "c": [33, 8, 9], "d": [1, 2, 1, 2, 2, null, 1]} | 1000
2 | {"a": [60, 20, 30], "b": [41, 51, 148], "c": [255, 82, 92], "d": [11, 21, 11, 21, 4, null, 2]} | 2000
(2 rows)

set client_min_messages to error;
drop schema pg14 cascade;
Loading

0 comments on commit dc29396

Please sign in to comment.