Skip to content

Commit

Permalink
ORC-1152: [C++] Support writing short decimals in RLEv2
Browse files Browse the repository at this point in the history
This closes #1089
  • Loading branch information
coderex2522 authored Apr 22, 2022
1 parent 7f07491 commit dc14289
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 2 deletions.
128 changes: 128 additions & 0 deletions c++/src/ColumnWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,127 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}

  // Writer for DECIMAL columns whose precision fits in 64 bits, using
  // RLE version 2 for the DATA stream (no SECONDARY scale stream).
  // Selected only for FileVersion::UNSTABLE_PRE_2_0 output.
  class Decimal64ColumnWriterV2 : public ColumnWriter {
  public:
    Decimal64ColumnWriterV2(const Type& type,
                            const StreamsFactory& factory,
                            const WriterOptions& options);

    // Append numValues decimal values from rowBatch starting at offset;
    // incomingMask marks rows nulled out by an enclosing struct/union.
    virtual void add(ColumnVectorBatch& rowBatch,
                     uint64_t offset,
                     uint64_t numValues,
                     const char* incomingMask) override;

    // Flush the DATA stream and append its proto::Stream descriptor.
    virtual void flush(std::vector<proto::Stream>& streams) override;

    virtual uint64_t getEstimatedSize() const override;

    virtual void getColumnEncoding(
        std::vector<proto::ColumnEncoding>& encodings) const override;

    virtual void recordPosition() const override;

  protected:
    uint64_t precision;  // decimal precision from the column type
    uint64_t scale;      // decimal scale from the column type
    std::unique_ptr<RleEncoder> valueEncoder;  // RLEv2 encoder for unscaled values
  };

Decimal64ColumnWriterV2::Decimal64ColumnWriterV2(
const Type& type,
const StreamsFactory& factory,
const WriterOptions& options) :
ColumnWriter(type, factory, options),
precision(type.getPrecision()),
scale(type.getScale()) {
std::unique_ptr<BufferedOutputStream> dataStream =
factory.createStream(proto::Stream_Kind_DATA);
valueEncoder = createRleEncoder(std::move(dataStream),
true,
RleVersion_2,
memPool,
options.getAlignedBitpacking());

if (enableIndex) {
recordPosition();
}
}

void Decimal64ColumnWriterV2::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
uint64_t numValues,
const char* incomingMask) {
const Decimal64VectorBatch* decBatch =
dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
if (decBatch == nullptr) {
throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
}

DecimalColumnStatisticsImpl* decStats =
dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
if (decStats == nullptr) {
throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
}

ColumnWriter::add(rowBatch, offset, numValues, incomingMask);

const int64_t* data = decBatch->values.data() + offset;
const char* notNull = decBatch->hasNulls ?
decBatch->notNull.data() + offset : nullptr;

valueEncoder->add(data, numValues, notNull);

uint64_t count = 0;
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
++count;
if (enableBloomFilter) {
std::string decimal = Decimal(
data[i], static_cast<int32_t>(scale)).toString(true);
bloomFilter->addBytes(
decimal.c_str(), static_cast<int64_t>(decimal.size()));
}
decStats->update(Decimal(data[i], static_cast<int32_t>(scale)));
}
}
decStats->increase(count);
if (count < numValues) {
decStats->setHasNull(true);
}
}

void Decimal64ColumnWriterV2::flush(std::vector<proto::Stream>& streams) {
ColumnWriter::flush(streams);

proto::Stream dataStream;
dataStream.set_kind(proto::Stream_Kind_DATA);
dataStream.set_column(static_cast<uint32_t>(columnId));
dataStream.set_length(valueEncoder->flush());
streams.push_back(dataStream);
}

uint64_t Decimal64ColumnWriterV2::getEstimatedSize() const {
uint64_t size = ColumnWriter::getEstimatedSize();
size += valueEncoder->getBufferSize();
return size;
}

void Decimal64ColumnWriterV2::getColumnEncoding(
std::vector<proto::ColumnEncoding>& encodings) const {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(RleVersion_2));
encoding.set_dictionarysize(0);
if (enableBloomFilter) {
encoding.set_bloomencoding(BloomFilterVersion::UTF8);
}
encodings.push_back(encoding);
}

  // Snapshot the base writer's position plus the value encoder's position
  // into the current row-index entry (for seeking within row groups).
  void Decimal64ColumnWriterV2::recordPosition() const {
    ColumnWriter::recordPosition();
    valueEncoder->recordPosition(rowIndexPosition.get());
  }

class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type,
Expand Down Expand Up @@ -3019,6 +3140,13 @@ namespace orc {
true));
case DECIMAL:
if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
if (options.getFileVersion() == FileVersion::UNSTABLE_PRE_2_0()) {
return std::unique_ptr<ColumnWriter>(
new Decimal64ColumnWriterV2(
type,
factory,
options));
}
return std::unique_ptr<ColumnWriter>(
new Decimal64ColumnWriter(
type,
Expand Down
8 changes: 8 additions & 0 deletions c++/src/Writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ namespace orc {
privateBits->fileVersion = version;
return *this;
}
if (version == FileVersion::UNSTABLE_PRE_2_0()) {
*privateBits->errorStream << "Warning: ORC files written in "
<< FileVersion::UNSTABLE_PRE_2_0().toString()
<< " will not be readable by other versions of the software."
<< " It is only for developer testing.\n";
privateBits->fileVersion = version;
return *this;
}
throw std::logic_error("Unsupported file version specified.");
}

Expand Down
2 changes: 1 addition & 1 deletion c++/test/TestWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1996,5 +1996,5 @@ namespace orc {
}
}

INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12(), FileVersion::UNSTABLE_PRE_2_0()));
}
Binary file added examples/decimal64_v2_cplusplus.orc
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -1783,7 +1783,8 @@ protected Decimal64TreeReader(int columnId,

@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT)) {
if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
Expand Down
41 changes: 41 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
Expand Down Expand Up @@ -446,6 +448,45 @@ public void testSkipBadBloomFilters() throws IOException {
CheckFileWithSargs("bad_bloom_filter_1.6.0.orc", "ORC C++ ");
}

  @Test
  public void testReadDecimalV2File() throws IOException {
    // Reads a fixture file written by the C++ writer with the new
    // RLEv2 (DIRECT_V2) short-decimal encoding and verifies the Java
    // reader decodes it correctly.
    Configuration conf = new Configuration();
    Path path = new Path(workDir, "decimal64_v2_cplusplus.orc");
    FileSystem fs = path.getFileSystem(conf);
    try (ReaderImpl reader = (ReaderImpl) OrcFile.createReader(path,
        OrcFile.readerOptions(conf).filesystem(fs))) {
      assertEquals("ORC C++ 1.8.0-SNAPSHOT", reader.getSoftwareVersion());
      OrcTail tail = reader.extractFileTail(fs, path, Long.MAX_VALUE);
      List<StripeStatistics> stats = tail.getStripeStatistics();
      // The fixture contains a single stripe.
      assertEquals(1, stats.size());

      try (RecordReader rows = reader.rows()) {
        TypeDescription schema = reader.getSchema();
        assertEquals("struct<a:bigint,b:decimal(10,2),c:decimal(2,2),d:decimal(2,2),e:decimal(2,2)>",
            schema.toString());
        // createRowBatchV2 produces Decimal64ColumnVector for short decimals.
        VectorizedRowBatch batch = schema.createRowBatchV2();
        assertTrue(rows.nextBatch(batch), "No rows read out!");
        assertEquals(10, batch.size);
        LongColumnVector col1 = (LongColumnVector) batch.cols[0];
        Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
        Decimal64ColumnVector col3 = (Decimal64ColumnVector) batch.cols[2];
        Decimal64ColumnVector col4 = (Decimal64ColumnVector) batch.cols[3];
        Decimal64ColumnVector col5 = (Decimal64ColumnVector) batch.cols[4];
        for (int i = 0; i < batch.size; ++i) {
          assertEquals(17292380420L + i, col1.vector[i]);
          if (i == 0) {
            // Decimal64ColumnVector stores unscaled values; 164.16 with
            // scale 2 is stored as 16416.
            long scaleNum = (long) Math.pow(10, col2.scale);
            assertEquals(164.16 * scaleNum, col2.vector[i]);
          } else {
            // Column b doubles on each subsequent row in the fixture.
            assertEquals(col2.vector[i - 1] * 2, col2.vector[i]);
          }
          // Fixture invariant: e = c + d for every row.
          assertEquals(col3.vector[i] + col4.vector[i], col5.vector[i]);
        }
        // Exactly one batch of 10 rows in the file.
        assertFalse(rows.nextBatch(batch));
      }
    }
  }

@Test
public void testExtractFileTailIndexOutOfBoundsException() throws Exception {
Configuration conf = new Configuration();
Expand Down

0 comments on commit dc14289

Please sign in to comment.