From e82c97a335e6839c0036cb0e459050e02b25eecc Mon Sep 17 00:00:00 2001 From: Hong Date: Thu, 16 Jun 2022 17:06:45 +0800 Subject: [PATCH 01/12] [NSE-913] Add support for Hadoop 3.3.1 (#966) --- pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pom.xml b/pom.xml index 256c3f1f7..5f97fae48 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,12 @@ 3.2.0 + + hadoop-3.3 + + 3.3.1 + + dataproc-2.0 From 256b6373a581806c1bf9d676d398416ce99912f7 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 20 Jun 2022 15:16:43 +0800 Subject: [PATCH 02/12] [NSE-927][NSE-126] BackPort PR#975 and PR#977 to branch-1.4 (#978) * [NSE-927] Add macro __AVX512BW__ check for different CPU architecture (#975) * Add __AVX512BW__ check * Fix cFormat * [NSE-126] set default codegen opt to O1 for branch-1.4 --- native-sql-engine/cpp/src/CMakeLists.txt | 2 +- .../arrow_compute/ext/codegen_common.cc | 2 +- .../operators/columnar_to_row_converter.cc | 19 ++++++++++++++++--- native-sql-engine/cpp/src/shuffle/splitter.cc | 5 ++++- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index 2e493a7ab..1a1180db9 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -495,7 +495,7 @@ file(COPY codegen/common/hash_relation_number.h DESTINATION ${root_directory}/re add_definitions(-DNATIVESQL_SRC_PATH="${root_directory}/releases") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations -Wno-attributes") -set(NATIVE_AVX512_FLAG "-march=icelake-server") +set(NATIVE_AVX512_FLAG "-march=native") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${NATIVE_AVX512_FLAG}") set(SPARK_COLUMNAR_PLUGIN_SRCS jni/jni_wrapper.cc diff --git a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc index ab40ff637..1d99cd435 100644 --- a/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc +++ b/native-sql-engine/cpp/src/codegen/arrow_compute/ext/codegen_common.cc @@ -628,7 +628,7 @@ arrow::Status CompileCodes(std::string codes, std::string signature) { } std::string env_jar = std::string(env_jar_); - std::string env_codegen_option = " -O3 -march=native "; + std::string env_codegen_option = " -O1 -march=native "; char* env_codegen_option_ = std::getenv("CODEGEN_OPTION"); if (env_codegen_option_ != nullptr) { diff --git a/native-sql-engine/cpp/src/operators/columnar_to_row_converter.cc b/native-sql-engine/cpp/src/operators/columnar_to_row_converter.cc index 8dc46052b..51626f66e 100644 --- a/native-sql-engine/cpp/src/operators/columnar_to_row_converter.cc +++ b/native-sql-engine/cpp/src/operators/columnar_to_row_converter.cc @@ -144,6 +144,7 @@ arrow::Status ColumnarToRowConverter::Init( int32_t j = 0; int32_t* length_data = lengths_.data(); +#ifdef __AVX512BW__ if (ARROW_PREDICT_TRUE(support_avx512_)) { __m256i x7_8x = _mm256_load_si256((__m256i*)x_7); __m256i x8_8x = _mm256_load_si256((__m256i*)x_8); @@ -172,6 +173,7 @@ arrow::Status ColumnarToRowConverter::Init( _mm_prefetch(&offsetarray[j + (128 + 128) / sizeof(offset_type)], _MM_HINT_T0); } } +#endif for (j; j < num_rows_; j++) { offset_type length = offsetarray[j + 1] - offsetarray[j]; @@ -192,10 +194,13 @@ arrow::Status ColumnarToRowConverter::Init( // allocate one more cache line to ease avx operations if (buffer_ == nullptr || buffer_->capacity() < total_memory_size + 64) { ARROW_ASSIGN_OR_RAISE(buffer_, 
AllocateBuffer(total_memory_size + 64, memory_pool_)); +#ifdef __AVX512BW__ if (ARROW_PREDICT_TRUE(support_avx512_)) { memset(buffer_->mutable_data() + total_memory_size, 0, buffer_->capacity() - total_memory_size); - } else { + } else +#endif + { memset(buffer_->mutable_data(), 0, buffer_->capacity()); } } @@ -384,6 +389,7 @@ inline arrow::Status FillBuffer(int32_t& row_start, int32_t batch_rows, std::vector& typewidth, std::vector>& arrays, bool support_avx512) { +#ifdef __AVX512BW__ if (ARROW_PREDICT_TRUE(support_avx512)) { __m256i fill_0_8x; fill_0_8x = _mm256_xor_si256(fill_0_8x, fill_0_8x); @@ -395,6 +401,7 @@ inline arrow::Status FillBuffer(int32_t& row_start, int32_t batch_rows, } } } +#endif for (auto col_index = 0; col_index < num_cols; col_index++) { auto& array = arrays[col_index]; @@ -427,6 +434,7 @@ inline arrow::Status FillBuffer(int32_t& row_start, int32_t batch_rows, offset_type length = BinaryOffsets[j + 1] - BinaryOffsets[j]; auto value = &dataptrs[col_index][2][BinaryOffsets[j]]; +#ifdef __AVX512BW__ if (ARROW_PREDICT_TRUE(support_avx512)) { // write the variable value offset_type k; @@ -440,7 +448,9 @@ inline arrow::Status FillBuffer(int32_t& row_start, int32_t batch_rows, __m256i v = _mm256_maskz_loadu_epi8(mask, value + k); _mm256_mask_storeu_epi8(buffer_address + offsets[j] + buffer_cursor[j] + k, mask, v); - } else { + } else +#endif + { // write the variable value memcpy(buffer_address + offsets[j] + buffer_cursor[j], value, length); } @@ -508,11 +518,14 @@ inline arrow::Status FillBuffer(int32_t& row_start, int32_t batch_rows, for (auto j = row_start; j < row_start + batch_rows; j++) { if (nullvec[col_index] || (!array->IsNull(j))) { const uint8_t* srcptr = dataptr + (j << shift); +#ifdef __AVX512BW__ if (ARROW_PREDICT_TRUE(support_avx512)) { __m256i v = _mm256_maskz_loadu_epi8(mask, srcptr); _mm256_mask_storeu_epi8(buffer_address_tmp + offsets[j], mask, v); _mm_prefetch(srcptr + 64, _MM_HINT_T0); - } else { + } else +#endif + { memcpy(buffer_address_tmp + offsets[j], srcptr, typewidth[col_index]); } } else { diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index cd257b9f4..c7d4d64ac 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -1250,6 +1250,7 @@ arrow::Status Splitter::SplitBinaryType(const uint8_t* src_addr, const T* src_of << " strlen = " << strlength << std::endl; } auto value_src_ptr = src_addr + src_offset_addr[src_offset]; +#ifdef __AVX512BW__ if (ARROW_PREDICT_TRUE(support_avx512_)) { // write the variable value T k; @@ -1260,7 +1261,9 @@ arrow::Status Splitter::SplitBinaryType(const uint8_t* src_addr, const T* src_of auto mask = (1L << (strlength - k)) - 1; __m256i v = _mm256_maskz_loadu_epi8(mask, value_src_ptr + k); _mm256_mask_storeu_epi8(dst_value_base + k, mask, v); - } else { + } else +#endif + { memcpy(dst_value_base, value_src_ptr, strlength); } dst_value_base += strlength; From 11dcf98bd8f0abca93ffe9e24d86f9618c74c4b5 Mon Sep 17 00:00:00 2001 From: lviiii Date: Thu, 21 Jul 2022 09:14:08 +0000 Subject: [PATCH 03/12] Add the strategy to fallback to Vanilla Spark shuffle manager. 
--- .../intel/oap/vectorized/IteratorWrapper.java | 43 ++++ .../vectorized/ShuffleSplitterJniWrapper.java | 37 ++++ .../intel/oap/vectorized/SplitIterator.java | 189 ++++++++++++++++++ .../com/intel/oap/GazellePluginConfig.scala | 5 + .../intel/oap/expression/ConverterUtils.scala | 44 ++-- .../oap/extension/ColumnarOverrides.scala | 4 +- .../columnar/ColumnarGuardRule.scala | 3 +- .../ArrowColumnarBatchSerializer.scala | 53 ++++- .../CloseablePartitionedBlockIterator.scala | 58 ++++++ .../ColumnarShuffleExchangeExec.scala | 151 +++++++++++--- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 177 ++++++++++++++++ native-sql-engine/cpp/src/shuffle/splitter.cc | 118 ++++++++--- native-sql-engine/cpp/src/shuffle/splitter.h | 38 ++++ 13 files changed, 834 insertions(+), 86 deletions(-) create mode 100644 native-sql-engine/core/src/main/java/com/intel/oap/vectorized/IteratorWrapper.java create mode 100644 native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java create mode 100644 native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/IteratorWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/IteratorWrapper.java new file mode 100644 index 000000000..8f24b38a5 --- /dev/null +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/IteratorWrapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.intel.oap.vectorized; + + +import scala.collection.convert.Wrappers; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +public class IteratorWrapper { + + private Iterator> in; + + public IteratorWrapper(Iterator> in) { + this.in = in; + } + + public boolean hasNext() { + return in.hasNext(); + } + + public List next() { + return in.next(); + } + +} diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java index 93d5f3223..f8f27f527 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java @@ -83,6 +83,28 @@ public native long nativeMake( long memoryPoolId, boolean writeSchema); + public long make( + NativePartitioning part, + long offheapPerTask, + int bufferSize) { + return initSplit( + part.getShortName(), + part.getNumPartitions(), + part.getSchema(), + part.getExprList(), + offheapPerTask, + bufferSize + ); + } + + public native long initSplit( + String shortName, + int numPartitions, + byte[] schema, + byte[] exprList, + long offheapPerTask, + int bufferSize); + /** * * Spill partition data to disk. @@ -127,6 +149,21 @@ public native long split( */ public native SplitResult stop(long splitterId) throws IOException; + public native byte[][] cacheBuffer( + long splitterId, + int numRows) + throws RuntimeException; + + + /** + * Clear the buffer. And stop processing splitting + * + * @param splitterId splitter instance id + * @return SplitResult + */ + public native SplitResult clear(long splitterId) throws IOException; + + /** * Release resources associated with designated splitter instance. * diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java new file mode 100644 index 000000000..9e82573a7 --- /dev/null +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.intel.oap.vectorized; + + +import com.intel.oap.expression.ConverterUtils; +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BufferLayout; +import org.apache.arrow.vector.ipc.message.ArrowBuffer; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +public class SplitIterator implements Iterator{ + + public static class IteratorOptions implements Serializable { + private static final long serialVersionUID = -1L; + + private int partitionNum; + + private String name; + + private long offheapPerTask; + + private int bufferSize; + + private String expr; + + private Schema schema; + + public NativePartitioning getNativePartitioning() { + return nativePartitioning; + } + + public void setNativePartitioning(NativePartitioning nativePartitioning) { + this.nativePartitioning = nativePartitioning; + } + + NativePartitioning nativePartitioning; + + public int getPartitionNum() { + return partitionNum; + } + + public void setPartitionNum(int partitionNum) { + this.partitionNum = partitionNum; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getOffheapPerTask() { + return offheapPerTask; + } + + public void setOffheapPerTask(long offheapPerTask) { + this.offheapPerTask = offheapPerTask; + } + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } + + public String getExpr() { + return expr; + } + + public void setExpr(String expr) { + this.expr = expr; + } + + public Schema getSchema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + } + + ShuffleSplitterJniWrapper jniWrapper; + + private long nativeSplitter = 0; + private final Iterator iterator; + private final IteratorOptions options; + + public SplitIterator(ShuffleSplitterJniWrapper jniWrapper, + Iterator iterator, IteratorOptions options) { + this.jniWrapper = jniWrapper; + this.iterator = iterator; + this.options = options; + } + + private void nativeCreateInstance() { + ColumnarBatch cb = iterator.next(); + ArrowRecordBatch recordBatch = ConverterUtils.createArrowRecordBatch(cb); + try { + nativeSplitter = jniWrapper.make( + options.nativePartitioning, + options.offheapPerTask, + options.bufferSize); + int len = recordBatch.getBuffers().size(); + long[] bufAddrs = new long[len]; + long[] bufSizes = new long[len]; + int i = 0, j = 0; + for (ArrowBuf buffer: recordBatch.getBuffers()) { + bufAddrs[i++] = buffer.memoryAddress(); + } + for (ArrowBuffer buffer: recordBatch.getBuffersLayout()) { + bufSizes[j++] = buffer.getSize(); + } + jniWrapper.split(nativeSplitter, cb.numRows(), bufAddrs, bufSizes, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + private native boolean nativeHasNext(long instance); + + // first + @Override + public boolean hasNext() { + if (nativeHasNext(nativeSplitter)) { + return true; + } else if (!iterator.hasNext()) { + nativeCreateInstance(); + } + return 
nativeHasNext(nativeSplitter); + } + + private native byte[] nativeNext(long instance); + + @Override + public ColumnarBatch next() { + byte[] serializedRecordBatch = nativeNext(nativeSplitter); + return ConverterUtils.createRecordBatch(serializedRecordBatch, options.getSchema()); + } + + private native int nativeNextPartitionId(long nativeSplitter); + + public int nextPartitionId() { + return nativeNextPartitionId(nativeSplitter); + } + + @Override + protected void finalize() throws Throwable { + try { + jniWrapper.clear(nativeSplitter); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index 408c70e0e..459006000 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -83,6 +83,11 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { val enableColumnarShuffledHashJoin: Boolean = conf.getConfString("spark.oap.sql.columnar.shuffledhashjoin", "true").toBoolean && enableCpu + // enable or disable fallback shuffle manager + val enableFallbackShuffle: Boolean = conf + .getConfString("spark.oap.sql.columnar.enableFallbackShuffle", "true") + .equals("true") && enableCpu + val enableArrowColumnarToRow: Boolean = conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala index 6dfbcece5..bf787bdb3 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala @@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException} import java.nio.channels.Channels import java.nio.ByteBuffer import java.util.ArrayList - import com.intel.oap.vectorized.ArrowWritableColumnVector import io.netty.buffer.{ByteBufAllocator, ByteBufOutputStream} import org.apache.arrow.memory.ArrowBuf @@ -47,30 +46,38 @@ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import io.netty.buffer.{ByteBuf, ByteBufAllocator, ByteBufOutputStream} -import java.nio.channels.{Channels, WritableByteChannel} - import com.google.common.collect.Lists +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer + import java.io.{InputStream, OutputStream} import java.util -import java.util.concurrent.TimeUnit.SECONDS - import org.apache.arrow.vector.complex.MapVector import org.apache.arrow.vector.types.TimeUnit -import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID -import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision} import org.apache.spark.sql.catalyst.util.DateTimeConstants -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkSchemaUtils -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils +import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils, SparkVectorUtils} 
object ConverterUtils extends Logging { def calcuateEstimatedSize(columnarBatch: ColumnarBatch): Long = { SparkVectorUtils.estimateSize(columnarBatch) } + def createRecordBatch(serializedRecordBatch: Array[Byte], schema: Schema): ColumnarBatch = { + val allocator = SparkMemoryUtils.contextAllocatorForBufferImport + val resultBatch = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch) + if (resultBatch == null) { + val resultColumnVectors = + ArrowWritableColumnVector.loadColumns(0, schema, resultBatch).toArray + new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 0) + } else { + val resultColumnVectorList = + ConverterUtils.fromArrowRecordBatch(schema, resultBatch) + val length = resultBatch.getLength() + ConverterUtils.releaseArrowRecordBatch(resultBatch) + new ColumnarBatch(resultColumnVectorList.map(v => v.asInstanceOf[ColumnVector]), length) + } + } + def createArrowRecordBatch(columnarBatch: ColumnarBatch): ArrowRecordBatch = { SparkVectorUtils.toArrowRecordBatch(columnarBatch) } @@ -368,6 +375,19 @@ object ConverterUtils extends Logging { } } + def getShortAttributeName(attr: Attribute): String = { + val index = attr.name.indexOf("(") + if (index != -1) { + attr.name.substring(0, index) + } else { + attr.name + } + } + + def genColumnNameWithExprId(attr: Attribute): String = { + getShortAttributeName(attr) + "#" + attr.exprId.id + } + def getResultAttrFromExpr( fieldExpr: Expression, name: String = "None", diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 461b55503..4f454a5b0 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -182,7 +182,7 @@ case class ColumnarPreOverrides(session: SparkSession) extends Rule[SparkPlan] { case plan: ShuffleExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - if ((child.supportsColumnar || columnarConf.enablePreferColumnar) && columnarConf.enableColumnarShuffle) { + if ((child.supportsColumnar || columnarConf.enablePreferColumnar) && (columnarConf.enableColumnarShuffle || columnarConf.enableFallbackShuffle)) { if (isSupportAdaptive) { new ColumnarShuffleExchangeAdaptor( plan.outputPartitioning, @@ -289,7 +289,7 @@ case class ColumnarPreOverrides(session: SparkSession) extends Rule[SparkPlan] { case plan if (SparkShimLoader.getSparkShims.isCustomShuffleReaderExec(plan) - && columnarConf.enableColumnarShuffle) => + && (columnarConf.enableColumnarShuffle || columnarConf.enableFallbackShuffle)) => val child = SparkShimLoader.getSparkShims.getChildOfCustomShuffleReaderExec(plan) val partitionSpecs = SparkShimLoader.getSparkShims.getPartitionSpecsOfCustomShuffleReaderExec(plan) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala index 16cb31216..e933a96b4 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala @@ -56,6 +56,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { val preferColumnar = columnarConf.enablePreferColumnar 
val optimizeLevel = columnarConf.joinOptimizationThrottle val enableColumnarShuffle = columnarConf.enableColumnarShuffle + val enableFallbackShuffle = columnarConf.enableFallbackShuffle val enableColumnarSort = columnarConf.enableColumnarSort val enableColumnarWindow = columnarConf.enableColumnarWindow val enableColumnarSortMergeJoin = columnarConf.enableColumnarSortMergeJoin @@ -133,7 +134,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { if (!enableColumnarSort) return false new ColumnarSortExec(plan.sortOrder, plan.global, plan.child, plan.testSpillFrequency) case plan: ShuffleExchangeExec => - if (!enableColumnarShuffle) return false + if (!enableColumnarShuffle && !enableFallbackShuffle) return false new ColumnarShuffleExchangeExec( plan.outputPartitioning, plan.child) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala index a5e4d814b..d910bdc84 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala @@ -19,22 +19,19 @@ package com.intel.oap.vectorized import java.io._ import java.nio.ByteBuffer - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag - import com.intel.oap.GazellePluginConfig import com.intel.oap.expression.ConverterUtils import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer import org.apache.arrow.memory.ArrowBuf import org.apache.arrow.memory.BufferAllocator -import org.apache.arrow.vector.ipc.ArrowStreamReader -import org.apache.arrow.vector.VectorLoader -import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.{ArrowStreamReader, WriteChannel} +import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer} +import org.apache.arrow.vector.{FieldVector, VectorLoader, VectorSchemaRoot} import org.apache.arrow.vector.types.pojo.Schema - import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.serializer.DeserializationStream @@ -43,6 +40,7 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.serializer.SerializerInstance import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkVectorUtils.toArrowRecordBatch import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -50,6 +48,8 @@ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnVector import org.apache.spark.sql.vectorized.ColumnarBatch +import java.nio.channels.Channels + class ArrowColumnarBatchSerializer( schema: StructType, readBatchNumRows: SQLMetric, numOutputRows: SQLMetric) extends Serializer with Serializable { @@ -252,9 +252,44 @@ private class ArrowColumnarBatchSerializerInstance( } } - // Columnar shuffle write process don't need this. 
- override def serializeStream(s: OutputStream): SerializationStream = - throw new UnsupportedOperationException + override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { + + // 32768 + private[this] var writeBuffer: Array[Byte] = new Array[Byte](8196) + + override def writeKey[T: ClassTag](key: T): SerializationStream = { + // The key is only needed on the map side when computing partition ids. + // It does not need to be shuffled. + assert(null == key || key.isInstanceOf[Int]) + this + } + + override def writeValue[T: ClassTag](value: T): SerializationStream = { + val cb = value.asInstanceOf[ColumnarBatch] + val recordBatch = ConverterUtils.createArrowRecordBatch(cb) + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), recordBatch) + this + } + + override def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream = { + // This method is never called by shuffle code + throw new UnsupportedOperationException + } + + override def writeObject[T: ClassTag](t: T): SerializationStream = { + // This method is never called by shuffle code + throw new UnsupportedOperationException + } + + override def flush(): Unit = { + out.flush() + } + + override def close(): Unit = { + writeBuffer = null + out.close() + } + } // These methods are never called by shuffle code. override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala new file mode 100644 index 000000000..ff6b27231 --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.vectorized + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * An Iterator that insures that the batches [[ColumnarBatch]]s it iterates over are all closed + * properly. 
+ */ +class CloseablePartitionedBlockIterator(itr: Iterator[Product2[Int, ColumnarBatch]]) + extends Iterator[Product2[Int, ColumnarBatch]] + with Logging { + var cb: ColumnarBatch = null + + private def closeCurrentBatch(): Unit = { + itr.finalize() + if (cb != null) { + cb.close() + cb = null + } + } + + + TaskContext.get().addTaskCompletionListener[Unit] { _ => + closeCurrentBatch() + } + + override def hasNext: Boolean = { + itr.hasNext + } + + override def next(): Product2[Int, ColumnarBatch] = { + closeCurrentBatch() + val value = itr.next() + cb = value._2 + value + } + +} diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 54dcfebf3..41a65b822 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.execution import com.google.common.collect.Lists import com.intel.oap.expression.{CodeGeneration, ColumnarExpression, ColumnarExpressionConverter, ConverterUtils} import com.intel.oap.GazellePluginConfig -import com.intel.oap.vectorized.{ArrowColumnarBatchSerializer, ArrowWritableColumnVector, NativePartitioning} +import com.intel.oap.vectorized.SplitIterator.IteratorOptions +import com.intel.oap.vectorized.{ArrowColumnarBatchSerializer, ArrowWritableColumnVector, CloseablePartitionedBlockIterator, NativePartitioning, ShuffleSplitterJniWrapper, SplitIterator, SplitResult} +import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} @@ -45,11 +47,13 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.{MutablePair, Utils} + import scala.collection.JavaConverters._ import scala.concurrent.Future - import org.apache.spark.sql.util.ArrowUtils +import scala.collection.mutable.ListBuffer + case class ColumnarShuffleExchangeExec( override val outputPartitioning: Partitioning, child: SparkPlan, @@ -339,6 +343,16 @@ object ColumnarShuffleExchangeExec extends Logging { } } + private val conf = SparkEnv.get.conf + private val offHeadSize = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) + private val executorNum = conf.getInt("spark.executor.cores", 1) + private val offheapPerTask = offHeadSize / executorNum + private val nativeBufferSize = GazellePluginConfig.getSessionConf.shuffleSplitDefaultSize + + private val jniWrapper = new ShuffleSplitterJniWrapper() + private var splitResult: SplitResult = _ + private var firstRecordBatch: Boolean = true + def prepareShuffleDependency( rdd: RDD[ColumnarBatch], outputAttributes: Seq[Attribute], @@ -354,8 +368,9 @@ object ColumnarShuffleExchangeExec extends Logging { compressTime: SQLMetric, prepareTime: SQLMetric): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { val arrowFields = outputAttributes.map(attr => ConverterUtils.createArrowField(attr)) + var schema: Schema = null def serializeSchema(fields: Seq[Field]): Array[Byte] = { - val schema = new Schema(fields.asJava) + schema = new Schema(fields.asJava) ConverterUtils.getSchemaBytesBuf(schema) } @@ -455,39 +470,115 @@ object ColumnarShuffleExchangeExec extends 
Logging { // Thus in Columnar Shuffle we never use the "key" part. val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition - val rddWithDummyKey: RDD[Product2[Int, ColumnarBatch]] = newPartitioning match { - case RangePartitioning(sortingExpressions, _) => - rdd.mapPartitionsWithIndexInternal((_, cbIter) => { - val partitionKeyExtractor: InternalRow => Any = { - val projection = - UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) - row => projection(row) - } - val newIter = computeAndAddPartitionId(cbIter, partitionKeyExtractor) + val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] = + if (GazellePluginConfig.getSessionConf.enableColumnarShuffle) { + newPartitioning match { + case RangePartitioning(sortingExpressions, _) => + rdd.mapPartitionsWithIndexInternal((_, cbIter) => { + val partitionKeyExtractor: InternalRow => Any = { + val projection = + UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) + row => projection(row) + } + val newIter = computeAndAddPartitionId(cbIter, partitionKeyExtractor) - SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit] { _ => - newIter.closeAppendedVector() - } + SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit] { _ => + newIter.closeAppendedVector() + } - newIter - }, isOrderSensitive = isOrderSensitive) - case _ => - rdd.mapPartitionsWithIndexInternal( - (_, cbIter) => - cbIter.map { cb => - (0 until cb.numCols).foreach( - cb.column(_) - .asInstanceOf[ArrowWritableColumnVector] - .getValueVector - .setValueCount(cb.numRows)) - (0, cb) + newIter + }, isOrderSensitive = isOrderSensitive) + case _ => + rdd.mapPartitionsWithIndexInternal( + (_, cbIter) => + cbIter.map { cb => + (0 until cb.numCols).foreach( + cb.column(_) + .asInstanceOf[ArrowWritableColumnVector] + .getValueVector + .setValueCount(cb.numRows)) + (0, cb) + }, + isOrderSensitive = isOrderSensitive) + } + } else { + val options = new IteratorOptions + options.setExpr("") + options.setSchema(schema) + options.setOffheapPerTask(offheapPerTask) + options.setBufferSize(nativeBufferSize) + options.setNativePartitioning(nativePartitioning) + newPartitioning match { + case HashPartitioning(exprs, n) => + rdd.mapPartitionsWithIndexInternal( + (_, cbIter) => { + options.setPartitionNum(n) + val fields = exprs.zipWithIndex.map { + case (expr, i) => + val attribute = ConverterUtils.getAttrFromExpr(expr) + ConverterUtils.genColumnNameWithExprId(attribute) + } + options.setExpr(fields.mkString(",")) + options.setName("hash") + // ColumnarBatch Iterator + val iter = new Iterator[Product2[Int, ColumnarBatch]] { + val splitIterator = new SplitIterator(jniWrapper, + cbIter.asJava, options) + + override def hasNext: Boolean = splitIterator.hasNext + + override def next(): Product2[Int, ColumnarBatch] = + (splitIterator.nextPartitionId(), splitIterator.next()); + } + new CloseablePartitionedBlockIterator(iter) }, - isOrderSensitive = isOrderSensitive) - } + isOrderSensitive = isOrderSensitive + ) + case RoundRobinPartitioning(n) => + rdd.mapPartitionsWithIndexInternal( + (_, cbIter) => { + options.setPartitionNum(n) + options.setName("rr") + // ColumnarBatch Iterator + val iter = new Iterator[Product2[Int, ColumnarBatch]] { + val splitIterator = new SplitIterator(jniWrapper, + cbIter.asJava, options) + + override def hasNext: Boolean = splitIterator.hasNext + + override def next(): Product2[Int, ColumnarBatch] = + (splitIterator.nextPartitionId(), splitIterator.next()); + } + new CloseablePartitionedBlockIterator(iter) + }, + 
isOrderSensitive = isOrderSensitive + ) + case SinglePartition => + rdd.mapPartitionsWithIndexInternal( + (_, cbIter) => { + options.setPartitionNum(1) + options.setName("single") + // ColumnarBatch Iterator + val iter = new Iterator[Product2[Int, ColumnarBatch]] { + val splitIterator = new SplitIterator(jniWrapper, + cbIter.asJava, options) + + override def hasNext: Boolean = splitIterator.hasNext + + override def next(): Product2[Int, ColumnarBatch] = + (splitIterator.nextPartitionId(), splitIterator.next()); + } + new CloseablePartitionedBlockIterator(iter) + }, + isOrderSensitive = isOrderSensitive + ) + case _ => + throw new UnsupportedOperationException("Unsupported operations") + }} val dependency = new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch]( - rddWithDummyKey, + rddWithPartitionKey, new PartitionIdPassthrough(newPartitioning.numPartitions), serializer, shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics), diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 52c4b61dc..d7aaca192 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -1028,6 +1028,123 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeEvaluate2( JNI_METHOD_END(nullptr) } +JNIEXPORT jboolean JNICALL +Java_com_intel_oap_vectorized_SplitIterator_nativeHasNext( + JNIEnv* env, jobject, jlong splitter_id) { + JNI_METHOD_START + auto splitter_ = shuffle_splitter_holder_.Lookup(splitter_id); + if (!splitter_) { + std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); + JniThrow(error_message); + } + return splitter_->hasNext(); + JNI_METHOD_END(false) +} + +JNIEXPORT jobject JNICALL +Java_com_intel_oap_vectorized_SplitIterator_nativeNext( + JNIEnv* env, jobject, jlong splitter_id) { + JNI_METHOD_START + auto splitter_ = shuffle_splitter_holder_.Lookup(splitter_id); + if (!splitter_) { + std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); + JniThrow(error_message); + } + jbyteArray serialized_record_batch = JniGetOrThrow( + ToBytes(env, splitter_->nextBatch()), "Error deserializing message"); + return serialized_record_batch; + + JNI_METHOD_END(nullptr) +} + +JNIEXPORT jlong JNICALL +Java_com_intel_oap_vectorized_SplitIterator_nativeNextPartitionId( + JNIEnv* env, jobject, jlong splitter_id) { + JNI_METHOD_START + auto splitter_ = shuffle_splitter_holder_.Lookup(splitter_id); + if (!splitter_) { + std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); + JniThrow(error_message); + } + return splitter_->nextPartitionId(); + JNI_METHOD_END(-1L) +} + +JNIEXPORT jlong JNICALL +Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_initSplit( + JNIEnv* env, jobject, jstring partitioning_name_jstr, jint num_partitions, + jbyteArray schema_arr, jbyteArray expr_arr, jlong offheap_per_task, jint buffer_size) { + JNI_METHOD_START + if (partitioning_name_jstr == NULL) { + JniThrow(std::string("Short partitioning name can't be null")); + return 0; + } + if (schema_arr == NULL) { + JniThrow(std::string("Make splitter schema can't be null")); + } + + auto partitioning_name_c = env->GetStringUTFChars(partitioning_name_jstr, JNI_FALSE); + auto partitioning_name = std::string(partitioning_name_c); + env->ReleaseStringUTFChars(partitioning_name_jstr, partitioning_name_c); + + auto splitOptions = SplitOptions::Defaults(); + splitOptions.prefer_spill = false; + splitOptions.buffered_write = true; 
+ if (buffer_size > 0) { + splitOptions.buffer_size = buffer_size; + } + splitOptions.offheap_per_task = offheap_per_task; + +// auto* pool = reinterpret_cast(memory_pool_id); +// if (pool == nullptr) { +// JniThrow("Memory pool does not exist or has been closed"); +// } +// splitOptions.memory_pool = pool; + + std::shared_ptr schema; + // ValueOrDie in MakeSchema + MakeSchema(env, schema_arr, &schema); + + gandiva::ExpressionVector expr_vector = {}; + if (expr_arr != NULL) { + gandiva::FieldVector ret_types; + JniAssertOkOrThrow(MakeExprVector(env, expr_arr, &expr_vector, &ret_types), + "Failed to parse expressions protobuf"); + } + + jclass cls = env->FindClass("java/lang/Thread"); + jmethodID mid = env->GetStaticMethodID(cls, "currentThread", "()Ljava/lang/Thread;"); + jobject thread = env->CallStaticObjectMethod(cls, mid); + if (thread == NULL) { + std::cout << "Thread.currentThread() return NULL" << std::endl; + } else { + jmethodID mid_getid = env->GetMethodID(cls, "getId", "()J"); + jlong sid = env->CallLongMethod(thread, mid_getid); + splitOptions.thread_id = (int64_t)sid; + } + + jclass tc_cls = env->FindClass("org/apache/spark/TaskContext"); + jmethodID get_tc_mid = + env->GetStaticMethodID(tc_cls, "get", "()Lorg/apache/spark/TaskContext;"); + jobject tc_obj = env->CallStaticObjectMethod(tc_cls, get_tc_mid); + if (tc_obj == NULL) { + std::cout << "TaskContext.get() return NULL" << std::endl; + } else { + jmethodID get_tsk_attmpt_mid = env->GetMethodID(tc_cls, "taskAttemptId", "()J"); + jlong attmpt_id = env->CallLongMethod(tc_obj, get_tsk_attmpt_mid); + splitOptions.task_attempt_id = (int64_t)attmpt_id; + } + + auto splitter = + JniGetOrThrow(Splitter::Make(partitioning_name, std::move(schema), num_partitions, + expr_vector, std::move(splitOptions)), + "Failed create native shuffle splitter"); + std::cout << "jni.wrapper" << std::endl; + return shuffle_splitter_holder_.Insert(std::shared_ptr(splitter)); + + JNI_METHOD_END(-1L) +} + JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_nativeMake( JNIEnv* env, jobject, jstring partitioning_name_jstr, jint num_partitions, @@ -1194,6 +1311,66 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_ JNI_METHOD_END(-1L) } +JNIEXPORT jobjectArray JNICALL +Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_cacheBuffer( + JNIEnv* env, jobject obj, jlong splitter_id, + jint num_rows) { + JNI_METHOD_START + + auto splitter_ = shuffle_splitter_holder_.Lookup(splitter_id); + if (!splitter_) { + std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); + JniThrow(error_message); + } + //const auto& partition_lengths = splitter->PartitionLengths(); + jobjectArray serialized_record_batch_array = + env->NewObjectArray(50, byte_array_class, nullptr); + int index = 0; + for (auto pid = 0; pid < 50; ++pid) { + std::vector>>& rbArrays = splitter_->Collect(); + for (auto& recordBatch : rbArrays[pid]) { + jbyteArray serialized_record_batch = + JniGetOrThrow(ToBytes(env, recordBatch), "Error deserializing message"); + env->SetObjectArrayElement(serialized_record_batch_array, index++, + serialized_record_batch); + + recordBatch = nullptr; + } + } + + return serialized_record_batch_array; + JNI_METHOD_END(nullptr) +} + +JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_clear( + JNIEnv* env, jobject, jlong splitter_id) { + JNI_METHOD_START + auto splitter = shuffle_splitter_holder_.Lookup(splitter_id); + if (!splitter) { + std::string 
error_message = "Invalid splitter id " + std::to_string(splitter_id); + JniThrow(error_message); + } + + JniAssertOkOrThrow(splitter->Clear(), "Native split:: splitter close failed"); + const auto& partition_lengths = splitter->PartitionLengths(); + auto partition_length_arr = env->NewLongArray(partition_lengths.size()); + auto src = reinterpret_cast(partition_lengths.data()); + env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); + + const auto& raw_partition_lengths = splitter->RawPartitionLengths(); + auto raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); + auto raw_src = reinterpret_cast(raw_partition_lengths.data()); + env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); + + jobject split_result = env->NewObject(split_result_class, split_result_constructor, splitter->TotalComputePidTime(), + splitter->TotalWriteTime(), splitter->TotalSpillTime(), + splitter->TotalCompressTime(), splitter->TotalBytesWritten(), + splitter->TotalBytesSpilled(), partition_length_arr, raw_partition_length_arr + ); + return split_result; + JNI_METHOD_END(nullptr) +} + JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_stop( JNIEnv* env, jobject, jlong splitter_id) { JNI_METHOD_START diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index cd257b9f4..a04cbb15c 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -33,6 +33,7 @@ #include #include #include +#include #include "shuffle/utils.h" #include "utils/macros.h" @@ -223,8 +224,6 @@ arrow::Status Splitter::Init() { const auto& fields = schema_->fields(); ARROW_ASSIGN_OR_RAISE(column_type_id_, ToSplitterTypeId(schema_->fields())); - partition_writer_.resize(num_partitions_); - // pre-computed row count for each partition after the record batch split partition_id_cnt_.resize(num_partitions_); // pre-allocated buffer size for each partition, unit is row count @@ -237,6 +236,8 @@ arrow::Status Splitter::Init() { partition_cached_recordbatch_.resize(num_partitions_); partition_cached_recordbatch_size_.resize(num_partitions_); + // output_rb_.resize(num_partitions_); + partition_cached_arb_.resize(num_partitions_); partition_lengths_.resize(num_partitions_); raw_partition_lengths_.resize(num_partitions_); reducer_offset_offset_.resize(num_partitions_ + 1); @@ -294,38 +295,40 @@ arrow::Status Splitter::Init() { partition_list_builders_[i].resize(num_partitions_); } - ARROW_ASSIGN_OR_RAISE(configured_dirs_, GetConfiguredLocalDirs()); - sub_dir_selection_.assign(configured_dirs_.size(), 0); - - // Both data_file and shuffle_index_file should be set through jni. 
- // For test purpose, Create a temporary subdirectory in the system temporary - // dir with prefix "columnar-shuffle" - if (options_.data_file.length() == 0) { - ARROW_ASSIGN_OR_RAISE(options_.data_file, CreateTempShuffleFile(configured_dirs_[0])); - } + if (!options_.data_file.empty()) { + partition_writer_.resize(num_partitions_); - auto& ipc_write_options = options_.ipc_write_options; - ipc_write_options.memory_pool = options_.memory_pool; - ipc_write_options.use_threads = false; + ARROW_ASSIGN_OR_RAISE(configured_dirs_, GetConfiguredLocalDirs()); + sub_dir_selection_.assign(configured_dirs_.size(), 0); - if (options_.compression_type == arrow::Compression::FASTPFOR) { - ARROW_ASSIGN_OR_RAISE(ipc_write_options.codec, - arrow::util::Codec::CreateInt32(arrow::Compression::FASTPFOR)); + // Both data_file and shuffle_index_file should be set through jni. + // For test purpose, Create a temporary subdirectory in the system temporary + // dir with prefix "columnar-shuffle" + if (options_.data_file.length() == 0) { + ARROW_ASSIGN_OR_RAISE(options_.data_file, CreateTempShuffleFile(configured_dirs_[0])); + } + auto& ipc_write_options = options_.ipc_write_options; + ipc_write_options.memory_pool = options_.memory_pool; + ipc_write_options.use_threads = false; + if (options_.compression_type == arrow::Compression::FASTPFOR) { + ARROW_ASSIGN_OR_RAISE(ipc_write_options.codec, + arrow::util::Codec::CreateInt32(arrow::Compression::FASTPFOR)); + + } else if (options_.compression_type == arrow::Compression::LZ4_FRAME) { + ARROW_ASSIGN_OR_RAISE(ipc_write_options.codec, + arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME)); + } else { + ARROW_ASSIGN_OR_RAISE(ipc_write_options.codec, arrow::util::Codec::CreateInt32( + arrow::Compression::UNCOMPRESSED)); + } - } else if (options_.compression_type == arrow::Compression::LZ4_FRAME) { - ARROW_ASSIGN_OR_RAISE(ipc_write_options.codec, - arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME)); - } else { - ARROW_ASSIGN_OR_RAISE(ipc_write_options.codec, arrow::util::Codec::CreateInt32( - arrow::Compression::UNCOMPRESSED)); + // initialize tiny batch write options + tiny_bach_write_options_ = ipc_write_options; + ARROW_ASSIGN_OR_RAISE( + tiny_bach_write_options_.codec, + arrow::util::Codec::CreateInt32(arrow::Compression::UNCOMPRESSED)); } - // initialize tiny batch write options - tiny_bach_write_options_ = ipc_write_options; - ARROW_ASSIGN_OR_RAISE( - tiny_bach_write_options_.codec, - arrow::util::Codec::CreateInt32(arrow::Compression::UNCOMPRESSED)); - // Allocate first buffer for split reducer ARROW_ASSIGN_OR_RAISE(combine_buffer_, arrow::AllocateResizableBuffer(0, options_.memory_pool)); @@ -392,6 +395,55 @@ arrow::Status Splitter::Split(const arrow::RecordBatch& rb) { return arrow::Status::OK(); } +bool Splitter::hasNext() { + if (!output_rb_.empty()){ + next_partition_id = output_rb_.top().first; + next_batch = output_rb_.top().second; + } + return !output_rb_.empty(); +} + +std::shared_ptr Splitter::nextBatch() { + if (!output_rb_.empty()) { + output_rb_.pop(); + } + return next_batch; +} + +int32_t Splitter::nextPartitionId() { + return next_partition_id; +} + +/** +* Collect the rb. 
+*/ +std::vector>>& Splitter::Collect() { + EVAL_START("close", options_.thread_id) + // collect buffers and collect metrics + for (auto pid = 0; pid < num_partitions_; ++pid) { + CacheRecordBatch(pid, true); + if (partition_cached_recordbatch_size_[pid] > 0) { + std::cout << "partition data is: " << pid << std::endl; + if (partition_cached_arb_[pid].size() == 0) { + std::cout << "partition_cached_arb_ is null. " << std::endl; + } + } + } + EVAL_END("close", options_.thread_id, options_.task_attempt_id) + return partition_cached_arb_; +} + + +arrow::Status Splitter::Clear() { + EVAL_START("close", options_.thread_id) + //ClearCache(); + this -> combine_buffer_.reset(); + this -> schema_payload_.reset(); + partition_buffers_.clear(); + EVAL_END("close", options_.thread_id, options_.task_attempt_id) + return arrow::Status::OK(); +} + arrow::Status Splitter::Stop() { EVAL_START("write", options_.thread_id) // open data file output stream @@ -599,8 +651,10 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer arrow::ipc::GetRecordBatchPayload(*batch, tiny_bach_write_options_, payload.get())); #endif - + output_rb_.emplace(std::pair(partition_id, batch)); + partition_cached_arb_[partition_id].push_back(batch); partition_cached_recordbatch_size_[partition_id] += payload->body_length; + std::cout << "partition_id: " << partition_id << "size: " << partition_cached_recordbatch_size_[partition_id] << std::endl; partition_cached_recordbatch_[partition_id].push_back(std::move(payload)); partition_buffer_idx_base_[partition_id] = 0; } @@ -787,10 +841,10 @@ arrow::Result Splitter::SpillLargestPartition(int64_t* size) { } if (partition_to_spill != -1) { RETURN_NOT_OK(SpillPartition(partition_to_spill)); -#ifdef DEBUG +//#ifdef DEBUG std::cout << "Spilled partition " << std::to_string(partition_to_spill) << ", " << std::to_string(max_size) << " bytes released" << std::endl; -#endif +//#endif *size = max_size; } else { *size = 0; diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h index f27c061bf..de919f58b 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.h +++ b/native-sql-engine/cpp/src/shuffle/splitter.h @@ -28,6 +28,7 @@ #include #include #include +#include #include "shuffle/type.h" #include "shuffle/utils.h" @@ -72,11 +73,38 @@ class Splitter { */ virtual arrow::Status Split(const arrow::RecordBatch&); + /** + * Iterator for splitting rb + */ + virtual bool hasNext(); + + /** + * Iterator for splitting rb + */ + virtual std::shared_ptr nextBatch(); + + /** + * Iterator for splitting rb + */ + virtual int32_t nextPartitionId(); + /** * Compute the compresse size of record batch. */ virtual int64_t CompressedSize(const arrow::RecordBatch&); + /** + * Collect the rb. + */ + virtual std::vector>>& Collect(); + + + /** + * Clear the buffer. And stop processing splitting + */ + arrow::Status Clear(); + + /** * For each partition, merge spilled file into shuffle data file and write any * cached record batch to shuffle data file. 
Close all resources and collect @@ -231,6 +259,16 @@ class Splitter { // page std::shared_ptr combine_buffer_; + + int32_t next_partition_id = -1; + std::shared_ptr next_batch = nullptr; + + std::stack>> output_rb_; + + // partid + std::vector>> + partition_cached_arb_; + // partid std::vector>> partition_cached_recordbatch_; From b96bc62636930c1a38fbdfa68399b9e425afcf00 Mon Sep 17 00:00:00 2001 From: lviiii Date: Mon, 25 Jul 2022 14:05:19 +0000 Subject: [PATCH 04/12] Fix the problem "wrong data". --- .../vectorized/ShuffleSplitterJniWrapper.java | 5 ++ .../intel/oap/vectorized/SplitIterator.java | 48 +++++++++---------- .../intel/oap/expression/ConverterUtils.scala | 12 ++--- ...> CloseablePartitionedBatchIterator.scala} | 4 +- .../ColumnarShuffleExchangeExec.scala | 37 +++++--------- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 16 ++++++- native-sql-engine/cpp/src/shuffle/splitter.cc | 42 ++++++++++------ 7 files changed, 89 insertions(+), 75 deletions(-) rename native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/{CloseablePartitionedBlockIterator.scala => CloseablePartitionedBatchIterator.scala} (90%) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java index f8f27f527..a9d5f59fa 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java @@ -135,6 +135,11 @@ public native long split( long splitterId, int numRows, long[] bufAddrs, long[] bufSizes, boolean firstRecordBatch) throws IOException; + /** + * Collect the record batch after splitting. + */ + public native void collect(long splitterId, int numRows) throws IOException; + /** * Update the compress type. 
*/ diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index 9e82573a7..ffb0f1231 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -19,22 +19,14 @@ import com.intel.oap.expression.ConverterUtils; -import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer; import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.BufferLayout; import org.apache.arrow.vector.ipc.message.ArrowBuffer; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; -import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils; import org.apache.spark.sql.vectorized.ColumnarBatch; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; -import java.util.stream.Collectors; public class SplitIterator implements Iterator{ @@ -51,8 +43,6 @@ public static class IteratorOptions implements Serializable { private String expr; - private Schema schema; - public NativePartitioning getNativePartitioning() { return nativePartitioning; } @@ -103,13 +93,6 @@ public void setExpr(String expr) { this.expr = expr; } - public Schema getSchema() { - return schema; - } - - public void setSchema(Schema schema) { - this.schema = schema; - } } ShuffleSplitterJniWrapper jniWrapper; @@ -130,9 +113,9 @@ private void nativeCreateInstance() { ArrowRecordBatch recordBatch = ConverterUtils.createArrowRecordBatch(cb); try { nativeSplitter = jniWrapper.make( - options.nativePartitioning, - options.offheapPerTask, - options.bufferSize); + options.getNativePartitioning(), + options.getOffheapPerTask(), + options.getBufferSize()); int len = recordBatch.getBuffers().size(); long[] bufAddrs = new long[len]; long[] bufSizes = new long[len]; @@ -143,7 +126,8 @@ private void nativeCreateInstance() { for (ArrowBuffer buffer: recordBatch.getBuffersLayout()) { bufSizes[j++] = buffer.getSize(); } - jniWrapper.split(nativeSplitter, cb.numRows(), bufAddrs, bufSizes, true); + jniWrapper.split(nativeSplitter, cb.numRows(), bufAddrs, bufSizes, false); + jniWrapper.collect(nativeSplitter, cb.numRows()); } catch (IOException e) { throw new RuntimeException(e); } @@ -152,12 +136,26 @@ private void nativeCreateInstance() { private native boolean nativeHasNext(long instance); - // first + /** + * First to check, + * @return + */ @Override public boolean hasNext() { + + // 1. Init the native splitter + if (nativeSplitter == 0) { + if (!iterator.hasNext()) { + return false; + } else { + nativeCreateInstance(); + } + } + // 2. Call native hasNext if (nativeHasNext(nativeSplitter)) { return true; - } else if (!iterator.hasNext()) { + } else if (iterator.hasNext()) { + // 3. 
Split next rb nativeCreateInstance(); } return nativeHasNext(nativeSplitter); @@ -168,7 +166,9 @@ public boolean hasNext() { @Override public ColumnarBatch next() { byte[] serializedRecordBatch = nativeNext(nativeSplitter); - return ConverterUtils.createRecordBatch(serializedRecordBatch, options.getSchema()); + ColumnarBatch cb = ConverterUtils.createRecordBatch(serializedRecordBatch, + options.getNativePartitioning().getSchema()); + return cb; } private native int nativeNextPartitionId(long nativeSplitter); diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala index bf787bdb3..54309a499 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala @@ -62,17 +62,15 @@ object ConverterUtils extends Logging { SparkVectorUtils.estimateSize(columnarBatch) } - def createRecordBatch(serializedRecordBatch: Array[Byte], schema: Schema): ColumnarBatch = { + def createRecordBatch(serializedRecordBatch: Array[Byte], serializedSchema: Array[Byte]): ColumnarBatch = { + val schema = ConverterUtils.getSchemaFromBytesBuf(serializedSchema); val allocator = SparkMemoryUtils.contextAllocatorForBufferImport val resultBatch = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch) if (resultBatch == null) { - val resultColumnVectors = - ArrowWritableColumnVector.loadColumns(0, schema, resultBatch).toArray - new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), 0) + throw new Exception("Error from SerializedRecordBatch to ColumnarBatch.") } else { - val resultColumnVectorList = - ConverterUtils.fromArrowRecordBatch(schema, resultBatch) - val length = resultBatch.getLength() + val resultColumnVectorList = fromArrowRecordBatch(schema, resultBatch) + val length = resultBatch.getLength ConverterUtils.releaseArrowRecordBatch(resultBatch) new ColumnarBatch(resultColumnVectorList.map(v => v.asInstanceOf[ColumnVector]), length) } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBatchIterator.scala similarity index 90% rename from native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala rename to native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBatchIterator.scala index ff6b27231..fc87a4b47 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBlockIterator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/CloseablePartitionedBatchIterator.scala @@ -19,20 +19,18 @@ package com.intel.oap.vectorized import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.vectorized.ColumnarBatch /** * An Iterator that insures that the batches [[ColumnarBatch]]s it iterates over are all closed * properly. 
*/ -class CloseablePartitionedBlockIterator(itr: Iterator[Product2[Int, ColumnarBatch]]) +class CloseablePartitionedBatchIterator(itr: Iterator[Product2[Int, ColumnarBatch]]) extends Iterator[Product2[Int, ColumnarBatch]] with Logging { var cb: ColumnarBatch = null private def closeCurrentBatch(): Unit = { - itr.finalize() if (cb != null) { cb.close() cb = null diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 41a65b822..7c7421e14 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -21,8 +21,7 @@ import com.google.common.collect.Lists import com.intel.oap.expression.{CodeGeneration, ColumnarExpression, ColumnarExpressionConverter, ConverterUtils} import com.intel.oap.GazellePluginConfig import com.intel.oap.vectorized.SplitIterator.IteratorOptions -import com.intel.oap.vectorized.{ArrowColumnarBatchSerializer, ArrowWritableColumnVector, CloseablePartitionedBlockIterator, NativePartitioning, ShuffleSplitterJniWrapper, SplitIterator, SplitResult} -import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer +import com.intel.oap.vectorized.{ArrowColumnarBatchSerializer, ArrowWritableColumnVector, CloseablePartitionedBatchIterator, NativePartitioning, ShuffleSplitterJniWrapper, SplitIterator, SplitResult} import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} @@ -36,10 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.plans.logical.Statistics -import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.CoalesceExec.EmptyPartition import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.createShuffleWriteProcessor import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} @@ -50,9 +47,6 @@ import org.apache.spark.util.{MutablePair, Utils} import scala.collection.JavaConverters._ import scala.concurrent.Future -import org.apache.spark.sql.util.ArrowUtils - -import scala.collection.mutable.ListBuffer case class ColumnarShuffleExchangeExec( override val outputPartitioning: Partitioning, @@ -504,7 +498,6 @@ object ColumnarShuffleExchangeExec extends Logging { } else { val options = new IteratorOptions options.setExpr("") - options.setSchema(schema) options.setOffheapPerTask(offheapPerTask) options.setBufferSize(nativeBufferSize) options.setNativePartitioning(nativePartitioning) @@ -530,7 +523,7 @@ object ColumnarShuffleExchangeExec extends Logging { override def next(): Product2[Int, ColumnarBatch] = (splitIterator.nextPartitionId(), splitIterator.next()); } - new CloseablePartitionedBlockIterator(iter) + new CloseablePartitionedBatchIterator(iter) }, isOrderSensitive = isOrderSensitive ) @@ -549,27 +542,21 @@ object 
ColumnarShuffleExchangeExec extends Logging { override def next(): Product2[Int, ColumnarBatch] = (splitIterator.nextPartitionId(), splitIterator.next()); } - new CloseablePartitionedBlockIterator(iter) + new CloseablePartitionedBatchIterator(iter) }, isOrderSensitive = isOrderSensitive ) case SinglePartition => rdd.mapPartitionsWithIndexInternal( - (_, cbIter) => { - options.setPartitionNum(1) - options.setName("single") - // ColumnarBatch Iterator - val iter = new Iterator[Product2[Int, ColumnarBatch]] { - val splitIterator = new SplitIterator(jniWrapper, - cbIter.asJava, options) - - override def hasNext: Boolean = splitIterator.hasNext - - override def next(): Product2[Int, ColumnarBatch] = - (splitIterator.nextPartitionId(), splitIterator.next()); - } - new CloseablePartitionedBlockIterator(iter) - }, + (_, cbIter) => + cbIter.map { cb => + (0 until cb.numCols).foreach( + cb.column(_) + .asInstanceOf[ArrowWritableColumnVector] + .getValueVector + .setValueCount(cb.numRows)) + (0, cb) + }, isOrderSensitive = isOrderSensitive ) case _ => diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index d7aaca192..c30c5d4d3 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -1139,7 +1139,6 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_initSplit( JniGetOrThrow(Splitter::Make(partitioning_name, std::move(schema), num_partitions, expr_vector, std::move(splitOptions)), "Failed create native shuffle splitter"); - std::cout << "jni.wrapper" << std::endl; return shuffle_splitter_holder_.Insert(std::shared_ptr(splitter)); JNI_METHOD_END(-1L) @@ -1311,6 +1310,19 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_ JNI_METHOD_END(-1L) } +JNIEXPORT void JNICALL +Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_collect( + JNIEnv* env, jobject obj, jlong splitter_id, jint num_rows) { + JNI_METHOD_START + auto splitter_ = shuffle_splitter_holder_.Lookup(splitter_id); + if (!splitter_) { + std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); + JniThrow(error_message); + } + splitter_->Collect(); + JNI_METHOD_END() +} + JNIEXPORT jobjectArray JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_cacheBuffer( JNIEnv* env, jobject obj, jlong splitter_id, @@ -1334,7 +1346,7 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_cacheBuffer( env->SetObjectArrayElement(serialized_record_batch_array, index++, serialized_record_batch); - recordBatch = nullptr; + // recordBatch = nullptr; } } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index a04cbb15c..c34bfd08c 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -236,7 +236,6 @@ arrow::Status Splitter::Init() { partition_cached_recordbatch_.resize(num_partitions_); partition_cached_recordbatch_size_.resize(num_partitions_); - // output_rb_.resize(num_partitions_); partition_cached_arb_.resize(num_partitions_); partition_lengths_.resize(num_partitions_); raw_partition_lengths_.resize(num_partitions_); @@ -407,6 +406,10 @@ std::shared_ptr Splitter::nextBatch() { if (!output_rb_.empty()) { output_rb_.pop(); } +// #ifndef DEBUG +// std::cout << "Output partitionid is: " << next_partition_id << +// ", output_batch_rows: " << next_batch->num_rows() << std::endl; +// #endif return next_batch; } @@ -415,18 +418,18 @@ int32_t 
Splitter::nextPartitionId() { } /** -* Collect the rb. +* Collect the rb after splitting. */ std::vector>>& Splitter::Collect() { EVAL_START("close", options_.thread_id) // collect buffers and collect metrics for (auto pid = 0; pid < num_partitions_; ++pid) { - CacheRecordBatch(pid, true); - if (partition_cached_recordbatch_size_[pid] > 0) { - std::cout << "partition data is: " << pid << std::endl; - if (partition_cached_arb_[pid].size() == 0) { - std::cout << "partition_cached_arb_ is null. " << std::endl; - } + if (partition_buffer_idx_base_[pid] > 0) { + #ifdef DEBUG + std::cout << "Collect buffers to output, cache the record batch, current partition id is " << pid << + ", partition_buffer_idx_base_ is: " << partition_buffer_idx_base_[pid] << std::endl; + #endif + CacheRecordBatch(pid, true); } } EVAL_END("close", options_.thread_id, options_.task_attempt_id) @@ -436,7 +439,12 @@ std::vector>>& Splitter::Collect arrow::Status Splitter::Clear() { EVAL_START("close", options_.thread_id) - //ClearCache(); + next_batch = nullptr; + for (auto pid = 0; pid < num_partitions_; ++pid) { + partition_cached_arb_[pid].clear(); + partition_cached_recordbatch_[pid].clear(); + partition_cached_recordbatch_size_[pid] = 0; + } this -> combine_buffer_.reset(); this -> schema_payload_.reset(); partition_buffers_.clear(); @@ -651,10 +659,10 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer arrow::ipc::GetRecordBatchPayload(*batch, tiny_bach_write_options_, payload.get())); #endif - output_rb_.emplace(std::pair(partition_id, batch)); - partition_cached_arb_[partition_id].push_back(batch); + std::pair> part_batch = std::make_pair(partition_id, batch); + output_rb_.emplace(part_batch); + // partition_cached_arb_[partition_id].push_back(batch); partition_cached_recordbatch_size_[partition_id] += payload->body_length; - std::cout << "partition_id: " << partition_id << "size: " << partition_cached_recordbatch_size_[partition_id] << std::endl; partition_cached_recordbatch_[partition_id].push_back(std::move(payload)); partition_buffer_idx_base_[partition_id] = 0; } @@ -841,10 +849,10 @@ arrow::Result Splitter::SpillLargestPartition(int64_t* size) { } if (partition_to_spill != -1) { RETURN_NOT_OK(SpillPartition(partition_to_spill)); -//#ifdef DEBUG +#ifdef DEBUG std::cout << "Spilled partition " << std::to_string(partition_to_spill) << ", " << std::to_string(max_size) << " bytes released" << std::endl; -//#endif +#endif *size = max_size; } else { *size = 0; @@ -986,6 +994,12 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { // update partition buffer base after split for (auto pid = 0; pid < num_partitions_; ++pid) { partition_buffer_idx_base_[pid] += partition_id_cnt_[pid]; + #ifdef DEBUG + if (partition_buffer_idx_base_[pid] > 0) { + std::cout << "Update partition buffer base after split, current partition id is " << pid << + ", partition_buffer_idx_base_ is: " << partition_buffer_idx_base_[pid] << std::endl; + } + #endif } return arrow::Status::OK(); From 7c516963cf767f9a3e7e0e5a212e1914b852dd88 Mon Sep 17 00:00:00 2001 From: jiao_lv Date: Thu, 28 Jul 2022 14:01:04 +0800 Subject: [PATCH 05/12] Update GazellePluginConfig.scala Disabled the configuration "spark.oap.sql.columnar.enableFallbackShuffle". 
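Note: with this default flipped to "false", the fallback shuffle now has to be enabled
explicitly (per GazellePluginConfig it also still requires the columnar CPU mode). A minimal,
illustrative Java sketch of opting back in from a SparkSession builder; the app name and local
master are assumptions for the example, not part of this patch:

    import org.apache.spark.sql.SparkSession;

    public class EnableFallbackShuffleExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("fallback-shuffle-demo")      // illustrative name
            .master("local[*]")                    // drop this when launched via spark-submit
            // Opt back in to the Vanilla Spark shuffle fallback now that the default is "false".
            .config("spark.oap.sql.columnar.enableFallbackShuffle", "true")
            .getOrCreate();
        // ... run columnar queries; unsupported shuffle plans fall back to Vanilla Spark ...
        spark.stop();
      }
    }
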
--- .../core/src/main/scala/com/intel/oap/GazellePluginConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index 459006000..67c4a4b1c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -85,7 +85,7 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { // enable or disable fallback shuffle manager val enableFallbackShuffle: Boolean = conf - .getConfString("spark.oap.sql.columnar.enableFallbackShuffle", "true") + .getConfString("spark.oap.sql.columnar.enableFallbackShuffle", "false") .equals("true") && enableCpu val enableArrowColumnarToRow: Boolean = From ca9e99586a4c9cdb821620d435c039aab3208ffd Mon Sep 17 00:00:00 2001 From: lviiii Date: Sun, 7 Aug 2022 19:44:36 +0000 Subject: [PATCH 06/12] Fix the oom issue. --- .../vectorized/ShuffleSplitterJniWrapper.java | 6 -- .../intel/oap/vectorized/SplitIterator.java | 66 ++++++++++++++----- .../ArrowColumnarBatchSerializer.scala | 14 ++-- .../ColumnarShuffleExchangeExec.scala | 25 +++++-- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 33 +--------- native-sql-engine/cpp/src/shuffle/splitter.cc | 65 +++++++++--------- native-sql-engine/cpp/src/shuffle/splitter.h | 6 +- 7 files changed, 109 insertions(+), 106 deletions(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java index a9d5f59fa..2e47f538c 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ShuffleSplitterJniWrapper.java @@ -154,12 +154,6 @@ public native long split( */ public native SplitResult stop(long splitterId) throws IOException; - public native byte[][] cacheBuffer( - long splitterId, - int numRows) - throws RuntimeException; - - /** * Clear the buffer. 
And stop processing splitting * diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index ffb0f1231..62e331132 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -95,27 +95,33 @@ public void setExpr(String expr) { } - ShuffleSplitterJniWrapper jniWrapper; + ShuffleSplitterJniWrapper jniWrapper = null; private long nativeSplitter = 0; private final Iterator iterator; private final IteratorOptions options; - public SplitIterator(ShuffleSplitterJniWrapper jniWrapper, - Iterator iterator, IteratorOptions options) { - this.jniWrapper = jniWrapper; + private ColumnarBatch cb = null; + + public SplitIterator(Iterator iterator, IteratorOptions options) { this.iterator = iterator; this.options = options; } private void nativeCreateInstance() { - ColumnarBatch cb = iterator.next(); ArrowRecordBatch recordBatch = ConverterUtils.createArrowRecordBatch(cb); try { + if (jniWrapper == null) { + jniWrapper = new ShuffleSplitterJniWrapper(); + } + if (nativeSplitter != 0) { + throw new Exception("NativeSplitter is not clear."); + } nativeSplitter = jniWrapper.make( options.getNativePartitioning(), options.getOffheapPerTask(), options.getBufferSize()); + int len = recordBatch.getBuffers().size(); long[] bufAddrs = new long[len]; long[] bufSizes = new long[len]; @@ -128,24 +134,43 @@ private void nativeCreateInstance() { } jniWrapper.split(nativeSplitter, cb.numRows(), bufAddrs, bufSizes, false); jniWrapper.collect(nativeSplitter, cb.numRows()); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); + } finally { + ConverterUtils.releaseArrowRecordBatch(recordBatch); + cb.close(); } } private native boolean nativeHasNext(long instance); - /** - * First to check, - * @return - */ + public boolean hasRecordBatch(){ + while (iterator.hasNext()) { + cb = iterator.next(); + if (cb.numRows() != 0 && cb.numCols() != 0) { + return true; + } + } + if (nativeSplitter != 0) { + try { + jniWrapper.clear(nativeSplitter); + nativeSplitter = 0; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { +// jniWrapper.close(nativeSplitter); + } + } + return false; + } + @Override public boolean hasNext() { - // 1. Init the native splitter if (nativeSplitter == 0) { - if (!iterator.hasNext()) { + boolean flag = hasRecordBatch(); + if (!flag) { return false; } else { nativeCreateInstance(); @@ -154,9 +179,13 @@ public boolean hasNext() { // 2. Call native hasNext if (nativeHasNext(nativeSplitter)) { return true; - } else if (iterator.hasNext()) { - // 3. 
Split next rb - nativeCreateInstance(); + } else { + boolean flag = hasRecordBatch(); + if (!flag) { + return false; + } else { + nativeCreateInstance(); + } } return nativeHasNext(nativeSplitter); } @@ -180,9 +209,14 @@ public int nextPartitionId() { @Override protected void finalize() throws Throwable { try { - jniWrapper.clear(nativeSplitter); + if (nativeSplitter != 0) { + jniWrapper.clear(nativeSplitter); + nativeSplitter = 0; + } } catch (IOException e) { throw new RuntimeException(e); + } finally { + jniWrapper.close(nativeSplitter); } } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala index d910bdc84..132194225 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala @@ -254,9 +254,6 @@ private class ArrowColumnarBatchSerializerInstance( override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { - // 32768 - private[this] var writeBuffer: Array[Byte] = new Array[Byte](8196) - override def writeKey[T: ClassTag](key: T): SerializationStream = { // The key is only needed on the map side when computing partition ids. // It does not need to be shuffled. @@ -267,7 +264,15 @@ private class ArrowColumnarBatchSerializerInstance( override def writeValue[T: ClassTag](value: T): SerializationStream = { val cb = value.asInstanceOf[ColumnarBatch] val recordBatch = ConverterUtils.createArrowRecordBatch(cb) - MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), recordBatch) + try { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), recordBatch) + } catch { + case e: Exception => + logError("Failed to serialize current RecordBatch", e) + } finally { + ConverterUtils.releaseArrowRecordBatch(recordBatch) + cb.close + } this } @@ -286,7 +291,6 @@ private class ArrowColumnarBatchSerializerInstance( } override def close(): Unit = { - writeBuffer = null out.close() } } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 7c7421e14..8ba380ff0 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -362,9 +362,8 @@ object ColumnarShuffleExchangeExec extends Logging { compressTime: SQLMetric, prepareTime: SQLMetric): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { val arrowFields = outputAttributes.map(attr => ConverterUtils.createArrowField(attr)) - var schema: Schema = null def serializeSchema(fields: Seq[Field]): Array[Byte] = { - schema = new Schema(fields.asJava) + val schema = new Schema(fields.asJava) ConverterUtils.getSchemaBytesBuf(schema) } @@ -515,8 +514,7 @@ object ColumnarShuffleExchangeExec extends Logging { options.setName("hash") // ColumnarBatch Iterator val iter = new Iterator[Product2[Int, ColumnarBatch]] { - val splitIterator = new SplitIterator(jniWrapper, - cbIter.asJava, options) + val splitIterator = new SplitIterator(cbIter.asJava, options) override def hasNext: Boolean = splitIterator.hasNext @@ -534,8 +532,7 @@ object 
ColumnarShuffleExchangeExec extends Logging { options.setName("rr") // ColumnarBatch Iterator val iter = new Iterator[Product2[Int, ColumnarBatch]] { - val splitIterator = new SplitIterator(jniWrapper, - cbIter.asJava, options) + val splitIterator = new SplitIterator(cbIter.asJava, options) override def hasNext: Boolean = splitIterator.hasNext @@ -560,7 +557,21 @@ object ColumnarShuffleExchangeExec extends Logging { isOrderSensitive = isOrderSensitive ) case _ => - throw new UnsupportedOperationException("Unsupported operations") + logError("Unsupported operations: newPartitioning.") + rdd.mapPartitionsWithIndexInternal( + (_, cbIter) => { + val iter = new Iterator[Product2[Int, ColumnarBatch]] { + val splitIterator = new SplitIterator(cbIter.asJava, options) + + override def hasNext: Boolean = splitIterator.hasNext + + override def next(): Product2[Int, ColumnarBatch] = + (splitIterator.nextPartitionId(), splitIterator.next()); + } + new CloseablePartitionedBatchIterator(iter) + }, + isOrderSensitive = isOrderSensitive + ) }} val dependency = diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index c30c5d4d3..4eb50f5c9 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -1319,41 +1319,10 @@ Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_collect( std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); JniThrow(error_message); } - splitter_->Collect(); + JniAssertOkOrThrow(splitter_->Collect(), "Native split: splitter collect failed"); JNI_METHOD_END() } -JNIEXPORT jobjectArray JNICALL -Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_cacheBuffer( - JNIEnv* env, jobject obj, jlong splitter_id, - jint num_rows) { - JNI_METHOD_START - - auto splitter_ = shuffle_splitter_holder_.Lookup(splitter_id); - if (!splitter_) { - std::string error_message = "Invalid splitter id " + std::to_string(splitter_id); - JniThrow(error_message); - } - //const auto& partition_lengths = splitter->PartitionLengths(); - jobjectArray serialized_record_batch_array = - env->NewObjectArray(50, byte_array_class, nullptr); - int index = 0; - for (auto pid = 0; pid < 50; ++pid) { - std::vector>>& rbArrays = splitter_->Collect(); - for (auto& recordBatch : rbArrays[pid]) { - jbyteArray serialized_record_batch = - JniGetOrThrow(ToBytes(env, recordBatch), "Error deserializing message"); - env->SetObjectArrayElement(serialized_record_batch_array, index++, - serialized_record_batch); - - // recordBatch = nullptr; - } - } - - return serialized_record_batch_array; - JNI_METHOD_END(nullptr) -} - JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_ShuffleSplitterJniWrapper_clear( JNIEnv* env, jobject, jlong splitter_id) { JNI_METHOD_START diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index e0ad0b918..2dbb25fb1 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -234,9 +234,6 @@ arrow::Status Splitter::Init() { // the offset of each partition during record batch split partition_buffer_idx_offset_.resize(num_partitions_); - partition_cached_recordbatch_.resize(num_partitions_); - partition_cached_recordbatch_size_.resize(num_partitions_); - partition_cached_arb_.resize(num_partitions_); partition_lengths_.resize(num_partitions_); raw_partition_lengths_.resize(num_partitions_); reducer_offset_offset_.resize(num_partitions_ + 1); @@ -295,6 +292,8 @@ 
arrow::Status Splitter::Init() { } if (!options_.data_file.empty()) { + partition_cached_recordbatch_.resize(num_partitions_); + partition_cached_recordbatch_size_.resize(num_partitions_); partition_writer_.resize(num_partitions_); ARROW_ASSIGN_OR_RAISE(configured_dirs_, GetConfiguredLocalDirs()); @@ -420,20 +419,16 @@ int32_t Splitter::nextPartitionId() { /** * Collect the rb after splitting. */ -std::vector>>& Splitter::Collect() { +arrow::Status Splitter::Collect() { EVAL_START("close", options_.thread_id) // collect buffers and collect metrics for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_buffer_idx_base_[pid] > 0) { - #ifdef DEBUG - std::cout << "Collect buffers to output, cache the record batch, current partition id is " << pid << - ", partition_buffer_idx_base_ is: " << partition_buffer_idx_base_[pid] << std::endl; - #endif - CacheRecordBatch(pid, true); + RETURN_NOT_OK(CacheRecordBatch(pid, true)); } } EVAL_END("close", options_.thread_id, options_.task_attempt_id) - return partition_cached_arb_; + return arrow::Status::OK(); } @@ -441,9 +436,12 @@ arrow::Status Splitter::Clear() { EVAL_START("close", options_.thread_id) next_batch = nullptr; for (auto pid = 0; pid < num_partitions_; ++pid) { - partition_cached_arb_[pid].clear(); - partition_cached_recordbatch_[pid].clear(); - partition_cached_recordbatch_size_[pid] = 0; + partition_lengths_[pid] = 0; + raw_partition_lengths_[pid] = 0; + } + if (output_rb_.size() > 0) { + std::cerr << "Dirty stack output_rb_" << std::endl; + output_rb_ = std::stack>>(); } this -> combine_buffer_.reset(); this -> schema_payload_.reset(); @@ -642,28 +640,31 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer int64_t raw_size = batch_nbytes(batch); raw_partition_lengths_[partition_id] += raw_size; - auto payload = std::make_shared(); + + if (!options_.data_file.empty()) { + auto payload = std::make_shared(); #ifndef SKIPCOMPRESS - if (num_rows <= options_.batch_compress_threshold) { - TIME_NANO_OR_RAISE(total_compress_time_, - arrow::ipc::GetRecordBatchPayload( - *batch, tiny_bach_write_options_, payload.get())); - } else { - TIME_NANO_OR_RAISE(total_compress_time_, - arrow::ipc::GetRecordBatchPayload( - *batch, options_.ipc_write_options, payload.get())); - } + if (num_rows <= options_.batch_compress_threshold) { + TIME_NANO_OR_RAISE(total_compress_time_, + arrow::ipc::GetRecordBatchPayload( + *batch, tiny_bach_write_options_, payload.get())); + } else { + TIME_NANO_OR_RAISE(total_compress_time_, + arrow::ipc::GetRecordBatchPayload( + *batch, options_.ipc_write_options, payload.get())); + } #else - // for test reason - TIME_NANO_OR_RAISE(total_compress_time_, - arrow::ipc::GetRecordBatchPayload(*batch, tiny_bach_write_options_, - payload.get())); + // for test reason + TIME_NANO_OR_RAISE(total_compress_time_, + arrow::ipc::GetRecordBatchPayload(*batch, tiny_bach_write_options_, + payload.get())); #endif + partition_cached_recordbatch_size_[partition_id] += payload->body_length; + partition_cached_recordbatch_[partition_id].push_back(std::move(payload)); + } std::pair> part_batch = std::make_pair(partition_id, batch); output_rb_.emplace(part_batch); // partition_cached_arb_[partition_id].push_back(batch); - partition_cached_recordbatch_size_[partition_id] += payload->body_length; - partition_cached_recordbatch_[partition_id].push_back(std::move(payload)); partition_buffer_idx_base_[partition_id] = 0; } return arrow::Status::OK(); @@ -995,12 +996,6 @@ arrow::Status Splitter::DoSplit(const 
arrow::RecordBatch& rb) { // update partition buffer base after split for (auto pid = 0; pid < num_partitions_; ++pid) { partition_buffer_idx_base_[pid] += partition_id_cnt_[pid]; - #ifdef DEBUG - if (partition_buffer_idx_base_[pid] > 0) { - std::cout << "Update partition buffer base after split, current partition id is " << pid << - ", partition_buffer_idx_base_ is: " << partition_buffer_idx_base_[pid] << std::endl; - } - #endif } return arrow::Status::OK(); diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h index de919f58b..9c4dfaa86 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.h +++ b/native-sql-engine/cpp/src/shuffle/splitter.h @@ -96,7 +96,7 @@ class Splitter { /** * Collect the rb. */ - virtual std::vector>>& Collect(); + arrow::Status Collect(); /** @@ -265,10 +265,6 @@ class Splitter { std::stack>> output_rb_; - // partid - std::vector>> - partition_cached_arb_; - // partid std::vector>> partition_cached_recordbatch_; From 732eeab7f293bd1dd622bf58be03e8966cfa2a4c Mon Sep 17 00:00:00 2001 From: lviiii Date: Sun, 7 Aug 2022 21:24:35 +0000 Subject: [PATCH 07/12] Remove the exception. --- .../main/java/com/intel/oap/vectorized/SplitIterator.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index 62e331132..686d3535e 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -28,8 +28,13 @@ import java.io.Serializable; import java.util.Iterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class SplitIterator implements Iterator{ + private static final Logger logger = LoggerFactory.getLogger(SplitIterator.class); + public static class IteratorOptions implements Serializable { private static final long serialVersionUID = -1L; @@ -115,7 +120,8 @@ private void nativeCreateInstance() { jniWrapper = new ShuffleSplitterJniWrapper(); } if (nativeSplitter != 0) { - throw new Exception("NativeSplitter is not clear."); + logger.error("NativeSplitter is not clear."); + // throw new Exception("NativeSplitter is not clear."); } nativeSplitter = jniWrapper.make( options.getNativePartitioning(), From 4df13d9a59e4e8084ec6b5e91ab433bcd28b161d Mon Sep 17 00:00:00 2001 From: lviiii Date: Sun, 7 Aug 2022 22:51:35 +0000 Subject: [PATCH 08/12] Remove the exception. 
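Instead of throwing when a native splitter handle is still alive, the iterator now clears it
and creates a fresh one. A hedged sketch of that lifecycle, using only the jniWrapper.clear()
and jniWrapper.make() entry points already called by SplitIterator; the helper class and method
name are illustrative, not part of this patch:

    import com.intel.oap.vectorized.ShuffleSplitterJniWrapper;
    import com.intel.oap.vectorized.SplitIterator.IteratorOptions;
    import java.io.IOException;

    final class SplitterLifecycleSketch {
      // Returns a fresh native splitter handle, releasing a stale one first (if any).
      static long recreate(ShuffleSplitterJniWrapper jniWrapper, long staleSplitter,
                           IteratorOptions options) throws IOException {
        if (staleSplitter != 0) {            // a previous batch left a live handle behind
          jniWrapper.clear(staleSplitter);   // free the native buffers it still holds
        }
        return jniWrapper.make(
            options.getNativePartitioning(),
            options.getOffheapPerTask(),
            options.getBufferSize());
      }
    }
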
--- .../java/com/intel/oap/vectorized/SplitIterator.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index 686d3535e..be419841f 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -120,17 +120,17 @@ private void nativeCreateInstance() { jniWrapper = new ShuffleSplitterJniWrapper(); } if (nativeSplitter != 0) { - logger.error("NativeSplitter is not clear."); + logger.warn("NativeSplitter is not clear."); + jniWrapper.clear(nativeSplitter); + nativeSplitter = 0; // throw new Exception("NativeSplitter is not clear."); } nativeSplitter = jniWrapper.make( options.getNativePartitioning(), options.getOffheapPerTask(), options.getBufferSize()); - - int len = recordBatch.getBuffers().size(); - long[] bufAddrs = new long[len]; - long[] bufSizes = new long[len]; + long[] bufAddrs = new long[recordBatch.getBuffers().size()]; + long[] bufSizes = new long[recordBatch.getBuffersLayout().size()]; int i = 0, j = 0; for (ArrowBuf buffer: recordBatch.getBuffers()) { bufAddrs[i++] = buffer.memoryAddress(); @@ -216,6 +216,7 @@ public int nextPartitionId() { protected void finalize() throws Throwable { try { if (nativeSplitter != 0) { + logger.error("NativeSplitter is not clear."); jniWrapper.clear(nativeSplitter); nativeSplitter = 0; } From 387620fc564be66da0e24bdfb41d3f4ca29b66b9 Mon Sep 17 00:00:00 2001 From: lviiii Date: Mon, 8 Aug 2022 08:21:33 +0000 Subject: [PATCH 09/12] test. --- .../com/intel/oap/vectorized/SplitIterator.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index be419841f..7464bdc53 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -120,7 +120,6 @@ private void nativeCreateInstance() { jniWrapper = new ShuffleSplitterJniWrapper(); } if (nativeSplitter != 0) { - logger.warn("NativeSplitter is not clear."); jniWrapper.clear(nativeSplitter); nativeSplitter = 0; // throw new Exception("NativeSplitter is not clear."); @@ -155,6 +154,7 @@ public boolean hasRecordBatch(){ while (iterator.hasNext()) { cb = iterator.next(); if (cb.numRows() != 0 && cb.numCols() != 0) { + nativeCreateInstance(); return true; } } @@ -175,25 +175,14 @@ public boolean hasRecordBatch(){ public boolean hasNext() { // 1. Init the native splitter if (nativeSplitter == 0) { - boolean flag = hasRecordBatch(); - if (!flag) { - return false; - } else { - nativeCreateInstance(); - } + return hasRecordBatch() && nativeHasNext(nativeSplitter); } // 2. Call native hasNext if (nativeHasNext(nativeSplitter)) { return true; } else { - boolean flag = hasRecordBatch(); - if (!flag) { - return false; - } else { - nativeCreateInstance(); - } + return hasRecordBatch() && nativeHasNext(nativeSplitter); } - return nativeHasNext(nativeSplitter); } private native byte[] nativeNext(long instance); From 40849054b857d6e320112f81b2060bf3f0d6c1da Mon Sep 17 00:00:00 2001 From: lviiii Date: Mon, 8 Aug 2022 10:28:28 +0000 Subject: [PATCH 10/12] test. 
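For context on what the callers in ColumnarShuffleExchangeExec rely on after the
hasNext()/hasRecordBatch() simplification above, a hedged usage sketch built only from the
public SplitIterator API shown in this series; the wrapper class and method are illustrative,
not part of the patch:

    import com.intel.oap.vectorized.SplitIterator;
    import com.intel.oap.vectorized.SplitIterator.IteratorOptions;
    import org.apache.spark.sql.vectorized.ColumnarBatch;
    import java.util.Iterator;

    final class SplitIteratorUsageSketch {
      // Drain one task's batches through the native splitter (illustrative only).
      static void drain(Iterator<ColumnarBatch> cbIter, IteratorOptions options) {
        SplitIterator splitIterator = new SplitIterator(cbIter, options);
        while (splitIterator.hasNext()) {                // may pull and split the next upstream batch
          int partitionId = splitIterator.nextPartitionId();
          ColumnarBatch batch = splitIterator.next();    // one partition's batch from the native side
          // hand (partitionId, batch) to the shuffle writer; the consumer closes the batch
        }
      }
    }
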
--- .../main/java/com/intel/oap/vectorized/SplitIterator.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index 7464bdc53..c53825608 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -143,7 +143,7 @@ private void nativeCreateInstance() { throw new RuntimeException(e); } finally { ConverterUtils.releaseArrowRecordBatch(recordBatch); - cb.close(); + // cb.close(); } } @@ -190,9 +190,8 @@ public boolean hasNext() { @Override public ColumnarBatch next() { byte[] serializedRecordBatch = nativeNext(nativeSplitter); - ColumnarBatch cb = ConverterUtils.createRecordBatch(serializedRecordBatch, - options.getNativePartitioning().getSchema()); - return cb; + return ConverterUtils.createRecordBatch(serializedRecordBatch, + options.getNativePartitioning().getSchema()); } private native int nativeNextPartitionId(long nativeSplitter); From 05ff93cf06e315295005be075ce2d26e63b126c3 Mon Sep 17 00:00:00 2001 From: lviiii Date: Mon, 8 Aug 2022 21:17:29 +0000 Subject: [PATCH 11/12] test. --- .../intel/oap/vectorized/SplitIterator.java | 15 ++++++++ .../ArrowColumnarBatchSerializer.scala | 2 +- .../ColumnarShuffleExchangeExec.scala | 34 +++++++++++++------ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java index c53825608..b758e6bc7 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/SplitIterator.java @@ -114,6 +114,10 @@ public SplitIterator(Iterator iterator, IteratorOptions options) } private void nativeCreateInstance() { + for (int i = 0; i < cb.numCols(); i++) { + ArrowWritableColumnVector vector = (ArrowWritableColumnVector)(cb.column(i)); + vector.getValueVector().setValueCount(cb.numRows()); + } ArrowRecordBatch recordBatch = ConverterUtils.createArrowRecordBatch(cb); try { if (jniWrapper == null) { @@ -137,9 +141,20 @@ private void nativeCreateInstance() { for (ArrowBuffer buffer: recordBatch.getBuffersLayout()) { bufSizes[j++] = buffer.getSize(); } + if (i != j || i < 1) { + logger.warn("bufAddrs and BuffersLayout have different lengths, and buffer sizes is " + i + " -- " + j); + } jniWrapper.split(nativeSplitter, cb.numRows(), bufAddrs, bufSizes, false); jniWrapper.collect(nativeSplitter, cb.numRows()); } catch (Exception e) { + if (nativeSplitter != 0) { + try { + jniWrapper.clear(nativeSplitter); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + nativeSplitter = 0; + } throw new RuntimeException(e); } finally { ConverterUtils.releaseArrowRecordBatch(recordBatch); diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala index 132194225..b66ea08ce 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/vectorized/ArrowColumnarBatchSerializer.scala @@ -271,7 +271,7 @@ private class ArrowColumnarBatchSerializerInstance( 
logError("Failed to serialize current RecordBatch", e) } finally { ConverterUtils.releaseArrowRecordBatch(recordBatch) - cb.close + // cb.close } this } diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 8ba380ff0..ab82ff4f6 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -559,19 +559,31 @@ object ColumnarShuffleExchangeExec extends Logging { case _ => logError("Unsupported operations: newPartitioning.") rdd.mapPartitionsWithIndexInternal( - (_, cbIter) => { - val iter = new Iterator[Product2[Int, ColumnarBatch]] { - val splitIterator = new SplitIterator(cbIter.asJava, options) - - override def hasNext: Boolean = splitIterator.hasNext - - override def next(): Product2[Int, ColumnarBatch] = - (splitIterator.nextPartitionId(), splitIterator.next()); - } - new CloseablePartitionedBatchIterator(iter) - }, + (_, cbIter) => + cbIter.map { cb => + (0 until cb.numCols).foreach( + cb.column(_) + .asInstanceOf[ArrowWritableColumnVector] + .getValueVector + .setValueCount(cb.numRows)) + (0, cb) + }, isOrderSensitive = isOrderSensitive ) +// rdd.mapPartitionsWithIndexInternal( +// (_, cbIter) => { +// val iter = new Iterator[Product2[Int, ColumnarBatch]] { +// val splitIterator = new SplitIterator(cbIter.asJava, options) +// +// override def hasNext: Boolean = splitIterator.hasNext +// +// override def next(): Product2[Int, ColumnarBatch] = +// (splitIterator.nextPartitionId(), splitIterator.next()); +// } +// new CloseablePartitionedBatchIterator(iter) +// }, +// isOrderSensitive = isOrderSensitive +// ) }} val dependency = From 5b517c80f5ceb665e3b61e0b8e171c50358e2083 Mon Sep 17 00:00:00 2001 From: lviiii Date: Tue, 9 Aug 2022 00:20:08 +0000 Subject: [PATCH 12/12] test. --- .../spark/sql/execution/ColumnarShuffleExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index ab82ff4f6..c43adebcd 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -557,7 +557,7 @@ object ColumnarShuffleExchangeExec extends Logging { isOrderSensitive = isOrderSensitive ) case _ => - logError("Unsupported operations: newPartitioning.") + logError("Unsupported operations: " + nativePartitioning.getShortName) rdd.mapPartitionsWithIndexInternal( (_, cbIter) => cbIter.map { cb =>