Add c2c support
boneanxs committed Feb 29, 2024
1 parent fcfd758 commit 48caff2
Showing 26 changed files with 510 additions and 109 deletions.
@@ -382,4 +382,8 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       s"WriteFilesTransformer metrics update is not supported in CH backend")
   }
 
+  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
+    throw new UnsupportedOperationException(
+      s"ColumnarToColumnar metrics update is not supported in CH backend")
+  }
 }
@@ -735,4 +735,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       case _ => super.postProcessPushDownFilter(extraFilters, sparkExecNode)
     }
   }
 
+  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase = {
+    throw new UnsupportedOperationException(
+      "ColumnarToColumnarExec is not supported in CH backend.")
+  }
 }
@@ -313,6 +313,13 @@ class MetricsApiImpl extends MetricsApi with Logging {
       "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")
     )
 
+  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+    Map(
+      "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
+      "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+      "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")
+    )
+
   override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
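These metrics mirror the existing row-to-columnar set. As a rough illustration of how a conversion iterator drives them (one tick per input batch, one per output batch, and elapsed wall-clock time on the timing metric), here is a hedged Scala sketch; C2CMetricsSketch, instrument, and the convert callback are illustrative names, not part of this commit:

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

object C2CMetricsSketch {
  def instrument(
      batches: Iterator[ColumnarBatch],
      numInputBatches: SQLMetric,
      numOutputBatches: SQLMetric,
      convertTime: SQLMetric)(convert: ColumnarBatch => ColumnarBatch): Iterator[ColumnarBatch] =
    batches.map {
      batch =>
        numInputBatches += 1
        val start = System.nanoTime()
        // Backend-specific native conversion happens here.
        val converted = convert(batch)
        convertTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
        numOutputBatches += 1
        converted
    }
}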
@@ -158,6 +158,15 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
   override def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase =
     RowToVeloxColumnarExec(child)
 
+  /**
+   * Generate ColumnarToColumnarExec.
+   *
+   * @param child the child plan producing vanilla Spark columnar batches
+   * @return a ColumnarToColumnarExecBase converting them into Velox columnar batches
+   */
+  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase =
+    ColumnarToVeloxColumnarExec(child)
+
   /**
    * Generate FilterExecTransformer.
    *
@@ -449,4 +449,6 @@ object BackendSettings extends BackendSettingsApi {
     // vanilla Spark, we need to rewrite the aggregate to get the correct data type.
     true
   }
 
+  override def supportColumnarToColumnarExec(): Boolean = true
 }
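This capability flag is what lets a columnar transition rule choose the new columnar-to-columnar path over the columnar-to-row-to-columnar fallback. A hedged sketch of such a rule follows; the rule name and the matching condition are illustrative, not the rule this commit actually wires up:

import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.extension.GlutenPlan

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

case class InsertColumnarToColumnarSketch() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    // A vanilla Spark operator already produces columnar batches and the
    // backend can convert them natively: put the c2c operator on top.
    case p
        if p.supportsColumnar && !p.isInstanceOf[GlutenPlan] &&
          BackendsApiManager.getSettings.supportColumnarToColumnarExec() =>
      BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToColumnarExec(p)
  }
}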
@@ -19,39 +19,31 @@ package io.glutenproject.execution
 import io.glutenproject.backendsapi.velox.ValidatorApiImpl
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exec.Runtimes
-import io.glutenproject.extension.GlutenPlan
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
 import io.glutenproject.utils.{ArrowAbiUtil, Iterators}
 import io.glutenproject.vectorized.VanillaColumnarToNativeColumnarJniWrapper
-import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
-import org.apache.spark.TaskContext
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.arrow.ArrowColumnarBatchConverter
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.TaskResources
 
-case class VanillaColumnarToVeloxColumnarExec(child: SparkPlan) extends GlutenPlan with UnaryExecNode {
-
-  override def supportsColumnar: Boolean = true
+import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
-  }
+case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColumnarExecBase(child) {
 
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+  override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
     new ValidatorApiImpl().doSchemaValidate(schema).foreach {
       reason =>
         throw new UnsupportedOperationException(
-          s"Input schema contains unsupported type when convert columnar to columnar for $schema " +
-            s"due to $reason")
+          s"Input schema contains unsupported type when performing columnar to " +
+            s"columnar for $schema due to $reason")
     }
 
     val numInputBatches = longMetric("numInputBatches")
@@ -63,33 +55,30 @@ case class VanillaColumnarToVeloxColumnarExec(child: SparkPlan) extends GlutenPlan
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = schema
-    child.execute().mapPartitions {
+    child.executeColumnar().mapPartitions {
       rowIterator =>
-        VanillaColumnarToVeloxColumnarExec.toColumnarBatchIterator(
-          rowIterator.asInstanceOf[Iterator[ColumnarBatch]],
+        ColumnarToVeloxColumnarExec.toColumnarBatchIterator(
+          rowIterator,
           localSchema,
           numInputBatches,
           numOutputBatches,
-          convertTime,
-          TaskContext.get())
+          convertTime)
     }
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
     copy(child = newChild)
   }
-
-  override def output: Seq[Attribute] = child.output
 }
 
-object VanillaColumnarToVeloxColumnarExec {
+object ColumnarToVeloxColumnarExec {
 
-  def toColumnarBatchIterator(it: Iterator[ColumnarBatch],
-                              schema: StructType,
-                              numInputBatches: SQLMetric,
-                              numOutputBatches: SQLMetric,
-                              convertTime: SQLMetric,
-                              taskContext: TaskContext): Iterator[ColumnarBatch] = {
+  def toColumnarBatchIterator(
+      it: Iterator[ColumnarBatch],
+      schema: StructType,
+      numInputBatches: SQLMetric,
+      numOutputBatches: SQLMetric,
+      convertTime: SQLMetric): Iterator[ColumnarBatch] = {
     if (it.isEmpty) {
       return Iterator.empty
     }
@@ -99,49 +88,51 @@ object VanillaColumnarToVeloxColumnarExec {
     val jniWrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
     val allocator = ArrowBufferAllocators.contextInstance()
     val cSchema = ArrowSchema.allocateNew(allocator)
-    val c2cHandle =
-      try {
-        ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
-        jniWrapper.init(
-          cSchema.memoryAddress(),
-          NativeMemoryManagers
-            .contextInstance("ColumnarToColumnar")
-            .getNativeInstanceHandle)
-      } finally {
-        cSchema.close()
-      }
+    ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+    val c2cHandle = jniWrapper.init(
+      cSchema.memoryAddress(),
+      NativeMemoryManagers
+        .contextInstance("ColumnarToColumnar")
+        .getNativeInstanceHandle)
 
     val converter = ArrowColumnarBatchConverter.create(arrowSchema, allocator)
 
+    TaskResources.addRecycler("ColumnarToColumnar_resourceClean", 100) {
+      jniWrapper.close(c2cHandle)
+      converter.close()
+      cSchema.close()
+    }
+
     val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
 
+      var arrowArray: ArrowArray = null
+      TaskResources.addRecycler("ColumnarToColumnar_arrowArray", 100) {
+        if (arrowArray != null) {
+          arrowArray.release()
+          arrowArray.close()
+          converter.reset()
+        }
+      }
+
       override def hasNext: Boolean = {
+        if (arrowArray != null) {
+          arrowArray.release()
+          arrowArray.close()
+          converter.reset()
+          arrowArray = null
+        }
         it.hasNext
       }
 
       def nativeConvert(cb: ColumnarBatch): ColumnarBatch = {
-        var arrowArray: ArrowArray = null
-        TaskResources.addRecycler("ColumnarToColumnar_arrowArray", 100) {
-          // Remind, remove isOpen here
-          if (arrowArray != null) {
-            arrowArray.close()
-          }
-        }
-
         numInputBatches += 1
-        try {
-          arrowArray = ArrowArray.allocateNew(allocator)
-          converter.write(cb)
-          converter.finish()
-          Data.exportVectorSchemaRoot(allocator, converter.root, null, arrowArray)
-          val handle = jniWrapper
-            .nativeConvertVanillaColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
-          ColumnarBatches.create(Runtimes.contextInstance(), handle)
-        } finally {
-          converter.reset()
-          arrowArray.close()
-          arrowArray = null
-        }
+        arrowArray = ArrowArray.allocateNew(allocator)
+        converter.write(cb)
+        converter.finish()
+        Data.exportVectorSchemaRoot(allocator, converter.root, null, arrowArray)
+        val handle = jniWrapper
+          .nativeConvertVanillaColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
+        ColumnarBatches.create(Runtimes.contextInstance(), handle)
       }
 
       override def next(): ColumnarBatch = {
@@ -154,20 +145,12 @@
       }
     }
 
-    if (taskContext != null) {
-      taskContext.addTaskCompletionListener[Unit] { _ =>
-        jniWrapper.close(c2cHandle)
-        allocator.close()
-        converter.close()
-      }
-    }
-
     Iterators
       .wrap(res)
       .recycleIterator {
        jniWrapper.close(c2cHandle)
        allocator.close()
        converter.close()
+       cSchema.close()
       }
       .recyclePayload(_.close())
       .create()
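A note on the resource lifecycle above: the iterator keeps at most one exported ArrowArray alive, releasing the previous one in hasNext before pulling the next batch, while the long-lived native resources (the JNI handle, the allocator, the converter, and the exported schema) are released exactly once. The closing Iterators.wrap chain encodes that split. A condensed Scala sketch of the pattern, with withLifecycle and closeNativeResources as illustrative stand-ins:

import io.glutenproject.utils.Iterators

import org.apache.spark.sql.vectorized.ColumnarBatch

// Hedged sketch of the ownership pattern used above. closeNativeResources
// stands in for closing the JNI handle, allocator, converter and schema; it
// runs once when the iterator is drained (TaskResources recyclers cover
// early-exit paths), while each produced batch is closed individually after
// the downstream operator has consumed it.
def withLifecycle(
    batches: Iterator[ColumnarBatch],
    closeNativeResources: () => Unit): Iterator[ColumnarBatch] =
  Iterators
    .wrap(batches)
    .recycleIterator(closeNativeResources())
    .recyclePayload(_.close())
    .create()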
@@ -52,8 +52,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.sql.sources.useV1SourceList", "avro")
.set("spark.gluten.sql.columnar.batchscan", "false")
.set("spark.sql.columnVector.offheap.enabled", "true")
}

test("simple_select") {
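This excerpt does not show the commit's test additions. For orientation, here is a hedged sketch of the kind of check such a suite could add for the new path; the query and the plan assertion are illustrative, not taken from this commit:

// Hypothetical test: run a query through a vanilla columnar source and
// assert the new converter operator shows up in the executed plan.
test("vanilla columnar to velox columnar") {
  val df = spark.sql("select l_orderkey, l_partkey from lineitem where l_orderkey > 0")
  df.collect()
  assert(find(df.queryExecution.executedPlan) {
    case _: ColumnarToVeloxColumnarExec => true
    case _ => false
  }.isDefined)
}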
7 changes: 7 additions & 0 deletions cpp/core/compute/Runtime.h
@@ -25,6 +25,7 @@
#include "memory/ColumnarBatch.h"
#include "memory/MemoryManager.h"
#include "operators/c2r/ColumnarToRow.h"
#include "operators/c2c/ColumnarToColumnar.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "operators/writer/Datasource.h"
@@ -104,6 +105,12 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
       MemoryManager* memoryManager,
       struct ArrowSchema* cSchema) = 0;
 
+  /// Creates a converter from a vanilla Spark ColumnarBatch to the
+  /// backend's native ColumnarBatch format.
+  virtual std::shared_ptr<ColumnarToColumnarConverter> createColumnar2ColumnarConverter(
+      MemoryManager* memoryManager,
+      struct ArrowSchema* cSchema) = 0;
+
   virtual std::shared_ptr<ShuffleWriter> createShuffleWriter(
       int numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
42 changes: 42 additions & 0 deletions cpp/core/jni/JniWrapper.cc
@@ -663,6 +663,48 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniWr
   JNI_METHOD_END()
 }
 
+JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_init( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong cSchema,
+    jlong memoryManagerHandle) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
+
+  return ctx->objectStore()->save(
+      ctx->createColumnar2ColumnarConverter(memoryManager, reinterpret_cast<struct ArrowSchema*>(cSchema)));
+  JNI_METHOD_END(kInvalidResourceHandle)
+}
+
+JNIEXPORT jlong JNICALL
+Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_nativeConvertVanillaColumnarToColumnar( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong c2cHandle,
+    jlong memoryAddress) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+
+  struct ArrowArray* cArray = reinterpret_cast<struct ArrowArray*>(memoryAddress);
+
+  auto converter = ctx->objectStore()->retrieve<ColumnarToColumnarConverter>(c2cHandle);
+  auto cb = converter->convert(cArray);
+  return ctx->objectStore()->save(cb);
+  JNI_METHOD_END(kInvalidResourceHandle)
+}
+
+JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_close( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong c2cHandle) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+
+  ctx->objectStore()->release(c2cHandle);
+  JNI_METHOD_END()
+}
+
 JNIEXPORT jstring JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_getType( // NOLINT
     JNIEnv* env,
     jobject wrapper,
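The three JNI entry points above (init, nativeConvertVanillaColumnarToColumnar, close) map onto the Scala-side wrapper used by the operator earlier in this commit. A condensed sketch of the call sequence; cSchemaAddress, memoryManagerHandle, and arrayAddress stand in for the real Arrow and memory-manager plumbing:

import io.glutenproject.vectorized.VanillaColumnarToNativeColumnarJniWrapper

def convertOnce(cSchemaAddress: Long, memoryManagerHandle: Long, arrayAddress: Long): Long = {
  val wrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
  // init() builds the native converter, registers it in the runtime's object
  // store, and returns an opaque handle.
  val c2cHandle = wrapper.init(cSchemaAddress, memoryManagerHandle)
  try {
    // Converts one exported Arrow array and returns a handle to the
    // resulting native ColumnarBatch.
    wrapper.nativeConvertVanillaColumnarToColumnar(c2cHandle, arrayAddress)
  } finally {
    // Releases the native converter from the object store.
    wrapper.close(c2cHandle)
  }
}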
36 changes: 36 additions & 0 deletions cpp/core/operators/c2c/ColumnarToColumnar.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "memory/ColumnarBatch.h"
+#include "utils/exception.h"
+
+namespace gluten {
+
+class ColumnarToColumnarConverter {
+ public:
+  ColumnarToColumnarConverter() {}
+
+  virtual ~ColumnarToColumnarConverter() = default;
+
+  virtual std::shared_ptr<ColumnarBatch> convert(ArrowArray* cArray) {
+    throw GlutenException("Columnar to columnar conversion is not implemented");
+  }
+};
+
+} // namespace gluten
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
@@ -307,6 +307,7 @@ set(VELOX_SRCS
     operators/serializer/VeloxColumnarToRowConverter.cc
     operators/serializer/VeloxColumnarBatchSerializer.cc
     operators/serializer/VeloxRowToColumnarConverter.cc
+    operators/serializer/VeloxColumnarToColumnarConverter.cc
     operators/writer/VeloxParquetDatasource.cc
     shuffle/VeloxShuffleReader.cc
     shuffle/VeloxShuffleWriter.cc
8 changes: 8 additions & 0 deletions cpp/velox/compute/VeloxRuntime.cc
@@ -28,6 +28,7 @@
#include "compute/VeloxPlanConverter.h"
#include "config/GlutenConfig.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
#include "operators/serializer/VeloxColumnarToColumnarConverter.h"
#include "shuffle/VeloxShuffleReader.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "utils/ConfigExtractor.h"
@@ -170,6 +171,13 @@ std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverter(
   return std::make_shared<VeloxRowToColumnarConverter>(cSchema, ctxVeloxPool);
 }
 
+std::shared_ptr<ColumnarToColumnarConverter> VeloxRuntime::createColumnar2ColumnarConverter(
+    MemoryManager* memoryManager,
+    struct ArrowSchema* cSchema) {
+  auto ctxVeloxPool = getLeafVeloxPool(memoryManager);
+  return std::make_shared<VeloxColumnarToColumnarConverter>(cSchema, ctxVeloxPool);
+}
+
 std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
     int numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,