diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index 838612036af2f..b05aa45e41d6e 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -382,4 +382,8 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       s"WriteFilesTransformer metrics update is not supported in CH backend")
   }
 
+  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
+    throw new UnsupportedOperationException(
+      s"ColumnarToColumnar metrics update is not supported in CH backend")
+  }
 }
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index bd52f9d93a491..13795a90873c0 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -735,4 +735,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       case _ => super.postProcessPushDownFilter(extraFilters, sparkExecNode)
     }
   }
+
+  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase = {
+    throw new UnsupportedOperationException(
+      "ColumnarToColumnarExec is not supported in CH backend.")
+  }
 }
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
index 2d1eb1315c876..7b0350e849d8f 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
@@ -313,6 +313,13 @@ class MetricsApiImpl extends MetricsApi with Logging {
       "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")
     )
 
+  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+    Map(
+      "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
+      "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+      "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")
+    )
+
   override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
index cd53b74560a49..1ea8ffa1c2b78 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -158,6 +158,15 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
   override def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase =
     RowToVeloxColumnarExec(child)
 
+  /**
+   * Generate ColumnarToColumnarExec that converts vanilla Spark columnar batches into Velox ones.
+   *
+   * @param child the child plan producing vanilla Spark columnar batches
+   * @return the Velox implementation of ColumnarToColumnarExecBase
+   */
+  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase =
+    ColumnarToVeloxColumnarExec(child)
+
   /**
    * Generate FilterExecTransformer.
    *
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 939ea3d7617df..14fc001c5a0af 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -449,4 +449,6 @@ object BackendSettings extends BackendSettingsApi {
     // vanilla Spark, we need to rewrite the aggregate to get the correct data type.
     true
   }
+
+  override def supportColumnarToColumnarExec(): Boolean = true
 }
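With the ClickHouse backend opting out and Velox opting in, the new rewrite is gated on two independent switches: the session-level GlutenConfig flag and the backend's BackendSettingsApi capability. A minimal Scala sketch of that gate (the method names come from this patch; the helper itself is illustrative and mirrors the call site in VanillaColumnarPlanOverrides further below):

    import io.glutenproject.GlutenConfig
    import io.glutenproject.backendsapi.BackendSettingsApi

    // True only when the user enabled the conversion AND the active backend can run it.
    def columnarToColumnarEnabled(settings: BackendSettingsApi): Boolean =
      GlutenConfig.getConf.enableNativeColumnarToColumnar &&
        settings.supportColumnarToColumnarExec()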
diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/VanillaColumnarToVeloxColumnarExec.scala b/backends-velox/src/main/scala/io/glutenproject/execution/ColumnarToVeloxColumnarExec.scala
similarity index 59%
rename from backends-velox/src/main/scala/io/glutenproject/execution/VanillaColumnarToVeloxColumnarExec.scala
rename to backends-velox/src/main/scala/io/glutenproject/execution/ColumnarToVeloxColumnarExec.scala
index 8c4f4b0bbe617..ea9c8ec3de1ec 100644
--- a/backends-velox/src/main/scala/io/glutenproject/execution/VanillaColumnarToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/execution/ColumnarToVeloxColumnarExec.scala
@@ -19,39 +19,31 @@ package io.glutenproject.execution
 import io.glutenproject.backendsapi.velox.ValidatorApiImpl
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exec.Runtimes
-import io.glutenproject.extension.GlutenPlan
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
 import io.glutenproject.utils.{ArrowAbiUtil, Iterators}
 import io.glutenproject.vectorized.VanillaColumnarToNativeColumnarJniWrapper
-import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
-import org.apache.spark.TaskContext
+
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.arrow.ArrowColumnarBatchConverter
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.TaskResources
 
-case class VanillaColumnarToVeloxColumnarExec(child: SparkPlan) extends GlutenPlan with UnaryExecNode {
-
-  override def supportsColumnar: Boolean = true
+import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
-  }
+case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColumnarExecBase(child) {
 
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+  override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
     new ValidatorApiImpl().doSchemaValidate(schema).foreach {
       reason =>
         throw new UnsupportedOperationException(
-          s"Input schema contains unsupported type when convert columnar to columnar for $schema " +
-            s"due to $reason")
+          s"Input schema contains unsupported type when performing columnar" +
+            s" to columnar conversion for $schema due to $reason")
     }
 
     val numInputBatches = longMetric("numInputBatches")
@@ -63,33 +55,30 @@ case class VanillaColumnarToVeloxColumnarExec(child: SparkPlan) extends GlutenPl
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = schema
-    child.execute().mapPartitions {
+    child.executeColumnar().mapPartitions {
       rowIterator =>
-        VanillaColumnarToVeloxColumnarExec.toColumnarBatchIterator(
-          rowIterator.asInstanceOf[Iterator[ColumnarBatch]],
+        ColumnarToVeloxColumnarExec.toColumnarBatchIterator(
+          rowIterator,
           localSchema,
           numInputBatches,
           numOutputBatches,
-          convertTime,
-          TaskContext.get())
+          convertTime)
     }
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
     copy(child = newChild)
   }
-
-  override def output: Seq[Attribute] = child.output
 }
 
-object VanillaColumnarToVeloxColumnarExec {
+object ColumnarToVeloxColumnarExec {
 
-  def toColumnarBatchIterator(it: Iterator[ColumnarBatch],
-      schema: StructType,
-      numInputBatches: SQLMetric,
-      numOutputBatches: SQLMetric,
-      convertTime: SQLMetric,
-      taskContext: TaskContext): Iterator[ColumnarBatch] = {
+  def toColumnarBatchIterator(
+      it: Iterator[ColumnarBatch],
+      schema: StructType,
+      numInputBatches: SQLMetric,
+      numOutputBatches: SQLMetric,
+      convertTime: SQLMetric): Iterator[ColumnarBatch] = {
     if (it.isEmpty) {
       return Iterator.empty
     }
@@ -99,49 +88,51 @@ object VanillaColumnarToVeloxColumnarExec {
     val jniWrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
     val allocator = ArrowBufferAllocators.contextInstance()
     val cSchema = ArrowSchema.allocateNew(allocator)
-    val c2cHandle =
-      try {
-        ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
-        jniWrapper.init(
-          cSchema.memoryAddress(),
-          NativeMemoryManagers
-            .contextInstance("ColumnarToColumnar")
-            .getNativeInstanceHandle)
-      } finally {
-        cSchema.close()
-      }
+    ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+    val c2cHandle = jniWrapper.init(
+      cSchema.memoryAddress(),
+      NativeMemoryManagers
+        .contextInstance("ColumnarToColumnar")
+        .getNativeInstanceHandle)
     val converter = ArrowColumnarBatchConverter.create(arrowSchema, allocator)
 
+    TaskResources.addRecycler("ColumnarToColumnar_resourceClean", 100) {
+      jniWrapper.close(c2cHandle)
+      converter.close()
+      cSchema.close()
+    }
+
     val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
 
+      var arrowArray: ArrowArray = null
+      TaskResources.addRecycler("ColumnarToColumnar_arrowArray", 100) {
+        if (arrowArray != null) {
+          arrowArray.release()
+          arrowArray.close()
+          converter.reset()
+        }
+      }
+
       override def hasNext: Boolean = {
+        if (arrowArray != null) {
+          arrowArray.release()
+          arrowArray.close()
+          converter.reset()
+          arrowArray = null
+        }
         it.hasNext
       }
 
       def nativeConvert(cb: ColumnarBatch): ColumnarBatch = {
-        var arrowArray: ArrowArray = null
-        TaskResources.addRecycler("ColumnarToColumnar_arrowArray", 100) {
-          // Remind, remove isOpen here
-          if (arrowArray != null) {
-            arrowArray.close()
-          }
-        }
-
         numInputBatches += 1
-        try {
-          arrowArray = ArrowArray.allocateNew(allocator)
-          converter.write(cb)
-          converter.finish()
-          Data.exportVectorSchemaRoot(allocator, converter.root, null, arrowArray)
-          val handle = jniWrapper
-            .nativeConvertVanillaColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
-          ColumnarBatches.create(Runtimes.contextInstance(), handle)
-        } finally {
-          converter.reset()
-          arrowArray.close()
-          arrowArray = null
-        }
+        arrowArray = ArrowArray.allocateNew(allocator)
+        converter.write(cb)
+        converter.finish()
+        Data.exportVectorSchemaRoot(allocator, converter.root, null, arrowArray)
+        val handle = jniWrapper
+          .nativeConvertVanillaColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
+        ColumnarBatches.create(Runtimes.contextInstance(), handle)
       }
 
       override def next(): ColumnarBatch = {
@@ -154,20 +145,12 @@ object VanillaColumnarToVeloxColumnarExec {
       }
     }
 
-    if (taskContext != null) {
-      taskContext.addTaskCompletionListener[Unit] {
-        _ =>
-          jniWrapper.close(c2cHandle)
-          allocator.close()
-          converter.close()
-      }
-    }
-
     Iterators
      .wrap(res)
      .recycleIterator {
        jniWrapper.close(c2cHandle)
-       allocator.close()
        converter.close()
+       cSchema.close()
      }
      .recyclePayload(_.close())
      .create()
diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
index e4a3a8f2392c7..059b6f59a21b5 100644
--- a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
@@ -52,8 +52,6 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set("spark.sql.autoBroadcastJoinThreshold", "-1")
       .set("spark.sql.sources.useV1SourceList", "avro")
-      .set("spark.gluten.sql.columnar.batchscan", "false")
-      .set("spark.sql.columnVector.offheap.enabled", "true")
   }
 
   test("simple_select") {
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index f8a3a5a24b839..5854ed7bff978 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -25,6 +25,7 @@
 #include "memory/ColumnarBatch.h"
 #include "memory/MemoryManager.h"
+#include "operators/c2c/ColumnarToColumnar.h"
 #include "operators/c2r/ColumnarToRow.h"
 #include "operators/r2c/RowToColumnar.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
 #include "operators/writer/Datasource.h"
@@ -104,6 +105,12 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
       MemoryManager* memoryManager,
       struct ArrowSchema* cSchema) = 0;
 
+  /// Creates a converter that transforms a vanilla Spark ColumnarBatch into the native
+  /// ColumnarBatch format used by the backend.
+  virtual std::shared_ptr<ColumnarToColumnarConverter> createColumnar2ColumnarConverter(
+      MemoryManager* memoryManager,
+      struct ArrowSchema* cSchema) = 0;
+
   virtual std::shared_ptr<ShuffleWriter> createShuffleWriter(
       int numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 17a07aa091976..c194e773d3a38 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -663,6 +663,48 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniWr
   JNI_METHOD_END()
 }
 
+JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_init( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong cSchema,
+    jlong memoryManagerHandle) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
+
+  return ctx->objectStore()->save(
+      ctx->createColumnar2ColumnarConverter(memoryManager, reinterpret_cast<struct ArrowSchema*>(cSchema)));
+  JNI_METHOD_END(kInvalidResourceHandle)
+}
+
+JNIEXPORT jlong JNICALL
+Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_nativeConvertVanillaColumnarToColumnar( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong c2cHandle,
+    jlong memoryAddress) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+
+  struct ArrowArray* cArray = reinterpret_cast<struct ArrowArray*>(memoryAddress);
+
+  auto converter = ctx->objectStore()->retrieve<ColumnarToColumnarConverter>(c2cHandle);
+  auto cb = converter->convert(cArray);
+  return ctx->objectStore()->save(cb);
+  JNI_METHOD_END(kInvalidResourceHandle)
+}
+
+JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_close( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong c2cHandle) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+
+  ctx->objectStore()->release(c2cHandle);
+  JNI_METHOD_END()
+}
+
 JNIEXPORT jstring JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_getType( // NOLINT
     JNIEnv* env,
     jobject wrapper,
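From the JVM side these three entry points form a simple lifecycle: init registers a native converter in the runtime's object store and returns its handle, the convert call runs once per batch, and close releases the converter. A Scala sketch of the sequence (cSchema/arrowArray are assumed to already carry exported Arrow data, and nmmHandle stands in for NativeMemoryManagers.contextInstance("ColumnarToColumnar").getNativeInstanceHandle):

    import io.glutenproject.vectorized.VanillaColumnarToNativeColumnarJniWrapper
    import org.apache.arrow.c.{ArrowArray, ArrowSchema}

    def convertOnce(cSchema: ArrowSchema, arrowArray: ArrowArray, nmmHandle: Long): Long = {
      val wrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
      // Registers a converter for the given Arrow schema; the handle indexes the object store.
      val c2cHandle = wrapper.init(cSchema.memoryAddress(), nmmHandle)
      try {
        // Returns a handle to the converted native ColumnarBatch in the runtime's object store.
        wrapper.nativeConvertVanillaColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
      } finally {
        wrapper.close(c2cHandle) // drops the converter from the object store
      }
    }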
diff --git a/cpp/core/operators/c2c/ColumnarToColumnar.h b/cpp/core/operators/c2c/ColumnarToColumnar.h
new file mode 100644
index 0000000000000..4f9d0b2d253bc
--- /dev/null
+++ b/cpp/core/operators/c2c/ColumnarToColumnar.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "memory/ColumnarBatch.h"
+#include "utils/exception.h"
+
+namespace gluten {
+
+class ColumnarToColumnarConverter {
+ public:
+  ColumnarToColumnarConverter() {}
+
+  virtual ~ColumnarToColumnarConverter() = default;
+
+  virtual std::shared_ptr<ColumnarBatch> convert(ArrowArray* cArray) {
+    throw GlutenException("ColumnarToColumnar conversion is not implemented");
+  }
+};
+
+} // namespace gluten
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index b15fd395ff235..215ba7473575e 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -307,6 +307,7 @@ set(VELOX_SRCS
     operators/serializer/VeloxColumnarToRowConverter.cc
     operators/serializer/VeloxColumnarBatchSerializer.cc
     operators/serializer/VeloxRowToColumnarConverter.cc
+    operators/serializer/VeloxColumnarToColumnarConverter.cc
     operators/writer/VeloxParquetDatasource.cc
     shuffle/VeloxShuffleReader.cc
     shuffle/VeloxShuffleWriter.cc
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 654d374a3194d..45149b0184358 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -28,6 +28,7 @@
 #include "compute/VeloxPlanConverter.h"
 #include "config/GlutenConfig.h"
+#include "operators/serializer/VeloxColumnarToColumnarConverter.h"
 #include "operators/serializer/VeloxRowToColumnarConverter.h"
 #include "shuffle/VeloxShuffleReader.h"
 #include "shuffle/VeloxShuffleWriter.h"
 #include "utils/ConfigExtractor.h"
@@ -170,6 +171,13 @@ std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverte
   return std::make_shared<VeloxRowToColumnarConverter>(cSchema, ctxVeloxPool);
 }
 
+std::shared_ptr<ColumnarToColumnarConverter> VeloxRuntime::createColumnar2ColumnarConverter(
+    MemoryManager* memoryManager,
+    struct ArrowSchema* cSchema) {
+  auto ctxVeloxPool = getLeafVeloxPool(memoryManager);
+  return std::make_shared<VeloxColumnarToColumnarConverter>(cSchema, ctxVeloxPool);
+}
+
 std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
     int numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 0233b3f82e977..5db8ee67ee789 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -21,6 +21,7 @@
 #include "compute/Runtime.h"
 #include "memory/VeloxMemoryManager.h"
 #include "operators/serializer/VeloxColumnarBatchSerializer.h"
+#include "operators/serializer/VeloxColumnarToColumnarConverter.h"
 #include "operators/serializer/VeloxColumnarToRowConverter.h"
 #include "operators/writer/VeloxParquetDatasource.h"
 #include "shuffle/ShuffleReader.h"
@@ -82,6 +83,10 @@ class VeloxRuntime final : public Runtime {
   std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(
       MemoryManager* memoryManager,
       struct ArrowSchema* cSchema) override;
+
+  std::shared_ptr<ColumnarToColumnarConverter> createColumnar2ColumnarConverter(
+      MemoryManager* memoryManager,
+      struct ArrowSchema* cSchema) override;
 
   std::shared_ptr<ShuffleWriter> createShuffleWriter(
       int numPartitions,
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToColumnarConverter.cc b/cpp/velox/operators/serializer/VeloxColumnarToColumnarConverter.cc
new file mode 100644
index 0000000000000..ff0c1f564afea
--- /dev/null
+++ b/cpp/velox/operators/serializer/VeloxColumnarToColumnarConverter.cc
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxColumnarToColumnarConverter.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "utils/VeloxArrowUtils.h"
+#include "velox/vector/arrow/Bridge.h"
+
+using namespace facebook;
+using namespace facebook::velox;
+
+namespace gluten {
+VeloxColumnarToColumnarConverter::VeloxColumnarToColumnarConverter(
+    struct ArrowSchema* cSchema,
+    std::shared_ptr<memory::MemoryPool> memoryPool)
+    : ColumnarToColumnarConverter(), cSchema_(cSchema), pool_(memoryPool) {}
+
+VeloxColumnarToColumnarConverter::~VeloxColumnarToColumnarConverter() {
+  if (cSchema_) {
+    ArrowSchemaRelease(cSchema_);
+  }
+}
+
+std::shared_ptr<ColumnarBatch>
+VeloxColumnarToColumnarConverter::convert(ArrowArray* cArray) {
+  auto vp = importFromArrowAsViewer(*cSchema_, *cArray, ArrowUtils::getBridgeOptions(), pool_.get());
+  return std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<RowVector>(vp));
+}
+} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToColumnarConverter.h b/cpp/velox/operators/serializer/VeloxColumnarToColumnarConverter.h
new file mode 100644
index 0000000000000..4cb9e9222ad5c
--- /dev/null
+++ b/cpp/velox/operators/serializer/VeloxColumnarToColumnarConverter.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "memory/ColumnarBatch.h"
+#include "operators/c2c/ColumnarToColumnar.h"
+#include "velox/common/memory/Memory.h"
+#include "velox/type/Type.h"
+
+namespace gluten {
+
+class VeloxColumnarToColumnarConverter final : public ColumnarToColumnarConverter {
+ public:
+  VeloxColumnarToColumnarConverter(
+      struct ArrowSchema* cSchema,
+      std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool);
+
+  ~VeloxColumnarToColumnarConverter();
+
+  std::shared_ptr<ColumnarBatch> convert(ArrowArray* cArray) override;
+
+ protected:
+  struct ArrowSchema* cSchema_;
+  std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index dff210b7f99fa..e7d77ff2ff9dc 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -39,7 +39,7 @@ endfunction()
 
 add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
 # TODO: ORC is not well supported.
 # add_velox_test(orc_test SOURCES OrcTest.cc)
-add_velox_test(velox_operators_test SOURCES VeloxColumnarToRowTest.cc VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc)
+add_velox_test(velox_operators_test SOURCES VeloxColumnarToRowTest.cc VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc VeloxColumnarToColumnarTest.cc)
 add_velox_test(
   velox_plan_conversion_test
   SOURCES
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 0c51f0bd08cf2..8668d4c2f35f2 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -56,6 +56,11 @@ class DummyRuntime final : public Runtime {
       struct ArrowSchema* cSchema) override {
     throw GlutenException("Not yet implemented");
   }
+  std::shared_ptr<ColumnarToColumnarConverter> createColumnar2ColumnarConverter(
+      MemoryManager* memoryManager,
+      struct ArrowSchema* cSchema) override {
+    throw GlutenException("Not yet implemented");
+  }
   std::shared_ptr<ShuffleWriter> createShuffleWriter(
       int numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
diff --git a/cpp/velox/tests/VeloxColumnarToColumnarTest.cc b/cpp/velox/tests/VeloxColumnarToColumnarTest.cc
new file mode 100644
index 0000000000000..3e00a82746caa
--- /dev/null
+++ b/cpp/velox/tests/VeloxColumnarToColumnarTest.cc
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/ArrowMemoryPool.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "operators/serializer/VeloxColumnarToColumnarConverter.h"
+#include "utils/VeloxArrowUtils.h"
+#include "velox/vector/arrow/Bridge.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook;
+using namespace facebook::velox;
+
+namespace gluten {
+class VeloxColumnarToColumnarTest : public ::testing::Test, public test::VectorTestBase {
+ protected:
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  void testRowVectorEqual(ArrowSchema cSchema, ArrowArray arrowArray, velox::RowVectorPtr vector) {
+    auto columnarToColumnarConverter = std::make_shared<VeloxColumnarToColumnarConverter>(&cSchema, pool_);
+
+    auto cb = columnarToColumnarConverter->convert(&arrowArray);
+    auto vp = std::dynamic_pointer_cast<VeloxColumnarBatch>(cb)->getRowVector();
+    velox::test::assertEqualVectors(vector, vp);
+  }
+
+ private:
+  std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
+};
+
+TEST_F(VeloxColumnarToColumnarTest, allTypes) {
+  auto vector = makeRowVector({
+      makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}),
+      makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2, 2, std::nullopt, std::nullopt, 3, -3}),
+      makeNullableFlatVector<int32_t>({1, 2, 3, 4, std::nullopt, 5, 6, 7, 8, std::nullopt}),
+      makeNullableFlatVector<int64_t>(
+          {std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt,
+           std::nullopt}),
+      makeNullableFlatVector<float>(
+          {-0.1234567,
+           std::nullopt,
+           0.1234567,
+           std::nullopt,
+           -0.142857,
+           std::nullopt,
+           0.142857,
+           0.285714,
+           0.428617,
+           std::nullopt}),
+      makeNullableFlatVector<bool>(
+          {std::nullopt, true, false, std::nullopt, true, true, false, true, std::nullopt, std::nullopt}),
+      makeFlatVector<velox::StringView>(
+          {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", "boB7", "ALICE8", "BOB9"}),
+      makeNullableFlatVector<velox::StringView>(
+          {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", std::nullopt, "alicE", std::nullopt, "boB"}),
+  });
+
+  ArrowSchema cSchema;
+  ArrowArray arrowArray;
+  exportToArrow(vector, cSchema, ArrowUtils::getBridgeOptions());
+  exportToArrow(vector, arrowArray, pool(), ArrowUtils::getBridgeOptions());
+  testRowVectorEqual(cSchema, arrowArray, vector);
+}
+} // namespace gluten
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 74973a60c6761..9b5a080b69734 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -132,4 +132,6 @@ trait BackendSettingsApi {
   def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = false
 
   def shouldRewriteTypedImperativeAggregate(): Boolean = false
+
+  def supportColumnarToColumnarExec(): Boolean = false
 }
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala
index c0448af78c1ef..a8c460fb18b9c 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala
@@ -83,6 +83,8 @@ trait MetricsApi extends Serializable {
 
   def genRowToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
+  def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
+
   def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
   def genLimitTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
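The metric names declared here must line up with what the operator resolves at runtime via longMetric. A Scala sketch of how an implementation typically drives them per batch (condensed from ColumnarToVeloxColumnarExec above; the exact timing placement is an assumption, since the operator's next() body is elided in this patch):

    import org.apache.spark.sql.execution.metric.SQLMetric
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // nativeConvert is the per-batch conversion defined in the operator; it is passed in
    // here only to keep the sketch self-contained.
    def convertWithMetrics(
        cb: ColumnarBatch,
        nativeConvert: ColumnarBatch => ColumnarBatch,
        numInputBatches: SQLMetric,
        numOutputBatches: SQLMetric,
        convertTime: SQLMetric): ColumnarBatch = {
      numInputBatches += 1
      val start = System.nanoTime()
      val converted = nativeConvert(cb)
      convertTime += (System.nanoTime() - start) / 1000000L // timing metric is in millis
      numOutputBatches += 1
      converted
    }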
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 807563b94daa4..32d3c4ce41b12 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -69,6 +69,14 @@ trait SparkPlanExecApi {
    */
   def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase
 
+  /**
+   * Generate ColumnarToColumnarExec that converts vanilla Spark columnar batches into native ones.
+   *
+   * @param child the child plan producing vanilla Spark columnar batches
+   * @return the backend-specific ColumnarToColumnarExecBase implementation
+   */
+  def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase
+
   /**
    * Generate FilterExecTransformer.
    *
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ColumnarToColumnarExecBase.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ColumnarToColumnarExecBase.scala
new file mode 100644
index 0000000000000..27b908d373045
--- /dev/null
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/ColumnarToColumnarExecBase.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import io.glutenproject.backendsapi.BackendsApiManager
+import io.glutenproject.extension.GlutenPlan
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Provides a common executor to translate an [[RDD]] of vanilla Spark [[ColumnarBatch]] into an
+ * [[RDD]] of native [[ColumnarBatch]].
+ */
+abstract class ColumnarToColumnarExecBase(child: SparkPlan) extends GlutenPlan with UnaryExecNode {
+
+  // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
+ @transient override lazy val metrics = + BackendsApiManager.getMetricsApiInstance.genColumnarToColumnarMetrics(sparkContext) + + override def supportsColumnar: Boolean = true + + override protected def doExecute(): RDD[InternalRow] = { + child.execute() + } + + final override def doExecuteColumnar(): RDD[ColumnarBatch] = { + doExecuteColumnarInternal() + } + + final override def output: Seq[Attribute] = child.output + + final override def outputPartitioning: Partitioning = child.outputPartitioning + + final override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + def doExecuteColumnarInternal(): RDD[ColumnarBatch] +} diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 685639a7e59e1..d767d3407f793 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -555,8 +555,28 @@ case class VanillaColumnarPlanOverrides(session: SparkSession) extends Rule[Spar plan.withNewChildren(plan.children.map(replaceWithVanillaRowToColumnar)) } + private def replaceWithColumnarToColumnar(plan: SparkPlan): SparkPlan = { + plan match { + case p: RowToColumnarExecBase if p.child.isInstanceOf[ColumnarToRowExec] => + val replacedChild = replaceWithColumnarToColumnar( + p.child.asInstanceOf[ColumnarToRowExec].child) + BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToColumnarExec(replacedChild) + case _ => + plan.withNewChildren(plan.children.map(replaceWithColumnarToColumnar)) + } + } + def apply(plan: SparkPlan): SparkPlan = { - val newPlan = replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan)) + val newPlan = + if ( + GlutenConfig.getConf.enableNativeColumnarToColumnar && BackendsApiManager.getSettings + .supportColumnarToColumnarExec() + ) { + replaceWithColumnarToColumnar( + replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))) + } else { + replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan)) + } planChangeLogger.logRule(ruleName, plan, newPlan) newPlan } diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java index 3698d660ac0ef..59b6b7ba1913f 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package io.glutenproject.vectorized; import io.glutenproject.exec.Runtime; @@ -5,25 +21,24 @@ import io.glutenproject.exec.Runtimes; public class VanillaColumnarToNativeColumnarJniWrapper implements RuntimeAware { - private final Runtime runtime; + private final Runtime runtime; - private VanillaColumnarToNativeColumnarJniWrapper(Runtime runtime) { - this.runtime = runtime; - } + private VanillaColumnarToNativeColumnarJniWrapper(Runtime runtime) { + this.runtime = runtime; + } - public static VanillaColumnarToNativeColumnarJniWrapper create() { - return new VanillaColumnarToNativeColumnarJniWrapper(Runtimes.contextInstance()); - } + public static VanillaColumnarToNativeColumnarJniWrapper create() { + return new VanillaColumnarToNativeColumnarJniWrapper(Runtimes.contextInstance()); + } - @Override - public long handle() { - return runtime.getHandle(); - } + @Override + public long handle() { + return runtime.getHandle(); + } - public native long init(long cSchema, long memoryManagerHandle); + public native long init(long cSchema, long memoryManagerHandle); - public native long nativeConvertVanillaColumnarToColumnar( - long c2cHandle, long bufferAddress); + public native long nativeConvertVanillaColumnarToColumnar(long c2cHandle, long memoryAddress); - public native void close(long c2cHandle); + public native void close(long c2cHandle); } diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala index aa73d31267470..1ac1c34c121ce 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala @@ -16,22 +16,23 @@ */ package org.apache.spark.sql.execution.arrow -import org.apache.arrow.memory.BufferAllocator - -import scala.collection.JavaConverters._ -import org.apache.arrow.vector._ -import org.apache.arrow.vector.complex._ -import org.apache.arrow.vector.types.pojo.Schema import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types._ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.types._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.ArrowUtils -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.sql.vectorized.ColumnVector +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector._ +import org.apache.arrow.vector.complex._ +import org.apache.arrow.vector.types.pojo.Schema + +import scala.collection.JavaConverters._ object ArrowColumnarBatchConverter { @@ -41,9 +42,10 @@ object ArrowColumnarBatchConverter { } def create(root: VectorSchemaRoot): ArrowColumnarBatchConverter = { - val children = root.getFieldVectors.asScala.map { vector => - vector.allocateNew() - createFieldWriter(vector) + val children = root.getFieldVectors.asScala.map { + vector => + vector.allocateNew() + createFieldWriter(vector) } new ArrowColumnarBatchConverter(root, children.toArray) } @@ -72,8 +74,8 @@ object ArrowColumnarBatchConverter { val valueWriter = 
createFieldWriter(structVector.getChild(MapVector.VALUE_NAME))
         new MapWriter(vector, structVector, keyWriter, valueWriter)
       case (StructType(_), vector: StructVector) =>
-        val children = (0 until vector.size()).map { ordinal =>
-          createFieldWriter(vector.getChildByOrdinal(ordinal))
+        val children = (0 until vector.size()).map {
+          ordinal => createFieldWriter(vector.getChildByOrdinal(ordinal))
         }
         new StructWriter(vector, children.toArray)
       case (NullType, vector: NullVector) => new NullWriter(vector)
@@ -103,7 +105,8 @@ case class ColumnarSpecializedGetters(columnVector: ColumnVector) extends Specia
 
   override def getDouble(rowId: Int): Double = columnVector.getDouble(rowId)
 
-  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal = columnVector.getDecimal(rowId, precision, scale)
+  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal =
+    columnVector.getDecimal(rowId, precision, scale)
 
   override def getUTF8String(rowId: Int): UTF8String = columnVector.getUTF8String(rowId)
 
@@ -127,11 +130,12 @@ class ArrowColumnarBatchConverter(val root: VectorSchemaRoot, fields: Array[Arro
   private var count: Int = 0
 
   def write(columnarBatch: ColumnarBatch): Unit = {
-    fields.zipWithIndex.foreach { case (field, ordinal) =>
-      val columnVector = ColumnarSpecializedGetters(columnarBatch.column(ordinal))
-      for (rowId <- 0 until columnarBatch.numRows()) {
-        field.write(columnVector, rowId)
-      }
+    fields.zipWithIndex.foreach {
+      case (field, ordinal) =>
+        val columnVector = ColumnarSpecializedGetters(columnarBatch.column(ordinal))
+        for (rowId <- 0 until columnarBatch.numRows()) {
+          field.write(columnVector, rowId)
+        }
     }
     count += columnarBatch.numRows()
   }
@@ -150,4 +154,4 @@ class ArrowColumnarBatchConverter(val root: VectorSchemaRoot, fields: Array[Arro
   def close(): Unit = {
     root.close()
   }
-}
\ No newline at end of file
+}
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index 07adacf42079c..5b4926adafa69 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -70,6 +70,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableNativeColumnarToRow: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
 
+  def enableNativeColumnarToColumnar: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED)
+
   def forceShuffledHashJoin: Boolean = conf.getConf(COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED)
 
   def enableColumnarSortMergeJoin: Boolean = conf.getConf(COLUMNAR_SORTMERGEJOIN_ENABLED)
@@ -774,6 +776,13 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED =
+    buildConf("spark.gluten.sql.columnar.columnarToColumnar")
+      .internal()
+      .doc("Enable or disable the native vanilla-columnar to native-columnar conversion.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_SORTMERGEJOIN_ENABLED =
     buildConf("spark.gluten.sql.columnar.sortMergeJoin")
      .internal()
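End to end, the feature stays dormant unless the new flag is switched on and the active backend reports support. A usage sketch for trying it with the Velox backend (the gluten key is the one defined above; the remaining session settings are the usual Gluten prerequisites and are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.plugins", "io.glutenproject.GlutenPlugin")
      .config("spark.memory.offHeap.enabled", "true") // Gluten requires off-heap memory
      .config("spark.memory.offHeap.size", "2g")
      // Off by default; see COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED above.
      .config("spark.gluten.sql.columnar.columnarToColumnar", "true")
      .getOrCreate()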