Skip to content

Commit

Permalink
[columnar] Vectorized direct aggregates (#143)
Browse files Browse the repository at this point in the history
Handle vacuum process with only one full stripe

* If there is only one stripe that is populated with maximum number of
  rows do nothing.

Vector read should count only non deleted rows

* When reading next vector for processing we should count only rows that are not deleted.

Vectorized direct aggregates

* Supported version 14/15
* Reused nodeAgg.c to handle vectorized aggregates
* Implementation of aggregate function to handle vector input

Add O3 optimization

* Uses O3 optimization. Flag will be used when `COLUMNAR_O3` is set.
  • Loading branch information
mkaruza authored Sep 12, 2023
1 parent 0151073 commit 5f97011
Show file tree
Hide file tree
Showing 28 changed files with 10,728 additions and 137 deletions.
7 changes: 6 additions & 1 deletion columnar/src/backend/columnar/Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
citus_subdir = src/backend/columnar
citus_top_builddir = ../../..
safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
SUBDIRS = . safeclib vectorization vectorization/types
SUBDIRS = . safeclib vectorization vectorization/types vectorization/nodes
SUBDIRS +=
ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS))
OBJS += \
$(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c))))

MODULE_big = columnar
PG_CPPFLAGS += -I$(libpq_srcdir) -I$(safestringlib_srcdir)/include

ifdef COLUMNAR_O3
PG_CFLAGS += -O3
endif

EXTENSION = columnar

template_sql_files = $(patsubst $(citus_abs_srcdir)/%,%,$(wildcard $(citus_abs_srcdir)/sql/*.sql))
Expand Down
9 changes: 6 additions & 3 deletions columnar/src/backend/columnar/columnar.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_PG_LZ
#endif

static void columnar_guc_init(void);

int columnar_compression = DEFAULT_COMPRESSION_TYPE;
int columnar_stripe_row_limit = DEFAULT_STRIPE_ROW_COUNT;
int columnar_chunk_group_row_limit = DEFAULT_CHUNK_ROW_COUNT;
Expand Down Expand Up @@ -64,13 +66,14 @@ static const struct config_enum_entry columnar_compression_options[] =
void
columnar_init(void)
{
columnar_init_gucs();
columnar_guc_init();
columnar_tableam_init();
columnar_planner_init();
}


void
columnar_init_gucs()
static void
columnar_guc_init()
{
DefineCustomEnumVariable("columnar.compression",
"Compression type for columnar.",
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/backend/columnar/columnar.control
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
comment = 'Hydra Columnar extension'
default_version = '11.1-8'
default_version = '11.1-9'
module_pathname = '$libdir/columnar'
relocatable = false
144 changes: 114 additions & 30 deletions columnar/src/backend/columnar/columnar_customscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ typedef struct ColumnarScanState
struct
{
bool vectorizationEnabled;
bool vectorizationAggregate;
TupleTableSlot *scanVectorSlot;
TupleTableSlot *resultVectorSlot;
uint32 vectorPendingRowNumber;
uint32 vectorRowIndex;
List *vectorizedQualList;
List *constructedVectorizedQualList;
List *attrNeededList;
} vectorization;

/* Scan snapshot*/
Expand Down Expand Up @@ -232,6 +235,13 @@ static const struct config_enum_entry debug_level_options[] = {
};


const CustomScanMethods *
columnar_customscan_methods(void)
{
return &ColumnarScanScanMethods;
}


/*
* columnar_customscan_init installs the hook required to intercept the postgres planner and
* provide extra paths for columnar tables
Expand Down Expand Up @@ -440,8 +450,6 @@ ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,

if (EnableColumnarCustomScan)
{
ereport(DEBUG1, (errmsg("pathlist hook for columnar table am")));

/*
* When columnar custom scan is enabled (columnar.enable_custom_scan),
* we only consider ColumnarScanPath's & IndexPath's. For this reason,
Expand Down Expand Up @@ -1899,13 +1907,41 @@ ColumnarScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int ef
if (lthird(cscan->custom_exprs) != NIL)
columnarScanState->vectorization.vectorizedQualList = lthird(cscan->custom_exprs);

ListCell *lc;
foreach(lc, cscan->custom_private)
{
Const *privateCustomData = lfirst(lc);
if (privateCustomData->consttype == CUSTOM_SCAN_VECTORIZED_AGGREGATE)
{
columnarScanState->vectorization.vectorizationAggregate =
privateCustomData->constvalue;
}
}

/*
* Vectorization is enabled if global variable is set and there is at least one
* filter which can be vectorized.
*/
columnarScanState->vectorization.vectorizationEnabled =
columnar_enable_vectorization &&
columnarScanState->vectorization.vectorizedQualList != NULL;
(columnarScanState->vectorization.vectorizedQualList != NULL ||
columnarScanState->vectorization.vectorizationAggregate);

if (columnarScanState->vectorization.vectorizationAggregate)
{
ScanState *node = (ScanState *) &columnarScanState->custom_scanstate.ss;

if (node->ps.ps_ProjInfo)
{
columnarScanState->vectorization.resultVectorSlot =
CreateVectorTupleTableSlot(node->ps.ps_ProjInfo->pi_state.resultslot->tts_tupleDescriptor);
}
else
{
columnarScanState->vectorization.resultVectorSlot =
CreateVectorTupleTableSlot(node->ps.ps_ResultTupleDesc);
}
}

if (columnarScanState->vectorization.vectorizationEnabled)
{
Expand All @@ -1916,6 +1952,13 @@ ColumnarScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int ef
columnarScanState->attrNeeded =
ColumnarAttrNeeded(&cscanstate->ss, columnarScanState->vectorization.vectorizedQualList);

int bmsMember = -1;
while ((bmsMember = bms_next_member(columnarScanState->attrNeeded, bmsMember)) >= 0)
{
columnarScanState->vectorization.attrNeededList =
lappend_int(columnarScanState->vectorization.attrNeededList, bmsMember);
}

/*
* If we have pending changes that need to be flushed (row_mask after update/delete)
* or new stripe we need to to them here because sequential columnar scan
Expand Down Expand Up @@ -1981,14 +2024,11 @@ ColumnarAttrNeeded(ScanState *ss, List *customList)

if (var->varattno == 0 )
{
elog(DEBUG1, "Need attribute: all");

/* all attributes are required, we don't need to add more so break*/
attr_needed = bms_add_range(attr_needed, 0, natts - 1);
break;
}

elog(DEBUG1, "Need attribute: %d", var->varattno);
attr_needed = bms_add_member(attr_needed, var->varattno - 1);
}

Expand Down Expand Up @@ -2140,6 +2180,15 @@ CustomExecScan(ColumnarScanState *columnarScanState,
* storage allocated in the previous tuple cycle.
*/
ResetExprContext(econtext);

uint resultVectorTupleSlotIdx = 0;

if (columnarScanState->vectorization.vectorizationAggregate)
{
ExecClearTuple(columnarScanState->vectorization.resultVectorSlot);
CleanupVectorSlot((VectorTupleTableSlot *)
columnarScanState->vectorization.resultVectorSlot);
}

/*
* get a tuple from the access method. Loop until we obtain a tuple that
Expand All @@ -2149,13 +2198,25 @@ CustomExecScan(ColumnarScanState *columnarScanState,
{
TupleTableSlot *slot = NULL;

if (columnarScanState->vectorization.vectorizationEnabled &&
columnarScanState->vectorization.vectorPendingRowNumber == 0 &&
resultVectorTupleSlotIdx > 0)
{
((VectorTupleTableSlot *) columnarScanState->vectorization.resultVectorSlot)->dimension = resultVectorTupleSlotIdx;
ExecStoreVirtualTuple(columnarScanState->vectorization.resultVectorSlot);
resultVectorTupleSlotIdx = 0;
return columnarScanState->vectorization.resultVectorSlot;
}

/*
* No vectorizaion in place so fetch slot by slot.
*/
if (!columnarScanState->vectorization.vectorizationEnabled)
{
slot = ExecScanFetch(node, accessMtd, recheckMtd);

resultVectorTupleSlotIdx = 0;

/*
* if the slot returned by the accessMtd contains NULL, then it means
* there is nothing more to scan so we just return an empty slot,
Expand Down Expand Up @@ -2193,27 +2254,37 @@ CustomExecScan(ColumnarScanState *columnarScanState,
return slot;
}

if (columnarScanState->vectorization.constructedVectorizedQualList == NULL)
if (columnarScanState->vectorization.vectorizedQualList != NULL)
{
columnarScanState->vectorization.constructedVectorizedQualList =
ConstructVectorizedQualList(slot, columnarScanState->vectorization.vectorizedQualList);
}

VectorTupleTableSlot *vectorSlot = (VectorTupleTableSlot *) slot;
if (columnarScanState->vectorization.constructedVectorizedQualList == NULL)
{
columnarScanState->vectorization.constructedVectorizedQualList =
ConstructVectorizedQualList(slot, columnarScanState->vectorization.vectorizedQualList);
}

VectorTupleTableSlot *vectorSlot = (VectorTupleTableSlot *) slot;

bool *resultQual =
ExecuteVectorizedQual(slot,
columnarScanState->vectorization.constructedVectorizedQualList,
AND_EXPR);

int i;
for (i = 0; i < vectorSlot->dimension; i++)
bool *resultQual =
ExecuteVectorizedQual(slot,
columnarScanState->vectorization.constructedVectorizedQualList,
AND_EXPR);

memcpy(vectorSlot->keep, resultQual, COLUMNAR_VECTOR_COLUMN_SIZE);
}
/*
* No qual, no vectorized qual, no projection but we need to return vector
* for vectorized aggregate
*/
else if (!qual && !projInfo && columnarScanState->vectorization.vectorizationAggregate)
{
if (!resultQual[i])
vectorSlot->skip[i] = true;
columnarScanState->vectorization.vectorPendingRowNumber = 0;
return slot;
}
}

uint64 rowNumber = 0;

/*
* Get next tuple from vector tuple slot if vectorization execution
* is enabled.
Expand All @@ -2228,14 +2299,18 @@ CustomExecScan(ColumnarScanState *columnarScanState,
while (columnarScanState->vectorization.vectorPendingRowNumber != 0 &&
rowFound == false)
{
if (!vectorSlot->skip[columnarScanState->vectorization.vectorRowIndex])
if (vectorSlot->keep[columnarScanState->vectorization.vectorRowIndex])
{
slot = columnarScanState->custom_scanstate.ss.ss_ScanTupleSlot;
ExecClearTuple(slot);
extractTupleFromVectorSlot(slot,
ExtractTupleFromVectorSlot(slot,
vectorSlot,
columnarScanState->vectorization.vectorRowIndex,
columnarScanState->attrNeeded);
columnarScanState->vectorization.attrNeededList);

rowNumber = vectorSlot->rowNumber[columnarScanState->vectorization.vectorRowIndex];
if (!columnarScanState->vectorization.vectorizationAggregate)
slot->tts_tid = row_number_to_tid(rowNumber);
rowFound = true;
}
columnarScanState->vectorization.vectorPendingRowNumber--;
Expand Down Expand Up @@ -2269,13 +2344,21 @@ CustomExecScan(ColumnarScanState *columnarScanState,
* Form a projection tuple, store it in the result tuple slot
* and return it.
*/
return ExecProject(projInfo);
slot = ExecProject(projInfo);
}

if (columnarScanState->vectorization.vectorizationAggregate)
{
WriteTupleToVectorSlot(slot,
(VectorTupleTableSlot *) columnarScanState->vectorization.resultVectorSlot,
resultVectorTupleSlotIdx);
((VectorTupleTableSlot *)
columnarScanState->vectorization.resultVectorSlot)->rowNumber[resultVectorTupleSlotIdx] = rowNumber;
resultVectorTupleSlotIdx++;
continue;
}
else
{
/*
* Here, we aren't projecting, so just return scan tuple.
*/
return slot;
}
}
Expand Down Expand Up @@ -2337,10 +2420,10 @@ ColumnarScanNext(ColumnarScanState *columnarScanState)
{
VectorTupleTableSlot *vectorSlot = (VectorTupleTableSlot *) slot;
int attrIndex = -1;
while ((attrIndex = bms_next_member(columnarScanState->attrNeeded, attrIndex)) >= 0)
foreach_int(attrIndex, columnarScanState->vectorization.attrNeededList)
{
VectorColumn *column = (VectorColumn *) vectorSlot->tts.tts_values[attrIndex];
memset(column->isnull, 1, COLUMNAR_VECTOR_COLUMN_SIZE);
memset(column->isnull, true, COLUMNAR_VECTOR_COLUMN_SIZE);
column->dimension = 0;
}
vectorSlot->dimension = 0;
Expand Down Expand Up @@ -2482,7 +2565,8 @@ ColumnarScan_ExplainCustomScan(CustomScanState *node, List *ancestors,
}
}

if (columnarScanState->vectorization.vectorizationEnabled)
if (columnarScanState->vectorization.vectorizationEnabled &&
columnarScanState->vectorization.vectorizedQualList != NULL)
{
const char *vectorizedWhereClauses = ColumnarPushdownClausesStr(
context, columnarScanState->vectorization.vectorizedQualList);
Expand Down
Loading

0 comments on commit 5f97011

Please sign in to comment.