Skip to content

Commit

Permalink
[#24692, #24784] YSQL: Refactor INSERT ... ON CONFLICT batching - Par…
Browse files Browse the repository at this point in the history
…t 1 + Bug Fixes

Summary:
D38354 introduced batching support for INSERT ... ON CONFLICT queries.
This revision seeks to address a subset of the design comments on that revision. In particular this refactor aims to:
 - Better integrate with/reuse existing YBCTID libraries for identifying unique index tuples.
 - Fix inconsistent results produced by different batch sizes.
 - Fix visibility issues observed in queries with a pipeline pattern (multiple CTEs, each doing a ModifyTable, each with a returning clause, feeding into the next)

**Summary of changes**
- Reuse existing YBCTID container infrastructure in pggate for storing tuple ID of “on-conflict” tuples.
- Use separate containers for storing batch-read tuples and “just-inserted” tuples.
- Move logic to update map of “just-inserted” tuples from execIndexing (at the time of enqueueing the index request) to nodeModifyTable

**Reusing existing YBCTID container infrastructure**

This revision reuses the YBCTID infrastructure that is used by the foreign key and explicit row locking buffers that already exists today. To do this, the key columns of unique (and non-NULL) indexes are treated as YBCTIDs. A session-level container is created to store the mapping between the IDs of tuples read by the index scan and their corresponding table slots. This design allows us to:

- Reuse existing YBCTID helper functions for equality comparisons.
- Mitigate tuple visibility issues through the creation of the global map when multiple INSERT … ON CONFLICT CTEs are chained together.
- Inherit benefits from improvements to YBCTID processing logic in the future.

**Use of separate containers to store batch-read tuples and “just-inserted” tuples**

The index batch-read workflow requires mapping key columns of a unique index to the tuple returned by the index scan. To check if the same row is modified twice, identifiers (ie. key columns) to index tuples are required to be stored. While the data stored is similar (hash of the key columns of the index), the data can be stored in two separate structures: a **hash map** for the index batch-read, and a **hash set** for the list of modified tuples. This design allows us to:

- Separate the scaling of the two data structures. If no restriction is placed on the size of the hash set, we can detect "command cannot affect row a second time" errors across batches of writes.
- Remain consistent with the design of other buffers such as the foreign key buffer. The foreign key buffer currently has no restriction on how many YBCTID intents are placed within its hash set.
- Cleanly remove the **hash set** when DocDB natively supports detection of tuples that are modified multiple times by the same query.

**Rework logic of when entries are added/deleted to on-conflict map**

The previous design adds and deletes entries to/from the on-conflict map when the payload for the index insert/delete operation is prepared (in ExecIndexing.c). This suffers from the following drawbacks:

- Index updates (DELETEs + INSERTs) may be skipped. If the update of an arbiter index is skipped, it is incorrectly not added/deleted from the on-conflict map.
- Primary key and secondary index modifications (insert/delete/update) have two distinct paths. This means that the previous design would have had to replicate this logic in both code paths, which it did not. This duplication is not required in the current design.

This revision reworks the above design to add/delete entries to the map in two different places (in nodeModifyTable.c):

- During on-conflict checking IF it is decided if the tuple will be inserted. Note that at this stage, the DO UPDATE projections have not been carried out (so we don’t know whether the updated values change the index keys).
- After the updated values have been projected if it is decided that the tuple will be updated.

This design currently has the drawback that the index tuple will have to be constructed one extra time (`FormIndexDatum()`) per tuple for tuples that are to be updated. This is particularly significant for expression indexes as it would involve evaluating the expression an extra time. This can be fixed for non-expression indexes by directly copying over the index key columns from the tuple slot. However, the fix for expression indexes requires storing and propagating the index tuple which is a large change relative to the benefit it offers.

**Future Work**
A second revision is planned that will:
- Add support for RETURNING clause
- Add limited support for foreign key relationships
- Fix bugs related to partitioned tables
Jira: DB-13766, DB-13883

Test Plan:
On Almalinux 8:

```
#!/usr/bin/env bash
set -euo pipefail
./yb_build.sh fastdebug --gcc11
find java/yb-pgsql/src/test/java/org/yb/pgsql -name 'TestPgRegressInsertOnConflict*' \
| grep -oE 'TestPgRegress\w+' \
| while read -r testname; do
  ./yb_build.sh fastdebug --gcc11 --java-test "$testname" --sj
done
```

Reviewers: amartsinchyk, smishra, jason, telgersma

Reviewed By: jason, telgersma

Subscribers: jason, smishra, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D39023
  • Loading branch information
karthik-ramanathan-3006 committed Nov 6, 2024
1 parent 841f7bf commit 3f2ac8c
Show file tree
Hide file tree
Showing 23 changed files with 1,381 additions and 510 deletions.
1 change: 0 additions & 1 deletion src/postgres/src/backend/executor/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ OBJS = \
nodeYbBitmapIndexscan.o \
nodeYbBitmapTablescan.o \
nodeYbSeqscan.o \
ybInsertOnConflictBatchingMap.o \
ybOptimizeModifyTable.o \
ybcExpr.o \
ybcFunction.o \
Expand Down
167 changes: 80 additions & 87 deletions src/postgres/src/backend/executor/execIndexing.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@

/* Yugabyte includes */
#include "catalog/pg_am_d.h"
#include "executor/ybInsertOnConflictBatchingMap.h"
#include "executor/ybcModifyTable.h"
#include "funcapi.h"
#include "utils/relcache.h"
Expand Down Expand Up @@ -286,8 +285,7 @@ YbExecDoInsertIndexTuple(ResultRelInfo *resultRelInfo,
bool *specConflict,
List *arbiterIndexes,
bool update,
ItemPointer tupleid,
struct yb_insert_on_conflict_batching_hash *ybConflictMap)
ItemPointer tupleid)
{
bool applyNoDupErr;
IndexUniqueCheck checkUnique;
Expand All @@ -312,18 +310,6 @@ YbExecDoInsertIndexTuple(ResultRelInfo *resultRelInfo,
values,
isnull);

if (ybConflictMap)
{
int indnkeyatts =
IndexRelationGetNumberOfKeyAttributes(indexRelation);

YbInsertOnConflictBatchingMapInsert(ybConflictMap,
indnkeyatts,
values,
isnull,
NULL /* slot */);
}

/*
* After updating INSERT ON CONFLICT batching map, PK is no longer
* relevant from here on.
Expand Down Expand Up @@ -549,8 +535,7 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
* TODO(neil) The following YB check might not be needed due to later work on indexes.
* We keep this check for now as this bugfix will be backported to ealier releases.
*/
if (isYBRelation && YBIsCoveredByMainTable(indexRelation) &&
!YbIsInsertOnConflictReadBatchingEnabled(resultRelInfo))
if (isYBRelation && YBIsCoveredByMainTable(indexRelation))
continue;

/* If the index is marked as read-only, ignore it */
Expand All @@ -567,10 +552,7 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
if (YbExecDoInsertIndexTuple(resultRelInfo, indexRelation, indexInfo,
slot, estate, noDupErr,
specConflict, arbiterIndexes, update,
tupleid,
(resultRelInfo->ri_YbConflictMap ?
resultRelInfo->ri_YbConflictMap[i] :
NULL)))
tupleid))
result = lappend_oid(result, RelationGetRelid(indexRelation));
}

Expand All @@ -595,8 +577,7 @@ YbExecDoDeleteIndexTuple(ResultRelInfo *resultRelInfo,
IndexInfo *indexInfo,
TupleTableSlot *slot,
Datum ybctid,
EState *estate,
struct yb_insert_on_conflict_batching_hash *ybConflictMap)
EState *estate)
{
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
Expand All @@ -621,17 +602,6 @@ YbExecDoDeleteIndexTuple(ResultRelInfo *resultRelInfo,
indexInfo); /* index AM may need this */
MemoryContextSwitchTo(oldContext);
}

if (ybConflictMap)
{
int indnkeyatts =
IndexRelationGetNumberOfKeyAttributes(indexRelation);

YbInsertOnConflictBatchingMapDelete(ybConflictMap,
indnkeyatts,
values,
isnull);
}
}

/* ----------------------------------------------------------------
Expand Down Expand Up @@ -696,8 +666,7 @@ ExecDeleteIndexTuples(ResultRelInfo *resultRelInfo, Datum ybctid, HeapTuple tupl
* - As a result, we don't need distinguish between Postgres and YugaByte here.
* I update this code only for clarity.
*/
if (isYBRelation && YBIsCoveredByMainTable(indexRelation) &&
!YbIsInsertOnConflictReadBatchingEnabled(resultRelInfo))
if (isYBRelation && YBIsCoveredByMainTable(indexRelation))
continue;

indexInfo = indexInfoArray[i];
Expand Down Expand Up @@ -725,10 +694,7 @@ ExecDeleteIndexTuples(ResultRelInfo *resultRelInfo, Datum ybctid, HeapTuple tupl
}

YbExecDoDeleteIndexTuple(resultRelInfo, indexRelation, indexInfo,
slot, ybctid, estate,
(resultRelInfo->ri_YbConflictMap ?
resultRelInfo->ri_YbConflictMap[i] :
NULL));
slot, ybctid, estate);
}

/* Drop the temporary slot */
Expand Down Expand Up @@ -849,8 +815,7 @@ YbExecUpdateIndexTuples(ResultRelInfo *resultRelInfo,
* Primary key is a part of the base relation in Yugabyte and does not
* need to be updated here.
*/
if (YBIsCoveredByMainTable(indexRelation) &&
!YbIsInsertOnConflictReadBatchingEnabled(resultRelInfo))
if (YBIsCoveredByMainTable(indexRelation))
continue;

indexInfo = indexInfoArray[i];
Expand Down Expand Up @@ -1039,10 +1004,7 @@ YbExecUpdateIndexTuples(ResultRelInfo *resultRelInfo,
index = lfirst_int(lc);
YbExecDoDeleteIndexTuple(resultRelInfo, relationDescs[index],
indexInfoArray[index], deleteSlot, ybctid,
estate,
(resultRelInfo->ri_YbConflictMap ?
resultRelInfo->ri_YbConflictMap[index] :
NULL));
estate);
}

econtext->ecxt_scantuple = slot;
Expand All @@ -1055,10 +1017,7 @@ YbExecUpdateIndexTuples(ResultRelInfo *resultRelInfo,
NULL /* specConflict */,
NIL /* arbiterIndexes */,
true /* update */,
tupleid,
(resultRelInfo->ri_YbConflictMap ?
resultRelInfo->ri_YbConflictMap[index] :
NULL)))
tupleid))
result = lappend_oid(result, RelationGetRelid(relationDescs[index]));
}

Expand Down Expand Up @@ -1129,36 +1088,13 @@ ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
for (i = 0; i < numIndices; i++)
{
Relation indexRelation = relationDescs[i];
IndexInfo *indexInfo;
IndexInfo *indexInfo = indexInfoArray[i];
bool satisfiesConstraint;

if (indexRelation == NULL)
if (!YbShouldCheckUniqueOrExclusionIndex(indexInfo, indexRelation,
heapRelation, arbiterIndexes))
continue;

indexInfo = indexInfoArray[i];
Assert(indexInfo->ii_ReadyForInserts ==
indexRelation->rd_index->indisready);

if (!indexInfo->ii_Unique && !indexInfo->ii_ExclusionOps)
continue;

/* If the index is marked as read-only, ignore it */
if (!indexInfo->ii_ReadyForInserts)
continue;

/* When specific arbiter indexes requested, only examine them */
if (arbiterIndexes != NIL &&
!list_member_oid(arbiterIndexes,
indexRelation->rd_index->indexrelid))
continue;

if (!indexRelation->rd_index->indimmediate)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ON CONFLICT does not support deferrable unique constraints/exclusion constraints as arbiters"),
errtableconstraint(heapRelation,
RelationGetRelationName(indexRelation))));

checkedIndex = true;

if (YbIsInsertOnConflictReadBatchingEnabled(resultRelInfo))
Expand Down Expand Up @@ -1860,12 +1796,6 @@ yb_batch_fetch_conflicting_rows(int idx, ResultRelInfo *resultRelInfo,
return;
}

/* Create an ON CONFLICT batching map. */
resultRelInfo->ri_YbConflictMap[idx] =
YbInsertOnConflictBatchingMapCreate(estate->es_query_cxt,
resultRelInfo->ri_BatchSize,
index->rd_att);

/*
* Create the array used for the RHS of the batch read RPC.
* Parts copied from ExecEvalArrayExpr.
Expand Down Expand Up @@ -1968,6 +1898,7 @@ yb_batch_fetch_conflicting_rows(int idx, ResultRelInfo *resultRelInfo,
{
Datum existing_values[INDEX_MAX_KEYS];
bool existing_isnull[INDEX_MAX_KEYS];
MemoryContext oldcontext;

/*
* Extract the index column values and isnull flags from the existing
Expand All @@ -1976,11 +1907,19 @@ yb_batch_fetch_conflicting_rows(int idx, ResultRelInfo *resultRelInfo,
FormIndexDatum(indexInfo, existing_slot, estate,
existing_values, existing_isnull);

YbInsertOnConflictBatchingMapInsert(resultRelInfo->ri_YbConflictMap[idx],
indnkeyatts,
existing_values,
existing_isnull,
existing_slot);
/*
* Irrespective of how distinctness of NULLs are treated by the index,
* the index keys having NULL values are filtered out above, and will
* not be a part of the index scan result.
*/
Assert(!YbIsAnyIndexKeyColumnNull(indexInfo, existing_isnull));

oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
YBCPgInsertOnConflictKeyInfo info = {existing_slot};
YBCPgYBTupleIdDescriptor *descr =
YBCBuildNonNullUniqueIndexYBTupleId(index, existing_values);
HandleYBStatus(YBCPgAddInsertOnConflictKey(descr, &info));
MemoryContextSwitchTo(oldcontext);

existing_slot = table_slot_create(heap, NULL);
econtext->ecxt_scantuple = existing_slot;
Expand All @@ -1991,3 +1930,57 @@ yb_batch_fetch_conflicting_rows(int idx, ResultRelInfo *resultRelInfo,
econtext->ecxt_scantuple = save_scantuple;
ExecDropSingleTupleTableSlot(existing_slot);
}

bool
YbIsAnyIndexKeyColumnNull(IndexInfo *indexInfo, bool isnull[INDEX_MAX_KEYS])
{
for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
if (isnull[i])
return true;
}

return false;
}

/*
* YbShouldCheckUniqueOrExclusionIndex
*
* Function to determine if the given index satisfies prerequisites for a
* unique or exclusion constraint check.
* Logic has been lifted from ExecCheckIndexConstraints.
*/
bool
YbShouldCheckUniqueOrExclusionIndex(IndexInfo *indexInfo,
Relation indexRelation,
Relation heapRelation,
List *arbiterIndexes)
{
if (indexRelation == NULL)
return false;

Assert(indexInfo->ii_ReadyForInserts ==
indexRelation->rd_index->indisready);

if (!indexInfo->ii_Unique && !indexInfo->ii_ExclusionOps)
return false;

/* If the index is marked as read-only, ignore it */
if (!indexInfo->ii_ReadyForInserts)
return false;

/* When specific arbiter indexes requested, only examine them */
if (arbiterIndexes != NIL &&
!list_member_oid(arbiterIndexes,
indexRelation->rd_index->indexrelid))
return false;

if (!indexRelation->rd_index->indimmediate)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ON CONFLICT does not support deferrable unique constraints/exclusion constraints as arbiters"),
errtableconstraint(heapRelation,
RelationGetRelationName(indexRelation))));

return true;
}
Loading

0 comments on commit 3f2ac8c

Please sign in to comment.