[NSE-667] backport patches to 1.3 branch (#666)
* [NSE-636] Remove log4j1 related unit tests (#646)

* [NSE-640] Disable compression for tiny payloads in shuffle (#641)

Closes #640

* [NSE-653] Add validity checking for get_json_object in WSCG (#654)

* Initial commit

* Add two unit test cases

* Format the code

* Update clang format check

* [NSE-617] Handle exception in cast expression from string to numeric types in WSCG (#655)

* [NSE-660] fix window builder with string (#649)

* fix window builder with string

Signed-off-by: Yuan Zhou <[email protected]>

* fix format

Signed-off-by: Yuan Zhou <[email protected]>

* [NSE-650] Scala test ArrowColumnarBatchSerializerSuite is failing (#659)

Closes #650

* [NSE-645] Add support to cast bool type to bigint type & string type (#644)

* Initial commit

* Change arrow branch for test [revert this commit at last]

* Revert "Change arrow branch for test [revert this commit at last]"

This reverts commit 94ce7fbfc4025d48c252f91701459b4ed091dad9.

* use arrow 1.3 branch

Signed-off-by: Yuan Zhou <[email protected]>

* [NSE-662] Add "like" expression support in WSCG (#663)

* Initial commit

* Copy headers

* Format the code

* Change arrow branch for test [will revert at last]

* Revert "Change arrow branch for test [will revert at last]"

This reverts commit 065547a.

* [NSE-126] remove extra headers/namespaces in codegen (#668)

* remove extra gandiva header

Signed-off-by: Yuan Zhou <[email protected]>

* remove extra using namespace

Signed-off-by: Yuan Zhou <[email protected]>

* [NSE-661] Add trim expression support in WSCG (#664)

* Add trim expression support in WSCG

* Fix a bug

* Format the code

Co-authored-by: Wei-Ting Chen <[email protected]>
Co-authored-by: Hongze Zhang <[email protected]>
Co-authored-by: PHILO-HE <[email protected]>
4 people authored Dec 30, 2021
1 parent 58e6d33 commit da597b8
Showing 40 changed files with 390 additions and 160 deletions.
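
For orientation before the file diffs: a hedged Spark SQL sketch of the query shapes the backported patches target. The `spark` session, the `events` table, and its columns are assumptions for illustration, not part of the commit.

``` scala
// Illustrative only: `events` and its columns (flag: boolean, name: string,
// payload: string) are invented for this sketch.
spark.sql("SELECT CAST(flag AS BIGINT), CAST(flag AS STRING) FROM events")  // NSE-645
spark.sql("SELECT name FROM events WHERE name LIKE 'ab%'")                  // NSE-662
spark.sql("SELECT trim(name) FROM events")                                  // NSE-661
spark.sql("SELECT get_json_object(payload, '$.id') FROM events")            // NSE-653
spark.sql("SELECT CAST(name AS INT) FROM events")                           // NSE-617
```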
2 changes: 1 addition & 1 deletion .github/workflows/tpch.yml
@@ -51,7 +51,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-4.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap-1.3 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2
sudo make install
8 changes: 4 additions & 4 deletions .github/workflows/unittests.yml
@@ -45,7 +45,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-4.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap-1.3 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -88,7 +88,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-4.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap-1.3 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -135,7 +135,7 @@ jobs:
run: |
cd /tmp
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-4.0.0-oap && cd cpp
+ cd arrow && git checkout arrow-4.0.0-oap-1.3 && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DGTEST_ROOT=/usr/src/gtest && make -j2
sudo make install
@@ -159,7 +159,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Run clang-format style check for C/C++ programs.
- uses: jidicula/clang-format-action@v3.2.0
+ uses: jidicula/clang-format-action@v3.5.1
with:
clang-format-version: '10'
check-path: 'native-sql-engine/cpp/src'
2 changes: 1 addition & 1 deletion arrow-data-source/README.md
@@ -117,7 +117,7 @@ You have to use a customized Arrow to support for our datasets Java API.

```
// build arrow-cpp
- git clone -b arrow-4.0.0-oap https://github.com/oap-project/arrow.git
+ git clone -b arrow-4.0.0-oap-1.3 https://github.com/oap-project/arrow.git
cd arrow/cpp
mkdir build
cd build
2 changes: 1 addition & 1 deletion arrow-data-source/script/build_arrow.sh
@@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
- git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR
+ git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap-1.3 $ARROW_SOURCE_DIR
pushd $ARROW_SOURCE_DIR

cmake ./cpp \
2 changes: 1 addition & 1 deletion docs/ApacheArrowInstallation.md
@@ -30,7 +30,7 @@ Please make sure your cmake version is qualified based on the prerequisite.
# Arrow
``` shell
git clone https://github.com/oap-project/arrow.git
- cd arrow && git checkout arrow-4.0.0-oap
+ cd arrow && git checkout arrow-4.0.0-oap-1.3
mkdir -p arrow/cpp/release-build
cd arrow/cpp/release-build
cmake -DARROW_DEPENDENCY_SOURCE=BUNDLED -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_HDFS=ON -DARROW_BOOST_USE_SHARED=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_WITH_PROTOBUF=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_LZ4=ON -DARROW_FILESYSTEM=ON -DARROW_JSON=ON ..
2 changes: 1 addition & 1 deletion docs/Installation.md
@@ -26,7 +26,7 @@ Based on the different environment, there are some parameters can be set via -D
| arrow_root | When build_arrow set to False, arrow_root will be enabled to find the location of your existing arrow library. | /usr/local |
| build_protobuf | Build Protobuf from Source. If set to False, default library path will be used to find protobuf library. | True |

- When build_arrow set to True, the build_arrow.sh will be launched and compile a custom arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap)
+ When build_arrow set to True, the build_arrow.sh will be launched and compile a custom arrow library from [OAP Arrow](https://github.com/oap-project/arrow/tree/arrow-4.0.0-oap-1.3)
If you wish to change any parameters from Arrow, you can change it from the `build_arrow.sh` script under `native-sql-engine/arrow-data-source/script/`.

### Additional Notes

@@ -65,4 +65,11 @@ public void loadCompressed(ArrowRecordBatch recordBatch) {
+ Collections2.toList(buffers).toString());
}
}

+ /**
+  * Direct router to VectorLoader#load()
+  */
+ public void loadUncompressed(ArrowRecordBatch recordBatch) {
+   super.load(recordBatch);
+ }
}

@@ -23,6 +23,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -42,7 +43,11 @@
* ArrowRecordBatches.
*/
public class SchemaAwareArrowCompressedStreamReader extends ArrowStreamReader {
+ public static final String COMPRESS_TYPE_NONE = "none";

private final Schema originalSchema;

+ // fixme: the design can be improved to avoid relying on this stateful field
private String compressType;

public SchemaAwareArrowCompressedStreamReader(Schema originalSchema, InputStream in,
@@ -57,7 +62,7 @@ public SchemaAwareArrowCompressedStreamReader(InputStream in,
this(null, in, allocator);
}

- public String GetCompressType() {
+ public String getCompressType() {
return compressType;
}

@@ -112,12 +117,17 @@ public boolean loadNextBatch() throws IOException {
}

ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(result.getMessage(), bodyBuffer);
- String codecName = CompressionType.name(batch.getBodyCompression().getCodec());
-
- if (codecName.equals("LZ4_FRAME")) {
-   compressType = "lz4";
+ byte codec = batch.getBodyCompression().getCodec();
+ final String codecName;
+ if (codec == NoCompressionCodec.COMPRESSION_TYPE) {
+   compressType = COMPRESS_TYPE_NONE;
  } else {
-   compressType = codecName;
+   codecName = CompressionType.name(codec);
+   if (codecName.equals("LZ4_FRAME")) {
+     compressType = "lz4";
+   } else {
+     compressType = codecName;
+   }
  }

loadRecordBatch(batch);
@@ -138,9 +148,18 @@ public boolean loadNextBatch() throws IOException {
@Override
protected void loadRecordBatch(ArrowRecordBatch batch) {
try {
- ((CompressedVectorLoader) loader).loadCompressed(batch);
+ CompressedVectorLoader loader = (CompressedVectorLoader) this.loader;
+ if (isCurrentBatchCompressed()) {
+   loader.loadCompressed(batch);
+ } else {
+   loader.loadUncompressed(batch);
+ }
} finally {
batch.close();
}
}

+ public boolean isCurrentBatchCompressed() {
+   return !Objects.equals(getCompressType(), COMPRESS_TYPE_NONE);
+ }
}
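
A minimal usage sketch of the reader API patched above, assuming the caller already has the schema, input stream, and allocator, and assuming the `com.intel.oap.vectorized` package location; only methods visible in this diff are used.

``` scala
import java.io.InputStream
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.types.pojo.Schema
// Package path is an assumption; the class itself is the one patched above.
import com.intel.oap.vectorized.SchemaAwareArrowCompressedStreamReader

def consumeStream(schema: Schema, in: InputStream, allocator: BufferAllocator): Unit = {
  val reader = new SchemaAwareArrowCompressedStreamReader(schema, in, allocator)
  try {
    while (reader.loadNextBatch()) {
      if (reader.isCurrentBatchCompressed) {
        // Compressed batch: the codec name ("lz4", ...) is exposed via getCompressType.
        val codec = reader.getCompressType
        // ... hand the loaded buffers to a decompressor keyed by `codec` ...
      } else {
        // Tiny batch written uncompressed ("none"): vectors are already usable as-is.
      }
      val root = reader.getVectorSchemaRoot // inherited from ArrowStreamReader
      // ... consume `root` ...
    }
  } finally {
    reader.close()
  }
}
```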

@@ -43,6 +43,7 @@ public long make(
long offheapPerTask,
int bufferSize,
String codec,
+ int batchCompressThreshold,
String dataFile,
int subDirsPerLocalDir,
String localDirs,
@@ -57,6 +58,7 @@
offheapPerTask,
bufferSize,
codec,
+ batchCompressThreshold,
dataFile,
subDirsPerLocalDir,
localDirs,
@@ -73,6 +75,7 @@ public native long nativeMake(
long offheapPerTask,
int bufferSize,
String codec,
+ int batchCompressThreshold,
String dataFile,
int subDirsPerLocalDir,
String localDirs,

@@ -187,7 +187,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val columnarShuffleUseCustomizedCompressionCodec: String =
conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")

- val shuffleSplitDefaultSize: Int =
+ val columnarShuffleBatchCompressThreshold: Int =
+   conf.getConfString("spark.oap.sql.columnar.shuffle.batchCompressThreshold", "100").toInt
+
+ val shuffleSplitDefaultSize: Int =
conf
.getConfString("spark.oap.sql.columnar.shuffleSplitDefaultSize", "8192").toInt

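
As a usage note for the new threshold above: a minimal sketch, assuming a standard SparkSession build; the two property names come from this hunk, while the values and the session setup are illustrative.

``` scala
import org.apache.spark.sql.SparkSession

// Illustrative values only: "100" is the default shown above; 32 is arbitrary.
val spark = SparkSession.builder()
  .appName("gazelle-shuffle-compress-threshold-sketch")
  .config("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")
  .config("spark.oap.sql.columnar.shuffle.batchCompressThreshold", "32")
  .getOrCreate()
```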

@@ -408,6 +408,7 @@ class ColumnarCast(
if (datatype == StringType) {
val supported =
List(
+ BooleanType,
ByteType,
ShortType,
IntegerType,
@@ -430,12 +431,14 @@
}
} else if (datatype == IntegerType) {
val supported =
- List(ByteType, ShortType, LongType, FloatType, DoubleType, DateType, DecimalType, StringType)
+ List(ByteType, ShortType, LongType, FloatType, DoubleType, DateType,
+   DecimalType, StringType)
if (supported.indexOf(child.dataType) == -1 && !child.dataType.isInstanceOf[DecimalType]) {
throw new UnsupportedOperationException(s"${child.dataType} is not supported in castINT")
}
} else if (datatype == LongType) {
- val supported = List(IntegerType, FloatType, DoubleType, DateType, DecimalType, TimestampType, StringType)
+ val supported = List(IntegerType, FloatType, DoubleType, DateType,
+   DecimalType, TimestampType, StringType, BooleanType)
if (supported.indexOf(child.dataType) == -1 &&
!child.dataType.isInstanceOf[DecimalType]) {
throw new UnsupportedOperationException(
@@ -494,21 +497,22 @@
}
if (dataType == StringType) {
val limitLen: java.lang.Long = childType0 match {
- case int: ArrowType.Int if int.getBitWidth == 8 => 4
- case int: ArrowType.Int if int.getBitWidth == 16 => 6
- case int: ArrowType.Int if int.getBitWidth == 32 => 11
- case int: ArrowType.Int if int.getBitWidth == 64 => 20
+ case int: ArrowType.Int if int.getBitWidth == 8 => 4L
+ case int: ArrowType.Int if int.getBitWidth == 16 => 6L
+ case int: ArrowType.Int if int.getBitWidth == 32 => 11L
+ case int: ArrowType.Int if int.getBitWidth == 64 => 20L
case float: ArrowType.FloatingPoint
if float.getPrecision() == FloatingPointPrecision.SINGLE =>
- 12
+ 12L
case float: ArrowType.FloatingPoint
if float.getPrecision() == FloatingPointPrecision.DOUBLE =>
- 21
- case date: ArrowType.Date if date.getUnit == DateUnit.DAY => 10
+ 21L
+ case _: ArrowType.Bool => 10L
+ case date: ArrowType.Date if date.getUnit == DateUnit.DAY => 10L
case decimal: ArrowType.Decimal =>
// Add two to precision for decimal point and negative sign
(decimal.getPrecision() + 2)
- case _: ArrowType.Timestamp => 24
+ case _: ArrowType.Timestamp => 24L
case _ =>
throw new UnsupportedOperationException(
s"ColumnarCast to String doesn't support ${childType0}")
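
For illustration, a small DataFrame snippet exercising the boolean-to-bigint and boolean-to-string casts that NSE-645 enables in ColumnarCast; the data and column names are made up, and `spark` is assumed to be an existing SparkSession.

``` scala
import org.apache.spark.sql.functions.col

// Hypothetical data: an id column plus a derived boolean flag.
val df = spark.range(4).withColumn("flag", col("id") % 2 === 0)
df.select(
  col("flag").cast("bigint").as("flag_as_bigint"), // true -> 1, false -> 0
  col("flag").cast("string").as("flag_as_string")  // "true" / "false"
).show()
```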

@@ -131,7 +131,9 @@ private class ArrowColumnarBatchSerializerInstance(
numRowsTotal += numRows

// jni call to decompress buffers
- if (compressionEnabled) {
+ if (compressionEnabled &&
+   reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader]
+     .isCurrentBatchCompressed) {
try {
decompressVectors()
} catch {
@@ -231,7 +233,7 @@

val serializedBatch = jniWrapper.decompress(
schemaHolderId,
- reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader].GetCompressType(),
+ reader.asInstanceOf[SchemaAwareArrowCompressedStreamReader].getCompressType,
root.getRowCount,
bufAddrs.toArray,
bufSizes.toArray,

@@ -71,6 +71,9 @@ class ColumnarShuffleWriter[K, V](
} else {
"uncompressed"
}
+ private val batchCompressThreshold =
+   GazellePluginConfig.getConf.columnarShuffleBatchCompressThreshold;
+
private val preferSpill = GazellePluginConfig.getConf.columnarShufflePreferSpill

private val writeSchema = GazellePluginConfig.getConf.columnarShuffleWriteSchema
@@ -103,6 +106,7 @@
offheapPerTask,
nativeBufferSize,
defaultCompressionCodec,
+ batchCompressThreshold,
dataTmp.getAbsolutePath,
blockManager.subDirsPerLocalDir,
localDirs,

@@ -19,7 +19,7 @@ package com.intel.oap.misc

import com.intel.oap.tpc.ds.TPCDSTableGen
import com.intel.oap.tpc.util.TPCRunner
- import org.apache.log4j.{Level, LogManager}
+ //import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.functions.{col, expr}
@@ -68,7 +68,7 @@ class PartitioningSuite extends QueryTest with SharedSparkSession {

override def beforeAll(): Unit = {
super.beforeAll()
- LogManager.getRootLogger.setLevel(Level.WARN)
+ //LogManager.getRootLogger.setLevel(Level.WARN)

lPath = Files.createTempFile("", ".parquet").toFile.getAbsolutePath
spark.range(scale)

@@ -18,7 +18,7 @@
package com.intel.oap.tpc.ds

import com.intel.oap.tpc.util.TPCRunner
- import org.apache.log4j.{Level, LogManager}
+ //import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
@@ -60,7 +60,7 @@ class Orc_TPCDSSuite extends QueryTest with SharedSparkSession {

override def beforeAll(): Unit = {
super.beforeAll()
- LogManager.getRootLogger.setLevel(Level.WARN)
+ //LogManager.getRootLogger.setLevel(Level.WARN)
val tGen = new Orc_TPCDSTableGen(spark, 0.1D, TPCDS_WRITE_PATH)
tGen.gen()
tGen.createTables()

@@ -18,7 +18,7 @@
package com.intel.oap.tpc.ds

import com.intel.oap.tpc.util.TPCRunner
- import org.apache.log4j.{Level, LogManager}
+ //import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.functions.{col, exp, expr}
@@ -64,7 +64,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {

override def beforeAll(): Unit = {
super.beforeAll()
- LogManager.getRootLogger.setLevel(Level.WARN)
+ //LogManager.getRootLogger.setLevel(Level.WARN)
val tGen = new TPCDSTableGen(spark, 0.1D, TPCDS_WRITE_PATH)
tGen.gen()
tGen.createTables()

@@ -24,7 +24,7 @@ import com.intel.oap.tpc.MallocUtils
import com.intel.oap.tpc.h.TPCHSuite.RAMMonitor
import com.intel.oap.tpc.util.TPCRunner
import org.apache.commons.lang.StringUtils
- import org.apache.log4j.{Level, LogManager}
+ //import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
@@ -63,7 +63,7 @@ class Orc_TPCHSuite extends QueryTest with SharedSparkSession {

override def beforeAll(): Unit = {
super.beforeAll()
- LogManager.getRootLogger.setLevel(Level.WARN)
+ //LogManager.getRootLogger.setLevel(Level.WARN)
val tGen = new Orc_TPCHTableGen(spark, 0.1D, TPCH_WRITE_PATH)
tGen.gen()
tGen.createTables()

@@ -27,7 +27,7 @@ import com.intel.oap.tpc.MallocUtils
import com.intel.oap.tpc.h.TPCHSuite.RAMMonitor
import com.intel.oap.tpc.util.TPCRunner
import org.apache.commons.lang.StringUtils
- import org.apache.log4j.{Level, LogManager}
+ //import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
@@ -72,7 +72,7 @@ class TPCHSuite extends QueryTest with SharedSparkSession {

override def beforeAll(): Unit = {
super.beforeAll()
- LogManager.getRootLogger.setLevel(Level.WARN)
+ //LogManager.getRootLogger.setLevel(Level.WARN)
val tGen = new TPCHTableGen(spark, 0.1D, TPCH_WRITE_PATH)
tGen.gen()
tGen.createTables()