[Fix](multi-catalog) Fix the issue of not initializing the writer caused by refactoring and add hive writing regression test. (#32721)

Issue Number: #31442

- Fix the writer not being initialized, a regression introduced by the refactoring in #31716 (illustrated after the `vhive_partition_writer.cpp` diff below).
- Fix a reference lifetime issue with `TParquetVersion::type parquet_version` in `VParquetTransformer` when a temporary object is passed (see the sketch below).
- Add Hive writing regression tests.
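
For context on the lifetime bug: `VParquetTransformer` stored constructor parameters of reference type in reference members, so a temporary argument died at the end of the constructor call and the member dangled. The diffs below switch both the parameters and the members to values. A minimal sketch of the hazard, using hypothetical stand-in types rather than the real Doris classes:

#include <iostream>

// Hypothetical stand-ins; only the kind of member (reference vs. value) matters.
struct BadTransformer {
    // Binds a reference member to whatever the caller passed. A temporary
    // argument dies at the end of the constructor's full expression, so
    // _version dangles afterwards.
    explicit BadTransformer(const int& version) : _version(version) {}
    const int& _version;
};

struct GoodTransformer {
    // The fix applied in this commit: accept and store by value. For
    // enum-like types such as TParquetVersion::type the copy is free.
    explicit GoodTransformer(int version) : _version(version) {}
    const int _version;
};

int main() {
    BadTransformer bad(42);   // 42 creates a temporary; bad._version dangles
    GoodTransformer good(42); // safe: the value is copied into the member
    std::cout << good._version << '\n';
    // std::cout << bad._version << '\n'; // undefined behavior
    return 0;
}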
kaka11chen authored Mar 25, 2024
1 parent 91be50b commit 645a4c4
Showing 18 changed files with 1,981 additions and 27 deletions.
6 changes: 3 additions & 3 deletions be/src/vec/runtime/vparquet_transformer.cpp
@@ -200,9 +200,9 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
 VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                                          const VExprContextSPtrs& output_vexpr_ctxs,
                                          const std::vector<TParquetSchema>& parquet_schemas,
-                                         const TParquetCompressionType::type& compression_type,
-                                         const bool& parquet_disable_dictionary,
-                                         const TParquetVersion::type& parquet_version,
+                                         TParquetCompressionType::type compression_type,
+                                         bool parquet_disable_dictionary,
+                                         TParquetVersion::type parquet_version,
                                          bool output_object_data)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _parquet_schemas(parquet_schemas),
12 changes: 6 additions & 6 deletions be/src/vec/runtime/vparquet_transformer.h
@@ -92,9 +92,9 @@ class VParquetTransformer final : public VFileFormatTransformer {
     VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                         const VExprContextSPtrs& output_vexpr_ctxs,
                         const std::vector<TParquetSchema>& parquet_schemas,
-                        const TParquetCompressionType::type& compression_type,
-                        const bool& parquet_disable_dictionary,
-                        const TParquetVersion::type& parquet_version, bool output_object_data);
+                        TParquetCompressionType::type compression_type,
+                        bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
+                        bool output_object_data);

     ~VParquetTransformer() override = default;

@@ -118,9 +118,9 @@ class VParquetTransformer final : public VFileFormatTransformer {
     std::shared_ptr<arrow::Schema> _arrow_schema;

     const std::vector<TParquetSchema>& _parquet_schemas;
-    const TParquetCompressionType::type& _compression_type;
-    const bool& _parquet_disable_dictionary;
-    const TParquetVersion::type& _parquet_version;
+    const TParquetCompressionType::type _compression_type;
+    const bool _parquet_disable_dictionary;
+    const TParquetVersion::type _parquet_version;
 };

 } // namespace doris::vectorized
32 changes: 18 additions & 14 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -49,8 +49,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)

     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
-    _fs = DORIS_TRY(FileFactory::create_fs(fs_properties,
-                                           {.path = _write_info.write_path + '/' + _file_name}));
+    io::FileDescription file_description = {
+            .path = fmt::format("{}/{}", _write_info.write_path, _file_name)};
+    _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
+    RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer));

     switch (_file_format_type) {
     case TFileFormatType::FORMAT_PARQUET: {
@@ -75,17 +75,18 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
         }
         }
         std::vector<TParquetSchema> parquet_schemas;
+        parquet_schemas.reserve(_columns.size());
         for (int i = 0; i < _columns.size(); i++) {
             VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
             TParquetSchema parquet_schema;
             parquet_schema.schema_column_name = _columns[i].name;
             parquet_schemas.emplace_back(std::move(parquet_schema));
         }
-        _vfile_writer.reset(new VParquetTransformer(
-                state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas,
+        _file_format_transformer.reset(new VParquetTransformer(
+                state, _file_writer.get(), _vec_output_expr_ctxs, parquet_schemas,
                 parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
                 false));
-        return _vfile_writer->open();
+        return _file_format_transformer->open();
     }
     case TFileFormatType::FORMAT_ORC: {
         orc::CompressionKind orc_compression_type;
@@ -122,10 +122,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
         }
         }

-        _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(),
-                                                _vec_output_expr_ctxs, std::move(root_schema),
-                                                false, orc_compression_type));
-        return _vfile_writer->open();
+        _file_format_transformer.reset(
+                new VOrcTransformer(state, _file_writer.get(), _vec_output_expr_ctxs,
+                                    std::move(root_schema), false, orc_compression_type));
+        return _file_format_transformer->open();
     }
     default: {
         return Status::InternalError("Unsupported file format type {}",
@@ -135,13 +135,14 @@ Status VHivePartitionWriter::close(const Status& status) {
 }

 Status VHivePartitionWriter::close(const Status& status) {
-    if (_vfile_writer != nullptr) {
-        Status st = _vfile_writer->close();
+    if (_file_format_transformer != nullptr) {
+        Status st = _file_format_transformer->close();
         if (!st.ok()) {
-            LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string());
+            LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}",
+                                        st.to_string());
         }
     }
-    if (!status.ok()) {
+    if (!status.ok() && _fs != nullptr) {
         auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
         Status st = _fs->delete_file(path);
         if (!st.ok()) {
@@ -155,7 +155,7 @@ Status VHivePartitionWriter::close(const Status& status) {
 Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
     Block output_block;
     RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
-    RETURN_IF_ERROR(_vfile_writer->write(output_block));
+    RETURN_IF_ERROR(_file_format_transformer->write(output_block));
     _row_count += output_block.rows();
     _input_size_in_bytes += output_block.bytes();
     return Status::OK();
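
The decisive change in the `open()` hunk above is the restored `RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer));` call: after the refactoring in #31716 the writer was never created, so the Parquet and ORC transformers received a null `FileWriter*`. A minimal sketch of that failure mode, with simplified stand-in types rather than the real Doris API:

#include <cassert>
#include <memory>

struct FileWriter {};

struct Transformer {
    explicit Transformer(FileWriter* writer) : _writer(writer) {}
    bool has_writer() const { return _writer != nullptr; }
    FileWriter* _writer;
};

int main() {
    std::unique_ptr<FileWriter> file_writer; // create_file() was never called

    // Before the fix: the transformer silently captures nullptr, and every
    // later write would dereference it.
    Transformer broken(file_writer.get());
    assert(!broken.has_writer());

    // After the fix: the writer exists before the transformer is constructed.
    file_writer = std::make_unique<FileWriter>();
    Transformer fixed(file_writer.get());
    assert(fixed.has_writer());
    return 0;
}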
8 changes: 4 additions & 4 deletions be/src/vec/sink/writer/vhive_partition_writer.h
@@ -62,7 +62,7 @@ class VHivePartitionWriter {

     Status close(const Status& status);

-    inline size_t written_len() { return _vfile_writer->written_len(); }
+    inline size_t written_len() { return _file_format_transformer->written_len(); }

 private:
     std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor);
@@ -91,13 +91,13 @@ class VHivePartitionWriter {
     TFileCompressType::type _hive_compress_type;
     const std::map<std::string, std::string>& _hadoop_conf;

-    std::shared_ptr<io::FileSystem> _fs;
+    std::shared_ptr<io::FileSystem> _fs = nullptr;

     // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
     // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
-    std::unique_ptr<io::FileWriter> _file_writer_impl = nullptr;
+    std::unique_ptr<doris::io::FileWriter> _file_writer = nullptr;
     // convert block to parquet/orc/csv format
-    std::unique_ptr<VFileFormatTransformer> _vfile_writer = nullptr;
+    std::unique_ptr<VFileFormatTransformer> _file_format_transformer = nullptr;

     RuntimeState* _state;
 };