Skip to content

Commit

Permalink
Merge pull request #68865 from ClickHouse/backport/24.5/68131
Browse files Browse the repository at this point in the history
Backport #68131 to 24.5: Fix crash on parquet column type mismatch
  • Loading branch information
robot-ch-test-poll1 authored Aug 25, 2024
2 parents 7868ab0 + 922e9b8 commit 2fb9ed0
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 37 deletions.
9 changes: 9 additions & 0 deletions src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,15 @@ static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
case TypeIndex::IPv6:
return readIPv6ColumnFromBinaryData(arrow_column, column_name);
/// ORC format outputs big integers as binary column, because there is no fixed binary in ORC.
///
/// When ORC/Parquet file says the type is "byte array" or "fixed len byte array",
/// but the clickhouse query says to interpret the column as e.g. Int128, it
/// may mean one of two things:
/// * The byte array is the 16 bytes of Int128, little-endian.
/// * The byte array is an ASCII string containing the Int128 formatted in base 10.
/// There's no reliable way to distinguish these cases. We just guess: if the
/// byte array is variable-length, and the length is different from sizeof(type),
/// we parse as text, otherwise as binary.
case TypeIndex::Int128:
return readColumnWithBigNumberFromBinaryData<ColumnInt128>(arrow_column, column_name, type_hint);
case TypeIndex::UInt128:
Expand Down
130 changes: 93 additions & 37 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Interpreters/convertFieldToType.h>

namespace CurrentMetrics
{
Expand All @@ -48,15 +49,15 @@ namespace ErrorCodes
throw Exception::createDeprecated(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)

/// Decode min/max value from column chunk statistics.
/// Decode min/max value from column chunk statistics. Returns Null if missing or unsupported.
///
/// There are two questionable decisions in this implementation:
/// * We parse the value from the encoded byte string instead of casting the parquet::Statistics
/// to parquet::TypedStatistics and taking the value from there.
/// * We dispatch based on the parquet logical+converted+physical type instead of the ClickHouse type.
/// The idea is that this is similar to what we'll have to do when reimplementing Parquet parsing in
/// ClickHouse instead of using Arrow (for speed). So, this is an exercise in parsing Parquet manually.
static std::optional<Field> decodePlainParquetValueSlow(const std::string & data, parquet::Type::type physical_type, const parquet::ColumnDescriptor & descr)
static Field decodePlainParquetValueSlow(const std::string & data, parquet::Type::type physical_type, const parquet::ColumnDescriptor & descr, TypeIndex type_hint)
{
using namespace parquet;

Expand Down Expand Up @@ -112,8 +113,6 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
if (data.size() != size || size < 1 || size > 32)
throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected decimal size: {} (actual {})", size, data.size());

/// For simplicity, widen all decimals to 256-bit. It should compare correctly with values
/// of different bitness.
Int256 val = 0;
memcpy(&val, data.data(), size);
if (big_endian)
Expand All @@ -122,7 +121,19 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
if (size < 32 && (val >> (size * 8 - 1)) != 0)
val |= ~((Int256(1) << (size * 8)) - 1);

return Field(DecimalField<Decimal256>(Decimal256(val), static_cast<UInt32>(scale)));
auto narrow = [&](auto x) -> Field
{
memcpy(&x, &val, sizeof(x));
return Field(DecimalField<decltype(x)>(x, static_cast<UInt32>(scale)));
};
if (size <= 4)
return narrow(Decimal32(0));
else if (size <= 8)
return narrow(Decimal64(0));
else if (size <= 16)
return narrow(Decimal128(0));
else
return narrow(Decimal256(0));
}
while (false);

Expand Down Expand Up @@ -179,8 +190,6 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
return Field(val);
}

/// Strings.

if (physical_type == Type::type::BYTE_ARRAY || physical_type == Type::type::FIXED_LEN_BYTE_ARRAY)
{
/// Arrow's parquet decoder handles missing min/max values slightly incorrectly.
Expand All @@ -207,14 +216,31 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
/// TODO: Remove this workaround either when we implement our own Parquet decoder that
/// doesn't have this bug, or if it's fixed in Arrow.
if (data.empty())
return std::nullopt;
return Field();

/// Long integers, encoded either as text or as little-endian bytes.
/// The parquet file doesn't know that it's numbers, so the min/max are produced by comparing
/// strings lexicographically. So these min and max are mostly useless to us.
/// There's one case where they're not useless: min == max; currently we don't make use of this.
switch (type_hint)
{
case TypeIndex::UInt128:
case TypeIndex::UInt256:
case TypeIndex::Int128:
case TypeIndex::Int256:
case TypeIndex::IPv6:
return Field();
default: break;
}

/// Strings.
return Field(data);
}

/// This one's deprecated in Parquet.
/// This type is deprecated in Parquet.
/// TODO: But turns out it's still used in practice, we should support it.
if (physical_type == Type::type::INT96)
throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Parquet INT96 type is deprecated and not supported");
return Field();

/// Integers.

Expand Down Expand Up @@ -277,30 +303,61 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
continue;
auto stats = it->second;

auto default_value = [&]() -> Field
{
DataTypePtr type = header.getByPosition(idx).type;
if (type->lowCardinality())
type = assert_cast<const DataTypeLowCardinality &>(*type).getDictionaryType();
if (type->isNullable())
type = assert_cast<const DataTypeNullable &>(*type).getNestedType();
return type->getDefault();
};
DataTypePtr type = header.getByPosition(idx).type;
if (type->lowCardinality())
type = assert_cast<const DataTypeLowCardinality &>(*type).getDictionaryType();
if (type->isNullable())
type = assert_cast<const DataTypeNullable &>(*type).getNestedType();
Field default_value = type->getDefault();
TypeIndex type_index = type->getTypeId();

/// Only primitive fields are supported, not arrays, maps, tuples, or Nested.
/// Arrays, maps, and Nested can't be meaningfully supported because Parquet only has min/max
/// across all *elements* of the array, not min/max array itself.
/// Same limitation for tuples, but maybe it would make sense to have some kind of tuple
/// expansion in KeyCondition to accept ranges per element instead of whole tuple.

std::optional<Field> min;
std::optional<Field> max;
Field min;
Field max;
if (stats->HasMinMax())
{
try
{
min = decodePlainParquetValueSlow(stats->EncodeMin(), stats->physical_type(), *stats->descr());
max = decodePlainParquetValueSlow(stats->EncodeMax(), stats->physical_type(), *stats->descr());
min = decodePlainParquetValueSlow(stats->EncodeMin(), stats->physical_type(), *stats->descr(), type_index);
max = decodePlainParquetValueSlow(stats->EncodeMax(), stats->physical_type(), *stats->descr(), type_index);

/// If the data type in parquet file substantially differs from the requested data type,
/// it's sometimes correct to just typecast the min/max values.
/// Other times it's incorrect, e.g.:
/// INSERT INTO FUNCTION file('t.parquet', Parquet, 'x String') VALUES ('1'), ('100'), ('2');
/// SELECT * FROM file('t.parquet', Parquet, 'x Int64') WHERE x >= 3;
/// If we just typecast min/max from string to integer, this query will incorrectly return empty result.
/// Allow conversion in some simple cases, otherwise ignore the min/max values.
auto min_type = min.getType();
auto max_type = max.getType();
min = convertFieldToType(min, *type);
max = convertFieldToType(max, *type);
auto ok_cast = [&](Field::Types::Which from, Field::Types::Which to) -> bool
{
if (from == to)
return true;
/// Decimal -> wider decimal.
if (Field::isDecimal(from) || Field::isDecimal(to))
return Field::isDecimal(from) && Field::isDecimal(to) && to >= from;
/// Integer -> IP.
if (to == Field::Types::IPv4)
return from == Field::Types::UInt64;
/// Disable index for everything else, especially string <-> number.
return false;
};
if (!(ok_cast(min_type, min.getType()) && ok_cast(max_type, max.getType())) &&
!(min == max) &&
!(min_type == Field::Types::Int64 && min.getType() == Field::Types::UInt64 && min.safeGet<Int64>() >= 0) &&
!(max_type == Field::Types::UInt64 && max.getType() == Field::Types::Int64 && max.safeGet<UInt64>() <= UInt64(INT64_MAX)))
{
min = Field();
max = Field();
}
}
catch (Exception & e)
{
Expand All @@ -322,7 +379,7 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
{
/// Single-point range containing either the default value of one of the infinities.
if (null_as_default)
hyperrectangle[idx].right = hyperrectangle[idx].left = default_value();
hyperrectangle[idx].right = hyperrectangle[idx].left = default_value;
else
hyperrectangle[idx].right = hyperrectangle[idx].left;
continue;
Expand All @@ -333,32 +390,31 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
if (null_as_default)
{
/// Make sure the range contains the default value.
Field def = default_value();
if (min.has_value() && applyVisitor(FieldVisitorAccurateLess(), def, *min))
min = def;
if (max.has_value() && applyVisitor(FieldVisitorAccurateLess(), *max, def))
max = def;
if (!min.isNull() && applyVisitor(FieldVisitorAccurateLess(), default_value, min))
min = default_value;
if (!max.isNull() && applyVisitor(FieldVisitorAccurateLess(), max, default_value))
max = default_value;
}
else
{
/// Make sure the range reaches infinity on at least one side.
if (min.has_value() && max.has_value())
min.reset();
if (!min.isNull() && !max.isNull())
min = Field();
}
}
else
{
/// If the column doesn't have nulls, exclude both infinities.
if (!min.has_value())
if (min.isNull())
hyperrectangle[idx].left_included = false;
if (!max.has_value())
if (max.isNull())
hyperrectangle[idx].right_included = false;
}

if (min.has_value())
hyperrectangle[idx].left = std::move(min.value());
if (max.has_value())
hyperrectangle[idx].right = std::move(max.value());
if (!min.isNull())
hyperrectangle[idx].left = std::move(min);
if (!max.isNull())
hyperrectangle[idx].right = std::move(max);
}

return hyperrectangle;
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
424242424242424242424242424242424242424242424242424242
22707864971053448441042714569797161695738549521977760418632926980540162388532
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# This is parsed as text.
$CLICKHOUSE_LOCAL -q "select toString(424242424242424242424242424242424242424242424242424242::UInt256) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' -q "select * from table"

# But this is parsed as binary because text length happens to be 32 bytes. Not ideal.
$CLICKHOUSE_LOCAL -q "select toString(42424242424242424242424242424242::UInt256) as x format Parquet" | $CLICKHOUSE_LOCAL --input-format=Parquet --structure='x UInt256' -q "select * from table"
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,5 @@ d256 Nullable(Decimal(76, 40))
500 244750
500 244750
500 244750
42
100
6 changes: 6 additions & 0 deletions tests/queries/0_stateless/02841_parquet_filter_pushdown.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,9 @@ select count(), sum(number) from file('02841.parquet', Parquet, 'number UInt64,
select count(), sum(number) from file('02841.parquet') where indexHint(string_or_null == ''); -- quirk with infinities
select count(), sum(number) from file('02841.parquet', Parquet, 'number UInt64, string_or_null String') where indexHint(string_or_null == '');
select count(), sum(number) from file('02841.parquet', Parquet, 'number UInt64, nEgAtIvE_oR_nUlL Int64') where indexHint(nEgAtIvE_oR_nUlL > -50) settings input_format_parquet_case_insensitive_column_matching = 1;

-- Bad type conversions.
insert into function file('02841.parquet') select 42 as x;
select * from file('02841.parquet', Parquet, 'x Nullable(String)') where x not in (1);
insert into function file('t.parquet', Parquet, 'x String') values ('1'), ('100'), ('2');
select * from file('t.parquet', Parquet, 'x Int64') where x >= 3;

0 comments on commit 2fb9ed0

Please sign in to comment.