Skip to content

Commit

Permalink
Take: Support AC->C cases directly in "array_take" with a simple stra…
Browse files Browse the repository at this point in the history
…tegy
  • Loading branch information
felipecrv committed Jun 15, 2024
1 parent 4f7e39c commit 30877b4
Showing 1 changed file with 66 additions and 29 deletions.
95 changes: 66 additions & 29 deletions cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -768,20 +768,15 @@ class TakeMetaFunction : public MetaFunction {
return result.chunked_array();
}

/// \brief Take for the `AC->C` case: the values are a plain Array and the indices are a
/// ChunkedArray; the result is a ChunkedArray.
///
/// NOTE(review): this listing appears to be an unmarked diff, so two versions of this
/// function are interleaved below (two signatures, two bodies). Confirm against the
/// real file before relying on either half in isolation.
// Version taking the values and the indices as separate, already-unwrapped arguments.
static Result<std::shared_ptr<ChunkedArray>> TakeACC(const Array& values,
const ChunkedArray& indices,
// Version taking the raw Datum arguments (args[0] = values, args[1] = indices).
static Result<std::shared_ptr<ChunkedArray>> TakeACC(const std::vector<Datum>& args,
const TakeOptions& options,
ExecContext* ctx) {
// Body of the (values, indices) version: one TakeAAA call per chunk of the indices,
// with the per-chunk results collected into a ChunkedArray of the values' type.
auto num_chunks = indices.num_chunks();
std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
std::vector<Datum> args = {values, /*placeholder*/ {}};
for (int i = 0; i < num_chunks; i++) {
// Take with that indices chunk
args[1] = indices.chunk(i);
ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx));
new_chunks[i] = MakeArray(chunk);
}
return std::make_shared<ChunkedArray>(std::move(new_chunks), values.type());
// Body of the (args) version: debug-checks the kinds of both arguments and forwards
// everything to CallArrayTake, returning the chunked array held by its result.
// "array_take" can handle AC->C cases directly
// (via their VectorKernel::chunked_exec)
DCHECK_EQ(args[0].kind(), Datum::ARRAY);
DCHECK_EQ(args[1].kind(), Datum::CHUNKED_ARRAY);
ARROW_ASSIGN_OR_RAISE(auto result, CallArrayTake(args, options, ctx));
return result.chunked_array();
}

static Result<std::shared_ptr<RecordBatch>> TakeRAR(const RecordBatch& batch,
Expand Down Expand Up @@ -835,10 +830,10 @@ class TakeMetaFunction : public MetaFunction {
const auto& take_opts = static_cast<const TakeOptions&>(*options);
switch (args[0].kind()) {
case Datum::ARRAY:
if (index_kind == Datum::ARRAY) {
return TakeAAA(args, take_opts, ctx);
} else if (index_kind == Datum::CHUNKED_ARRAY) {
return TakeACC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx);
// "array_take" can handle AA->A and AC->C cases directly
// (via their VectorKernel::exec and VectorKernel::chunked_exec)
if (index_kind == Datum::ARRAY || index_kind == Datum::CHUNKED_ARRAY) {
return CallArrayTake(args, take_opts, ctx);
}
break;
case Datum::CHUNKED_ARRAY:
Expand Down Expand Up @@ -916,24 +911,40 @@ Status CallCAAKernel(VectorKernel::ChunkedExec take_caa_exec, KernelContext* ctx
return take_caa_exec(ctx, chunked_array_array_batch, out);
}

/// \brief VectorKernel::chunked_exec helper for the `AC->C` case.
///
/// Runs the `AA->A` take kernel once per chunk of the indices against the (unchunked)
/// values and collects the per-chunk results into a ChunkedArray that has one chunk per
/// chunk of the indices and the type of the values.
///
/// \param take_aaa_exec The `AA->A` take kernel to use.
/// \param ctx The kernel context forwarded to every kernel invocation.
/// \param batch batch.values[0] is read as the values array and batch.values[1] as the
/// chunked indices.
/// \param[out] out Receives the resulting ChunkedArray.
/// \return The first non-OK Status produced by a kernel call, Status::OK() otherwise.
Status TakeACCChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
                          const ExecBatch& batch, Datum* out) {
  auto& values = batch.values[0].array();
  auto& indices = batch.values[1].chunked_array();
  auto num_chunks = indices->num_chunks();
  std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
  for (int i = 0; i < num_chunks; i++) {
    // Take with that indices chunk
    auto& indices_chunk = indices->chunk(i)->data();
    // NOTE(review): the output is prepared with the length of the values; confirm that
    // PrepareOutput does not need the length of the indices chunk instead.
    Datum result = PrepareOutput(batch, values->length);
    // The kernel must write into the per-chunk `result` (not into `out`), because
    // `result` is what gets wrapped as the i-th output chunk right below.
    RETURN_NOT_OK(CallAAAKernel(take_aaa_exec, ctx, values, indices_chunk, &result));
    new_chunks[i] = MakeArray(result.array());
  }
  out->value = std::make_shared<ChunkedArray>(std::move(new_chunks), values->type);
  return Status::OK();
}

/// \brief Generic (slower) VectorKernel::chunked_exec (`CA->C`, `CC->C`, and `AC->C`).
///
/// This function concatenates the chunks of the values and then calls the
/// `AA->A` take kernel to handle the `CA->A` cases.
/// This function concatenates the chunks of the values and then calls the `AA->A` take
/// kernel to handle the `CA->C` cases. The ArrayData returned by the `AA->A` kernel is
/// converted to a ChunkedArray with a single chunk to honor the `CA->C` contract.
///
/// For `CC->C` cases, it concatenates the chunks of the values and calls the `AA->A` take
/// kernel for each chunk of the indices, producing a new chunked array with the same
/// shape as the indices.
///
/// `AC->C` cases are trivially delegated to TakeACCChunkedExec without any concatenation.
///
/// \param take_aaa_exec The `AA->A` take kernel to use.
Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
const ExecBatch& batch, Datum* out) {
const auto& args = batch.values;
if (args[0].kind() == Datum::CHUNKED_ARRAY) {
// The generic implementation for C_->_ cases concatenates the chunks of the values,
// so it can delegate the work to the AA->A take kernel. SpecialTakeChunkedExec<> can
// be used for more efficient implementations that can delegate to the CA->A take
// kernel directly.
auto& values_chunked = args[0].chunked_array();
ARROW_ASSIGN_OR_RAISE(auto values_array,
ChunkedArrayAsArray(values_chunked, ctx->memory_pool()));
Expand Down Expand Up @@ -965,6 +976,17 @@ Status GenericTakeChunkedExec(ArrayKernelExec take_aaa_exec, KernelContext* ctx,
std::make_shared<ChunkedArray>(std::move(new_chunks), values_chunked->type());
return Status::OK();
}
} else {
// A VectorKernel::chunked_exec is only called when at least one of the inputs is
// chunked, so args[1] should be a chunked array here when everything is wired up
// correctly.
// The kind must be compared with `==`: a comma expression would discard the result of
// kind() and test the enumerator alone.
if (args[1].kind() == Datum::CHUNKED_ARRAY) {
  // AC->C
  return TakeACCChunkedExec(take_aaa_exec, ctx, batch, out);
} else {
  DCHECK(false) << "Unexpected kind for array_take's chunked_exec kernel: values="
                << args[0].ToString() << ", indices=" << args[1].ToString();
}
}
return Status::NotImplemented(
"Unsupported kinds for 'array_take', try using 'take': "
Expand All @@ -979,15 +1001,19 @@ struct GenericTakeChunkedExecFunctor {
}
};

/// \brief Specialized (faster) VectorKernel::chunked_exec for `CC->C` and `CA->C` cases.
/// \brief Specialized (faster) VectorKernel::chunked_exec (`CA->C`, `CC->C`, `AC->C`).
///
/// This function doesn't ever need to concatenate the chunks of the values, so it can be
/// more efficient than the generic implementation. It can delegate to the `CA->A` take
/// kernel directly [1] and for `CC->C` cases it can call the `CA->A` take kernel for each
/// chunk of the indices, producing a new chunked array with the same shape as the
/// indices.
/// more efficient than GenericTakeChunkedExec, which can only delegate to the `AA->A`
/// take kernel.
///
/// [1] And trivially convert the result to a ChunkedArray of a single chunk.
/// For `CA->C` cases, it can call the `CA->A` take kernel directly [1] and trivially
/// convert the result to a ChunkedArray of a single chunk to honor the `CA->C` contract.
///
/// For `CC->C` cases it can call the `CA->A` take kernel for each chunk of the indices to
/// get each chunk that becomes the ChunkedArray output.
///
/// `AC->C` cases are trivially delegated to TakeACCChunkedExec.
///
/// \param take_aaa_exec The `AA->A` take kernel to use.
Status SpecialTakeChunkedExec(const ArrayKernelExec take_aaa_exec,
Expand Down Expand Up @@ -1048,6 +1074,17 @@ Status SpecialTakeChunkedExec(const ArrayKernelExec take_aaa_exec,
std::make_shared<ChunkedArray>(std::move(new_chunks), values_chunked->type());
return Status::OK();
}
} else {
// A VectorKernel::chunked_exec is only called when at least one of the inputs is
// chunked, so args[1] should be a chunked array here when everything is wired up
// correctly.
// The kind must be compared with `==`: a comma expression would discard the result of
// kind() and test the enumerator alone.
if (args[1].kind() == Datum::CHUNKED_ARRAY) {
  // AC->C
  return TakeACCChunkedExec(take_aaa_exec, ctx, batch, out);
} else {
  DCHECK(false) << "Unexpected kind for array_take's chunked_exec kernel: values="
                << args[0].ToString() << ", indices=" << args[1].ToString();
}
}
return Status::NotImplemented(
"Unsupported kinds for 'array_take', try using 'take': "
Expand Down

0 comments on commit 30877b4

Please sign in to comment.