Skip to content

Commit

Permalink
Merge pull request apache#9 from Intel-bigdata/wip_chendi
Browse files Browse the repository at this point in the history
Bug fixing after rebasing
  • Loading branch information
xuechendi authored Dec 6, 2019
2 parents 114ac65 + b4448ba commit 304cb60
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 26 deletions.
33 changes: 29 additions & 4 deletions cpp/src/jni/jni_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ static jmethodID arrowbuf_builder_constructor;

static arrow::jni::ConcurrentMap<std::shared_ptr<arrow::Buffer>> buffer_holder_;

// Implementation detail of ARROW_ASSIGN_OR_THROW.
//
// Evaluates `rexpr` once into a local variable named `status_name`. If the
// result's status() is not ok, a Java exception of class `io_exception_class`
// is raised through `env->ThrowNew` with the status message. The value is then
// moved out of the result into `lhs` via ValueOrDie().
//
// Requires a variable named `env` (used as `env->ThrowNew`) and
// `io_exception_class` to be visible at the expansion site.
//
// NOTE(review): there is no `return` after ThrowNew, so on failure control
// falls through to ValueOrDie() on a failed result. JNI ThrowNew presumably
// only marks the exception as pending and does not unwind native code --
// confirm whether an early return is needed in the expanding functions.
#define ARROW_ASSIGN_OR_THROW_IMPL(status_name, lhs, rexpr) \
auto status_name = (rexpr); \
if (!status_name.status().ok()) { \
env->ThrowNew(io_exception_class, status_name.status().message().c_str()); \
} \
lhs = std::move(status_name).ValueOrDie();

// Combines x and y through ARROW_CONCAT (presumably token pasting -- confirm
// against its definition). Used below to build the temporary's name from
// `_error_or_value` and __COUNTER__.
#define ARROW_ASSIGN_OR_THROW_NAME(x, y) ARROW_CONCAT(x, y)

// Executes an expression that returns a Result, extracting its value
// into the variable defined by lhs. On error, a Java exception is raised
// through env->ThrowNew (see ARROW_ASSIGN_OR_THROW_IMPL); the macro itself
// does not return from the enclosing function.
//
// Example: Assigning to a new value
// ARROW_ASSIGN_OR_THROW(auto value, MaybeGetValue(arg));
//
// Example: Assigning to an existing value
// ValueType value;
// ARROW_ASSIGN_OR_THROW(value, MaybeGetValue(arg));
//
// WARNING: ARROW_ASSIGN_OR_THROW expands into multiple statements; it cannot
// be used in a single statement (e.g. as the body of an if statement without
// {})!
#define ARROW_ASSIGN_OR_THROW(lhs, rexpr) \
ARROW_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_THROW_NAME(_error_or_value, __COUNTER__), \
lhs, rexpr);

jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
jclass local_class = env->FindClass(class_name);
jclass global_class = (jclass)env->NewGlobalRef(local_class);
Expand Down Expand Up @@ -166,13 +191,13 @@ arrow::Status MakeRecordBatch(const std::shared_ptr<arrow::Schema>& schema, int
return arrow::Status::OK();
}

jobject MakeRecordBatchBuilder(JNIEnv* env, std::shared_ptr<arrow::Schema> schema,
jobject MakeRecordBatchBuilder(JNIEnv* env,
std::shared_ptr<arrow::RecordBatch> record_batch) {
jobjectArray field_array =
env->NewObjectArray(schema->num_fields(), arrow_field_node_builder_class, nullptr);
jobjectArray field_array = env->NewObjectArray(record_batch->num_columns(),
arrow_field_node_builder_class, nullptr);

std::vector<std::shared_ptr<arrow::Buffer>> buffers;
for (int i = 0; i < schema->num_fields(); ++i) {
for (int i = 0; i < record_batch->num_columns(); ++i) {
auto column = record_batch->column(i);
auto dataArray = column->data();
jobject field = env->NewObject(arrow_field_node_builder_class,
Expand Down
17 changes: 15 additions & 2 deletions cpp/src/jni/parquet/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class ParquetFileReader::Impl {
const std::vector<int>& row_group_indices) {
RETURN_NOT_OK(
GetRecordBatchReader(row_group_indices, column_indices, &record_batch_reader_));
RETURN_NOT_OK(record_batch_reader_->ReadNext(&next_batch_));
schema_ = next_batch_->schema();
return Status::OK();
}

Expand All @@ -64,23 +66,34 @@ class ParquetFileReader::Impl {
std::vector<int> row_group_indices =
GetRowGroupIndices(parquet_reader_->num_row_groups(), start_pos, end_pos);
RETURN_NOT_OK(InitRecordBatchReader(column_indices, row_group_indices));
RETURN_NOT_OK(record_batch_reader_->ReadNext(&next_batch_));
schema_ = next_batch_->schema();
return Status::OK();
}

// Returns the cached schema_ through `out`.
//
// schema_ is populated from the schema of the first record batch when the
// record batch reader is initialized, so it describes the batches this reader
// hands out. The earlier parquet_reader_->GetSchema(out) call is dropped: its
// output was unconditionally overwritten by schema_ below.
//
// Returns Status::Invalid if no schema has been cached yet; `out` is left
// untouched in that case.
Status ReadSchema(std::shared_ptr<Schema>* out) {
  if (!schema_) {
    return arrow::Status::Invalid("ReadSchema found non-exist schema.");
  }
  *out = schema_;
  return Status::OK();
}

// Hands the prefetched batch (next_batch_) to the caller through `out`, then
// prefetches the following batch into next_batch_.
//
// The underlying reader is advanced exactly once per call. The previous extra
// record_batch_reader_->ReadNext(out) pulled a batch whose value was
// immediately overwritten by next_batch_, so that batch was lost.
//
// Always returns Status::OK(); `out` receives whatever next_batch_ held on
// entry, which may be null.
Status ReadNext(std::shared_ptr<RecordBatch>* out) {
  *out = next_batch_;
  auto status = record_batch_reader_->ReadNext(&next_batch_);
  if (!status.ok()) {
    // NOTE(review): a prefetch failure is not propagated; it only clears
    // next_batch_, so the following call yields a null batch. Confirm that
    // treating read errors like end-of-stream is intended.
    next_batch_ = nullptr;
  }
  return Status::OK();
}

private:
std::shared_ptr<RandomAccessFile> file_;
std::unique_ptr<::parquet::arrow::FileReader> parquet_reader_;
std::shared_ptr<RecordBatchReader> record_batch_reader_;
std::shared_ptr<RecordBatch> next_batch_;
std::shared_ptr<Schema> schema_;

Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
Expand Down
39 changes: 21 additions & 18 deletions cpp/src/jni/parquet/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

#include <arrow/buffer.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/path_util.h>
#include <arrow/ipc/api.h>
#include <arrow/ipc/dictionary.h>
#include <arrow/type.h>
#include <jni.h>
#include <iostream>
#include <string>
Expand Down Expand Up @@ -94,11 +96,7 @@ Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeOpenParquetR
}

std::shared_ptr<arrow::io::RandomAccessFile> file;
status = fs->OpenInputFile(file_name).Value(&file);
if (!status.ok()) {
std::string error_message = "nativeOpenParquetReader: " + status.message();
env->ThrowNew(io_exception_class, error_message.c_str());
}
ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name));

parquet::ArrowReaderProperties properties(true);
properties.set_batch_size(batch_size);
Expand Down Expand Up @@ -205,15 +203,8 @@ Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeReadNext(JNI
if (record_batch == nullptr) {
return nullptr;
}
std::shared_ptr<arrow::Schema> schema;
status = reader->ReadSchema(&schema);
if (!status.ok()) {
std::string error_message =
"nativeReadNext: failed to read schema, err is " + status.message();
env->ThrowNew(io_exception_class, error_message.c_str());
}

return MakeRecordBatchBuilder(env, schema, record_batch);
return MakeRecordBatchBuilder(env, record_batch);
}

JNIEXPORT jobject JNICALL
Expand All @@ -224,6 +215,7 @@ Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeGetSchema(JN
auto reader = GetFileReader(env, id);
std::shared_ptr<arrow::Schema> schema;
status = reader->ReadSchema(&schema);

if (!status.ok()) {
std::string error_message =
"nativeGetSchema: failed to read schema, err is " + status.message();
Expand Down Expand Up @@ -266,13 +258,24 @@ Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeOpenParquetW
env->ThrowNew(io_exception_class, error_message.c_str());
}

std::shared_ptr<arrow::io::OutputStream> sink;
status = fs->OpenOutputStream(file_name).Value(&sink);
if (!status.ok()) {
std::string error_message = "nativeOpenParquetWriter: " + status.message();
env->ThrowNew(io_exception_class, error_message.c_str());
// check if directory exists
auto dir = arrow::fs::internal::GetAbstractPathParent(file_name).first;
arrow::fs::Selector selector;
selector.base_dir = dir;
selector.allow_non_existent = true;
std::vector<arrow::fs::FileStats> stats;
ARROW_ASSIGN_OR_THROW(stats, fs->GetTargetStats(selector));
if (stats.size() == 0) {
status = fs->CreateDir(dir);
if (!status.ok()) {
std::string error_message = "nativeOpenParquetWriter: " + status.message();
env->ThrowNew(io_exception_class, error_message.c_str());
}
}

std::shared_ptr<arrow::io::OutputStream> sink;
ARROW_ASSIGN_OR_THROW(sink, fs->OpenOutputStream(file_name));

std::unique_ptr<ParquetFileWriter> writer;
status = ParquetFileWriter::Open(sink, arrow::default_memory_pool(), schema, &writer);
if (!status.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public ParquetReader(
* @return Schema of parquet file
* @throws IOException throws io exception in case of native failure
*/
Schema getSchema() throws IOException {
public Schema getSchema() throws IOException {
byte[] schemaBytes = jniWrapper.nativeGetSchema(nativeInstanceId);

try (MessageChannelReader schemaReader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void teardown() {
public void testParquetReadWrite() throws Exception {

File testFile = testFolder.newFile("_tmpfile_ParquetWriterReaderTest");
String path = testFile.getAbsolutePath();
//String path = testFile.getAbsolutePath();
String path = "hdfs://sr602:9000/test?user=root&replication=1&use_hdfs3=1";

int numColumns = 10;
int[] rowGroupIndices = {0};
Expand Down

0 comments on commit 304cb60

Please sign in to comment.