Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check block schema in exchange operator #1506

Merged
merged 9 commits into from
Mar 8, 2021
3 changes: 3 additions & 0 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
std::shared_ptr<RemoteReader> remote_reader;

Block sample_block;
DataTypes expected_types;

std::queue<Block> block_queue;

Expand Down Expand Up @@ -136,6 +137,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
LOG_DEBUG(log, "decode packet " << std::to_string(block.rows()) + " for " + result.req_info);
if (unlikely(block.rows() == 0))
continue;
assertBlockSchema(expected_types, block, getName());
block_queue.push(std::move(block));
}
if (block_queue.empty())
Expand All @@ -156,6 +158,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
{
auto tp = getDataTypeByColumnInfo(dag_col.second);
ColumnWithTypeAndName col(tp, dag_col.first);
expected_types.push_back(col.type);
columns.emplace_back(col);
}
sample_block = Block(columns);
Expand Down
31 changes: 29 additions & 2 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <IO/ReadBufferFromString.h>

namespace DB
Expand All @@ -13,6 +14,28 @@ class CHBlockChunkCodecStream : public ChunkCodecStream
explicit CHBlockChunkCodecStream(const std::vector<tipb::FieldType> & field_types) : ChunkCodecStream(field_types)
{
output = std::make_unique<WriteBufferFromOwnString>();
for (size_t i = 0; i < field_types.size(); i++)
{
if (field_types[i].tp() == TiDB::TypeEnum)
{
ColumnInfo ci;
auto & field_type = field_types[i];
ci.tp = static_cast<TiDB::TP>(field_type.tp());
ci.flag = field_type.flag();
ci.flen = field_type.flen();
ci.decimal = field_type.decimal();
/// this is a workaround, since tidb does not push down
/// the element of Enum type, should remove if
/// https://github.com/pingcap/tics/issues/1489 is fixed
ci.elems.emplace_back("a", 1);
ci.elems.emplace_back("b", 2);
expected_types.emplace_back(getDataTypeByColumnInfo(ci));
}
else
{
expected_types.emplace_back(getDataTypeByFieldType(field_types[i]));
}
}
}

String getString() override
Expand All @@ -23,6 +46,7 @@ class CHBlockChunkCodecStream : public ChunkCodecStream
void clear() override { output = std::make_unique<WriteBufferFromOwnString>(); }
void encode(const Block & block, size_t start, size_t end) override;
std::unique_ptr<WriteBufferFromOwnString> output;
DataTypes expected_types;
};

void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, size_t offset, size_t limit)
Expand All @@ -43,10 +67,13 @@ void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & o

void CHBlockChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
/// only check block schema in CHBlock codec because for both
/// Default codec and Arrow codec, it implicitly convert the
/// input to the target output types.
assertBlockSchema(expected_types, block, "CHBlockChunkCodecStream");
// Encode data in chunk by chblock encode
if (start != 0 || end != block.rows())
throw TiFlashException(
"CHBlock encode only support encode whole block", Errors::Coprocessor::Internal);
throw TiFlashException("CHBlock encode only support encode whole block", Errors::Coprocessor::Internal);
block.checkNumberOfRows();
size_t columns = block.columns();
size_t rows = block.rows();
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Common/TiFlashException.h>
#include <Core/Types.h>
#include <DataTypes/DataTypeNullable.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionHelpers.h>
Expand Down Expand Up @@ -402,6 +403,33 @@ grpc::StatusCode tiflashErrorCodeToGrpcStatusCode(int error_code)
return grpc::StatusCode::INTERNAL;
}

void assertBlockSchema(const DataTypes & expected_types, const Block & block, const std::string & context_description)
{
size_t columns = expected_types.size();
if (block.columns() != columns)
throw Exception("Block schema mismatch in " + context_description + ": different number of columns: expected "
+ std::to_string(columns) + " columns, got " + std::to_string(block.columns()) + " columns");

for (size_t i = 0; i < columns; ++i)
{
const auto & actual = block.getByPosition(i).type;
const auto & expected = expected_types[i];

if (!expected->equals(*actual))
{
/// This is a workaround for Enum type: because TiDB does not push down enough
/// information about enum type, we only check the nullability if both type is
/// Enum, should be removed if https://github.com/pingcap/tics/issues/1489 is fixed
if (expected->isEnum() && actual->isEnum())
continue;
if (expected->isNullable() && removeNullable(expected)->isEnum() && actual->isNullable() && removeNullable(actual)->isEnum())
continue;
throw Exception("Block schema mismatch in " + context_description + ": different types: expected " + expected->getName()
+ ", got " + actual->getName());
}
}
}

extern const String UniqRawResName;

std::unordered_map<tipb::ExprType, String> agg_func_map({
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <Core/Block.h>
#include <Core/Field.h>
#include <Core/NamesAndTypes.h>
#include <Storages/Transaction/Collator.h>
Expand Down Expand Up @@ -44,5 +45,6 @@ std::shared_ptr<TiDB::ITiDBCollator> getCollatorFromExpr(const tipb::Expr & expr
std::shared_ptr<TiDB::ITiDBCollator> getCollatorFromFieldType(const tipb::FieldType & field_type);
bool hasUnsignedFlag(const tipb::FieldType & tp);
grpc::StatusCode tiflashErrorCodeToGrpcStatusCode(int error_code);
void assertBlockSchema(const DataTypes & expected_types, const Block & block, const std::string & context_description);

} // namespace DB