
Commit

Sql Index and Datasource Cache feature Spark official 3.0 support (#1412)

* Sql Index and Datasource Cache feature Spark official 3.0 support

* adjust travis.yml
zhixingheyi-tian authored Jul 2, 2020
1 parent c39a336 commit 385991b
Showing 21 changed files with 208 additions and 135 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
@@ -12,9 +12,9 @@ jobs:
- sudo apt-get install cmake
install:
- # Download spark 3.0.0
- "[ -f spark ] || mkdir spark && cd spark && wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7-hive1.2.tgz && cd .."
- "tar -xf ./spark/spark-3.0.0-bin-hadoop2.7-hive1.2.tgz"
- "export SPARK_HOME=`pwd`/spark-3.0.0-bin-hadoop2.7-hive1.2"
- "[ -f spark ] || mkdir spark && cd spark && wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz && cd .."
- "tar -xf ./spark/spark-3.0.0-bin-hadoop2.7.tgz"
- "export SPARK_HOME=`pwd`/spark-3.0.0-bin-hadoop2.7"
before_script:
- cd ${TRAVIS_BUILD_DIR}/dev
- ./install_vmemcache.sh
36 changes: 16 additions & 20 deletions oap-cache/oap/pom.xml
@@ -35,7 +35,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.useIncrementalCompilation>false</maven.compiler.useIncrementalCompilation>
<basedir>./</basedir>
<spark.internal.version>3.0.1-Hive1.2-SNAPSHOT</spark.internal.version>
<spark.internal.version>3.0.0</spark.internal.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<java.version>1.8</java.version>
@@ -46,7 +46,7 @@
<jetty.version>9.3.24.v20180605</jetty.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
<!--diff: Spark-2.4.4 use orc 1.5.5 -->
<orc.version>1.5.9</orc.version>
<orc.version>1.5.10</orc.version>
<exec.maven.version>1.6.0</exec.maven.version>
</properties>

@@ -97,10 +97,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>

<pluginRepositories>
@@ -199,65 +195,65 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
</dependency>
<dependency>
<groupId>com.intel.spark</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_${scala.binary.version}</artifactId>
<version>${spark.internal.version}</version>
<type>test-jar</type>
@@ -317,7 +313,7 @@
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${orc.version}</version>
<classifier>nohive</classifier>
<!--<classifier>nohive</classifier>-->
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -342,7 +338,7 @@
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>${orc.version}</version>
<classifier>nohive</classifier>
<!--<classifier>nohive</classifier>-->
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -690,7 +686,7 @@
<source>src/main/spark3.0.0/java</source>
<source>src/main/spark3.0.0/scala</source>
<source>src/main/parquet1.10.1/java</source>
<source>src/main/orc-1.5.9-nohive/java</source>
<source>src/main/orc-1.5.10/java</source>
</sources>
</configuration>
</execution>
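Dropping the nohive classifier on orc-core and orc-mapreduce means ORC is no longer consumed with its shaded copy of the Hive storage-api, which is why the reader sources below switch every import from org.apache.orc.storage.* to the unshaded org.apache.hadoop.hive.* packages. A minimal sketch of code against the unshaded API (assumes orc-core without a classifier on the classpath; the class name is made up for illustration):

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

public class RowBatchProbe {
    public static void main(String[] args) {
        // createRowBatch() returns the Hive storage-api VectorizedRowBatch,
        // not the shaded org.apache.orc.storage variant bundled in the nohive jar.
        VectorizedRowBatch batch = TypeDescription
            .fromString("struct<x:int,y:string>")
            .createRowBatch();
        System.out.println("columns per batch: " + batch.numCols);
    }
}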
@@ -17,7 +17,7 @@

package org.apache.orc.impl;

import org.apache.orc.storage.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -29,6 +29,17 @@
import java.util.TimeZone;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.util.TimestampUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.BooleanColumnStatistics;
import org.apache.orc.ColumnStatistics;
@@ -47,18 +58,6 @@
import org.apache.orc.StripeInformation;
import org.apache.orc.TimestampColumnStatistics;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.io.DiskRange;
import org.apache.orc.storage.common.io.DiskRangeList;
import org.apache.orc.storage.common.io.DiskRangeList.CreateHelper;
import org.apache.orc.storage.common.type.HiveDecimal;
//import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.storage.ql.io.sarg.PredicateLeaf;
import org.apache.orc.storage.ql.io.sarg.SearchArgument;
import org.apache.orc.storage.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.orc.storage.ql.util.TimestampUtils;
import org.apache.orc.storage.serde2.io.DateWritable;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
import org.apache.orc.util.BloomFilter;
import org.apache.orc.util.BloomFilterIO;
import org.slf4j.Logger;
@@ -378,7 +378,7 @@ protected void initColumnReaders(PageReadStore pages) throws IOException {
if (missingColumns[i]) continue;
columnReaders[i] = new SkippableVectorizedColumnReader(columns.get(i),
types.get(i).getOriginalType(), pages.getPageReader(columns.get(i)),
ZoneId.systemDefault(), true);
ZoneId.systemDefault(), "LEGACY");
}
totalCountLoadedSoFar += pages.getRowCount();
}
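In Spark 3.0.0 the VectorizedColumnReader constructor takes a datetime rebase mode string instead of the boolean flag used by the previous com.intel.spark build, which is why the call above now passes "LEGACY". A hedged sketch of the accepted values and how the old boolean maps onto them (the helper class is hypothetical, not part of the patch; the mode names follow Spark 3.0's LegacyBehaviorPolicy):

public final class RebaseMode {
    // LEGACY: rebase dates/timestamps between the hybrid Julian calendar used by
    // Spark 2.x writers and the Proleptic Gregorian calendar used by Spark 3.0.
    public static final String LEGACY = "LEGACY";
    // CORRECTED: read values as-is, without rebasing.
    public static final String CORRECTED = "CORRECTED";
    // EXCEPTION: fail when a value is ambiguous across the two calendars.
    public static final String EXCEPTION = "EXCEPTION";

    // Mirrors the old boolean parameter of the forked reader: true -> LEGACY, false -> CORRECTED.
    public static String fromBoolean(boolean rebaseDateTime) {
        return rebaseDateTime ? LEGACY : CORRECTED;
    }

    public static void main(String[] args) {
        System.out.println(fromBoolean(true)); // prints LEGACY, the mode used in this patch
    }
}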
@@ -22,15 +22,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.orc.*;
import org.apache.orc.impl.DataReaderProperties;
import org.apache.orc.impl.ReaderImpl;
import org.apache.orc.impl.RecordReaderBinaryCacheImpl;
import org.apache.orc.impl.RecordReaderBinaryCacheUtils;
import org.apache.orc.mapred.OrcInputFormat;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.WritableComparable;
import org.apache.orc.*;
import org.apache.orc.impl.DataReaderProperties;
@@ -31,7 +32,6 @@
import org.apache.orc.impl.RecordReaderBinaryCacheUtils;
import org.apache.orc.mapred.OrcMapredRecordReader;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import org.apache.spark.sql.internal.oap.OapConf$;

@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.orc;

import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;

import org.apache.spark.sql.types.DataType;

@@ -61,7 +61,7 @@ public SkippableVectorizedColumnReader(
OriginalType originalType,
PageReader pageReader,
ZoneId convertTz,
boolean rebaseDateTime
String rebaseDateTime
)
throws IOException {
super(descriptor, originalType, pageReader, convertTz, rebaseDateTime);
@@ -18,7 +18,7 @@
package org.apache.orc.impl;

import org.apache.orc.*;
import org.apache.orc.storage.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList;

import java.io.IOException;
import java.util.HashMap;
@@ -21,7 +21,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.orc.DataReader;
import org.apache.orc.storage.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.spark.sql.execution.datasources.oap.filecache.FiberCache;
import org.apache.spark.sql.execution.datasources.oap.filecache.FiberCacheManager;
import org.apache.spark.sql.execution.datasources.oap.filecache.OrcBinaryFiberId;
@@ -53,17 +53,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.orc.storage.common.io.DiskRange;
import org.apache.orc.storage.common.io.DiskRangeList;
import org.apache.orc.storage.common.io.DiskRangeList.CreateHelper;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.storage.ql.io.sarg.PredicateLeaf;
import org.apache.orc.storage.ql.io.sarg.SearchArgument;
import org.apache.orc.storage.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.orc.storage.serde2.io.DateWritable;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
import org.apache.orc.storage.ql.util.TimestampUtils;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.ql.util.TimestampUtils;
import org.apache.hadoop.io.Text;

public class RecordReaderImpl implements RecordReader {
@@ -310,6 +310,13 @@ public long getNext() {
}
}

public static final class ZeroPositionProvider implements PositionProvider {
@Override
public long getNext() {
return 0;
}
}

public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
) throws IOException {
return dataReader.readStripeFooter(stripe);
@@ -1419,7 +1426,13 @@ protected void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
PositionProvider[] index = new PositionProvider[indexes.length];
for (int i = 0; i < indexes.length; ++i) {
if (indexes[i] != null) {
index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
OrcProto.RowIndexEntry entry = indexes[i].getEntry(rowEntry);
// This is effectively a test for pre-ORC-569 files.
if (rowEntry == 0 && entry.getPositionsCount() == 0) {
index[i] = new ZeroPositionProvider();
} else {
index[i] = new PositionProviderImpl(entry);
}
}
}
reader.seek(index);
@@ -32,8 +32,8 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.storage.common.io.DiskRange;
import org.apache.orc.storage.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.DataReader;
@@ -381,7 +381,7 @@ public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
if (!includedRowGroups[group]) continue;
int posn = getIndexPosition(
encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
long start = index.getEntry(group).getPositions(posn);
long start = group == 0 ? 0 : index.getEntry(group).getPositions(posn);
final long nextGroupOffset;
boolean isLast = group == (includedRowGroups.length - 1);
nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
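This guard is the row-group-offset counterpart of the ZeroPositionProvider introduced above: files written before ORC-569 record no positions for the first row-index entry, so row group 0 has to be treated as starting at stream offset 0. A standalone sketch of the same rule (hypothetical helper, not in the patch; assumes the orc-core OrcProto classes on the classpath):

import org.apache.orc.OrcProto;

public final class RowGroupOffsets {
    // Returns where a row group's data begins inside a stream.
    static long startOffset(OrcProto.RowIndex index, int group, int posn) {
        if (group == 0) {
            // pre-ORC-569 writers leave the first entry's position list empty,
            // so the first row group always starts at the beginning of the stream
            return 0;
        }
        return index.getEntry(group).getPositions(posn);
    }
}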
@@ -20,9 +20,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.*;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.oap.filecache.FiberCache;
@@ -23,12 +23,12 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.vector.{ColumnVector, VectorizedRowBatch}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.orc._
import org.apache.orc.impl.{ReaderImpl, RecordReaderCacheImpl}
import org.apache.orc.mapred.{OrcInputFormat, OrcStruct}
import org.apache.orc.mapreduce._
import org.apache.orc.storage.ql.exec.vector.{ColumnVector, VectorizedRowBatch}
import org.apache.parquet.hadoop.{ParquetFiberDataReader, VectorizedOapRecordReader}

import org.apache.spark.sql.catalyst.InternalRow
@@ -230,10 +230,10 @@ private[oap] case class OrcDataFile(
val field = schema.fields(fiberId)
val toColumn = new OnHeapColumnVector(rowCount, field.dataType)
if (fromColumn.isRepeating) {
OrcCacheReader.putRepeatingValues(rowCount, field, fromColumn, toColumn)
OrcCacheReader.putRepeatingValues(rowCount, field, fromColumn, toColumn)
}
else if (fromColumn.noNulls) {
OrcCacheReader.putNonNullValues(rowCount, field, fromColumn, toColumn)
OrcCacheReader.putNonNullValues(rowCount, field, fromColumn, toColumn)
}
else {
OrcCacheReader.putValues(rowCount, field, fromColumn, toColumn)
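For context on the three branches above: an ORC ColumnVector can be repeating (one value stands for every row), noNulls (dense, no per-row null checks), or general (the null mask must be consulted), and OrcCacheReader picks the cheapest copy path into Spark's OnHeapColumnVector accordingly. A hedged Java sketch of that dispatch for a long column only (illustrative; the real OAP code handles all supported Spark types):

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;

final class CopyLongColumn {
    static void copy(int rowCount, LongColumnVector from, OnHeapColumnVector to) {
        if (from.isRepeating) {
            // a single physical value (or null) represents every row
            if (from.isNull[0]) to.putNulls(0, rowCount);
            else to.putLongs(0, rowCount, from.vector[0]);
        } else if (from.noNulls) {
            // dense copy, no per-row null checks needed
            to.putLongs(0, rowCount, from.vector, 0);
        } else {
            for (int i = 0; i < rowCount; i++) {
                if (from.isNull[i]) to.putNull(i);
                else to.putLong(i, from.vector[i]);
            }
        }
    }
}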
@@ -67,7 +67,7 @@ private[oap] case class ParquetFiberDataLoader(
val fiberData = reader.readFiberData(blockMetaData, columnDescriptor)
val columnReader =
new VectorizedColumnReader(columnDescriptor, originalType,
fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, true)
fiberData.getPageReader(columnDescriptor), ZoneId.systemDefault, "LEGACY")

if (OapRuntime.getOrCreate.fiberCacheManager.dataCacheCompressEnable) {
ParquetDataFiberCompressedWriter.dumpToCache(
