Skip to content

Commit

Permalink
[Feature-WIP](iceberg-writer) Implements iceberg partition transform. (
Browse files Browse the repository at this point in the history
…#36289)

#31442

Added Iceberg operator functions so that Doris can write data directly into the data lake:
1. Support inserting data into Iceberg tables by appending HDFS files.
2. Implement Iceberg partition routing through partitionTransform.
2.1) Serialize the spec and schema into JSON on the FE side, then
deserialize them on the BE side to obtain the schema and partition information
of the Iceberg table.
2.2) Implement Iceberg's Identity, Bucket, Year/Month/Day and other
partition strategies through partitionTransform and template
classes.
3. Manage transactions through IcebergTransaction.
3.1) After the BE side finishes writing files, it reports CommitData to the FE
at partition granularity.
3.2) After receiving the CommitData, the FE commits the metadata to Iceberg in
IcebergTransaction.

### Future work
- Add unit tests for the partition transform functions.
- Implement the partition transform functions with the exchange sink turned on.
- The partition transform functions do not yet handle the bigint type.

---------

Co-authored-by: lik40 <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 22, 2024
1 parent abef1dd commit d8d9f0a
Show file tree
Hide file tree
Showing 17 changed files with 1,979 additions and 227 deletions.
22 changes: 22 additions & 0 deletions be/src/util/bit_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,28 @@ class BitUtil {
return (v << n) >> n;
}

// Serializes an integer into its shortest big-endian two's-complement byte string.
//
// Bytes are collected from least to most significant and the loop stops as soon as
// the remaining high part is pure sign extension of the byte just emitted; the
// buffer is then reversed so the most significant byte comes first.
// Examples: 1 -> {0x01}, 128 -> {0x00, 0x80}, -1 -> {0xFF}, -129 -> {0xFF, 0x7F}.
//
// @param input integer value to encode. NOTE(review): the early-exit checks against
//              0 and -1 are written for signed types - confirm callers never pass
//              unsigned values.
// @return the encoded bytes, at least 1 and at most sizeof(T) bytes long.
template <typename T>
static std::string IntToByteBuffer(T input) {
    std::string buffer;
    T value = input;
    for (size_t i = 0; i < sizeof(value); ++i) {
        // Isolate the lowest byte. It is classified through `signed char` explicitly:
        // plain `char` has implementation-defined signedness, and where it is unsigned
        // the sign tests below would misfire (e.g. 128 would collapse to the single
        // byte 0x80, which decodes as -128).
        const auto low_byte = static_cast<signed char>(value & 0xFF);
        buffer.push_back(static_cast<char>(low_byte));
        // Drop the byte just processed so that we can exit early once only sign
        // extension is left.
        value >>= 8;
        // Non-negative number fully consumed and the top emitted bit is 0.
        if (value == 0 && low_byte >= 0) {
            break;
        }
        // Negative number fully consumed and the top emitted bit is 1.
        if (value == -1 && low_byte < 0) {
            break;
        }
    }
    // Bytes were collected little-endian; flip to big-endian order.
    std::reverse(buffer.begin(), buffer.end());
    return buffer;
}

// Returns ceil(log2(x)).
// TODO: this could be faster if we use __builtin_clz. Fix this if this ever shows up
// in a hot path.
Expand Down
168 changes: 152 additions & 16 deletions be/src/vec/sink/writer/iceberg/partition_transformers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
namespace doris {
namespace vectorized {

// Definition of PartitionColumnTransformUtils::EPOCH: the system_clock time point
// that corresponds to a time_t value of 0.
const std::chrono::time_point<std::chrono::system_clock> PartitionColumnTransformUtils::EPOCH =
std::chrono::system_clock::from_time_t(0);

std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type) {
auto& transform = field.transform();
Expand All @@ -33,38 +36,178 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(

if (std::regex_match(transform, width_match, hasWidth)) {
std::string name = width_match[1];
//int parsed_width = std::stoi(width_match[2]);
int parsed_width = std::stoi(width_match[2]);

if (name == "truncate") {
switch (source_type.type) {
case TYPE_INT: {
return std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_BIGINT: {
return std::make_unique<BigintTruncatePartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
return std::make_unique<StringTruncatePartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_DECIMALV2: {
return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>(
source_type, parsed_width);
}
case TYPE_DECIMAL32: {
return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>(
source_type, parsed_width);
}
case TYPE_DECIMAL64: {
return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>(
source_type, parsed_width);
}
case TYPE_DECIMAL128I: {
return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>(
source_type, parsed_width);
}
case TYPE_DECIMAL256: {
return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>(
source_type, parsed_width);
}
default: {
throw doris::Exception(
doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type for truncate partition column transform {}",
source_type.debug_string());
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}
} else if (name == "bucket") {
switch (source_type.type) {
case TYPE_INT: {
return std::make_unique<IntBucketPartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_BIGINT: {
return std::make_unique<BigintBucketPartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
return std::make_unique<StringBucketPartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_DATEV2: {
return std::make_unique<DateBucketPartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_DATETIMEV2: {
return std::make_unique<TimestampBucketPartitionColumnTransform>(source_type,
parsed_width);
}
case TYPE_DECIMALV2: {
return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>(
source_type, parsed_width);
}
case TYPE_DECIMAL32: {
return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>(
source_type, parsed_width);
}
case TYPE_DECIMAL64: {
return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>(
source_type, parsed_width);
}
case TYPE_DECIMAL128I: {
return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>(
source_type, parsed_width);
}
case TYPE_DECIMAL256: {
return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>(
source_type, parsed_width);
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type for bucket partition column transform {}",
source_type.debug_string());
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}
}
}

if (transform == "identity") {
return std::make_unique<IdentityPartitionColumnTransform>(source_type);
} else if (transform == "year") {
switch (source_type.type) {
case TYPE_DATEV2: {
return std::make_unique<DateYearPartitionColumnTransform>(source_type);
}
case TYPE_DATETIMEV2: {
return std::make_unique<TimestampYearPartitionColumnTransform>(source_type);
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}
} else if (transform == "month") {
switch (source_type.type) {
case TYPE_DATEV2: {
return std::make_unique<DateMonthPartitionColumnTransform>(source_type);
}
case TYPE_DATETIMEV2: {
return std::make_unique<TimestampMonthPartitionColumnTransform>(source_type);
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}
} else if (transform == "day") {
switch (source_type.type) {
case TYPE_DATEV2: {
return std::make_unique<DateDayPartitionColumnTransform>(source_type);
}
case TYPE_DATETIMEV2: {
return std::make_unique<TimestampDayPartitionColumnTransform>(source_type);
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}
} else if (transform == "hour") {
switch (source_type.type) {
case TYPE_DATETIMEV2: {
return std::make_unique<TimestampHourPartitionColumnTransform>(source_type);
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}
} else if (transform == "void") {
return std::make_unique<VoidPartitionColumnTransform>(source_type);
} else {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported partition column transform: {}.", transform);
"Unsupported type {} for partition column transform {}",
source_type.debug_string(), transform);
}
}

// Returns the name reported by the base transform, which is the fixed string "default".
std::string PartitionColumnTransform::name() const {
    std::string transform_name = "default";
    return transform_name;
}

// Renders a partition value as text by delegating to get_partition_value() with the
// same type descriptor and value.
std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type,
                                                      const std::any& value) const {
    std::string rendered = get_partition_value(type, value);
    return rendered;
}

std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor& type,
const std::any& value) const {
if (value.has_value()) {
switch (type.type) {
case TYPE_BOOLEAN: {
Expand Down Expand Up @@ -131,19 +274,12 @@ std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type
}
default: {
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"Unsupported partition column transform: {}",
type.debug_string());
"Unsupported type {} for partition", type.debug_string());
}
}
}
return "null";
}

// Identity transform: returns an entry built from the column, type and name of the
// block entry at position `idx`, without modifying any of them.
ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block, int idx) {
    const ColumnWithTypeAndName& source = block.get_by_position(idx);
    return {source.column, source.type, source.name};
}

} // namespace vectorized
} // namespace doris
Loading

0 comments on commit d8d9f0a

Please sign in to comment.