Skip to content

Commit

Permalink
Merge branch 'master' into hongyunyan_fix_5154
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy authored Jun 22, 2022
2 parents 72c75bc + e14c677 commit 3ec80b2
Show file tree
Hide file tree
Showing 22 changed files with 1,155 additions and 140 deletions.
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,30 @@ LSAN_OPTIONS=suppressions=$WORKSPACE/tiflash/test/sanitize/asan.suppression

## Run Integration Tests

TBD.
1. Build your own tiflash binary in $BUILD with `-DCMAKE_BUILD_TYPE=DEBUG`.
```
cd $BUILD
cmake $WORKSPACE/tiflash -GNinja -DCMAKE_BUILD_TYPE=DEBUG
ninja tiflash
```
2. Run tidb cluster locally using tiup playgroud or other tools.
```
tiup playground nightly --tiflash.binpath $BUILD/dbms/src/Server/tiflash
```
3. Check $WORKSPACE/tests/_env.sh to make the port and build dir right.
4. Run your integration tests using commands like "./run-test.sh fullstack-test2/ddl" under $WORKSPACE dir

## Run MicroBenchmark Tests

To run micro benchmark tests, you need to build with -DCMAKE_BUILD_TYPE=RELEASE -DENABLE_TESTS=ON:

```shell
cd $BUILD
cmake $WORKSPACE/tiflash -GNinja -DCMAKE_BUILD_TYPE=DEBUG -DENABLE_TESTS=ON
ninja bench_dbms
```

And the microbenchmark-test executables are at `$BUILD/dbms/bench_dbms`, you can run it with `./bench_dbms` or `./bench_dbms --benchmark_filter=xxx` . More usage please check with `./bench_dbms --help`.

## Generate LLVM Coverage Report

Expand Down
46 changes: 25 additions & 21 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,37 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_remote_read_for_batch_cop) \
M(force_context_path) \
M(force_slow_page_storage_snapshot_release) \
M(force_change_all_blobs_to_read_only)

#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(force_change_all_blobs_to_read_only) \
M(unblock_query_init_after_write)


#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot) \
M(pause_after_copr_streams_acquired_once)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta)
#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta) \
M(pause_query_init)


namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
APPLY_FOR_FAILPOINTS_ONCE(M)
APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS(M)
#undef M
} // namespace FailPoints

Expand Down Expand Up @@ -167,11 +171,11 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
}

#define M(NAME) SUB_M(NAME, FIU_ONETIME)
APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M)
#undef M

#define M(NAME) SUB_M(NAME, 0)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS(M)
#undef M
#undef SUB_M

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
/// too short a period can cause errors to disappear immediately after creation.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2 * DBMS_DEFAULT_SEND_TIMEOUT_SEC)
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16

#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
Expand Down
75 changes: 37 additions & 38 deletions dbms/src/Flash/Coprocessor/ArrowColCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <DataTypes/DataTypeDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeMyDate.h>
#include <DataTypes/DataTypeMyDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
Expand All @@ -41,7 +40,7 @@ extern const int NOT_IMPLEMENTED;
const IColumn * getNestedCol(const IColumn * flash_col)
{
if (flash_col->isColumnNullable())
return dynamic_cast<const ColumnNullable *>(flash_col)->getNestedColumnPtr().get();
return static_cast<const ColumnNullable *>(flash_col)->getNestedColumnPtr().get();
else
return flash_col;
}
Expand Down Expand Up @@ -75,8 +74,8 @@ bool flashDecimalColToArrowColInternal(
const IColumn * nested_col = getNestedCol(flash_col_untyped);
if (checkColumn<ColumnDecimal<T>>(nested_col) && checkDataType<DataTypeDecimal<T>>(data_type))
{
const ColumnDecimal<T> * flash_col = checkAndGetColumn<ColumnDecimal<T>>(nested_col);
const DataTypeDecimal<T> * type = checkAndGetDataType<DataTypeDecimal<T>>(data_type);
const auto * flash_col = checkAndGetColumn<ColumnDecimal<T>>(nested_col);
const auto * type = checkAndGetDataType<DataTypeDecimal<T>>(data_type);
UInt32 scale = type->getScale();
for (size_t i = start_index; i < end_index; i++)
{
Expand All @@ -92,8 +91,8 @@ bool flashDecimalColToArrowColInternal(
std::vector<Int32> digits;
digits.reserve(type->getPrec());
decimalToVector<typename T::NativeType>(dec.value, digits, scale);
TiDBDecimal tiDecimal(scale, digits, dec.value < 0);
dag_column.append(tiDecimal);
TiDBDecimal ti_decimal(scale, digits, dec.value < 0);
dag_column.append(ti_decimal);
}
return true;
}
Expand Down Expand Up @@ -121,7 +120,7 @@ template <typename T, bool is_nullable>
bool flashIntegerColToArrowColInternal(TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index)
{
const IColumn * nested_col = getNestedCol(flash_col_untyped);
if (const ColumnVector<T> * flash_col = checkAndGetColumn<ColumnVector<T>>(nested_col))
if (const auto * flash_col = checkAndGetColumn<ColumnVector<T>>(nested_col))
{
constexpr bool is_unsigned = std::is_unsigned_v<T>;
for (size_t i = start_index; i < end_index; i++)
Expand All @@ -135,9 +134,9 @@ bool flashIntegerColToArrowColInternal(TiDBColumn & dag_column, const IColumn *
}
}
if constexpr (is_unsigned)
dag_column.append((UInt64)flash_col->getElement(i));
dag_column.append(static_cast<UInt64>(flash_col->getElement(i)));
else
dag_column.append((Int64)flash_col->getElement(i));
dag_column.append(static_cast<UInt64>(flash_col->getElement(i)));
}
return true;
}
Expand All @@ -148,7 +147,7 @@ template <typename T, bool is_nullable>
void flashDoubleColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index)
{
const IColumn * nested_col = getNestedCol(flash_col_untyped);
if (const ColumnVector<T> * flash_col = checkAndGetColumn<ColumnVector<T>>(nested_col))
if (const auto * flash_col = checkAndGetColumn<ColumnVector<T>>(nested_col))
{
for (size_t i = start_index; i < end_index; i++)
{
Expand All @@ -160,7 +159,7 @@ void flashDoubleColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col
continue;
}
}
dag_column.append((T)flash_col->getElement(i));
dag_column.append(static_cast<T>(flash_col->getElement(i)));
}
return;
}
Expand Down Expand Up @@ -196,7 +195,7 @@ void flashDateOrDateTimeColToArrowCol(
{
const IColumn * nested_col = getNestedCol(flash_col_untyped);
using DateFieldType = DataTypeMyTimeBase::FieldType;
auto * flash_col = checkAndGetColumn<ColumnVector<DateFieldType>>(nested_col);
const auto * flash_col = checkAndGetColumn<ColumnVector<DateFieldType>>(nested_col);
for (size_t i = start_index; i < end_index; i++)
{
if constexpr (is_nullable)
Expand All @@ -217,7 +216,7 @@ void flashStringColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col
{
const IColumn * nested_col = getNestedCol(flash_col_untyped);
// columnFixedString is not used so do not check it
auto * flash_col = checkAndGetColumn<ColumnString>(nested_col);
const auto * flash_col = checkAndGetColumn<ColumnString>(nested_col);
for (size_t i = start_index; i < end_index; i++)
{
// todo check if we can convert flash_col to DAG col directly since the internal representation is almost the same
Expand All @@ -242,7 +241,7 @@ void flashBitColToArrowCol(
const tipb::FieldType & field_type)
{
const IColumn * nested_col = getNestedCol(flash_col_untyped);
auto * flash_col = checkAndGetColumn<ColumnVector<UInt64>>(nested_col);
const auto * flash_col = checkAndGetColumn<ColumnVector<UInt64>>(nested_col);
for (size_t i = start_index; i < end_index; i++)
{
if constexpr (is_nullable)
Expand All @@ -267,7 +266,7 @@ void flashEnumColToArrowCol(
const IDataType * data_type)
{
const IColumn * nested_col = getNestedCol(flash_col_untyped);
auto * flash_col = checkAndGetColumn<ColumnVector<DataTypeEnum16::FieldType>>(nested_col);
const auto * flash_col = checkAndGetColumn<ColumnVector<DataTypeEnum16::FieldType>>(nested_col);
const auto * enum_type = checkAndGetDataType<DataTypeEnum16>(data_type);
size_t enum_value_size = enum_type->getValues().size();
for (size_t i = start_index; i < end_index; i++)
Expand All @@ -280,10 +279,10 @@ void flashEnumColToArrowCol(
continue;
}
}
auto enum_value = (UInt64)flash_col->getElement(i);
auto enum_value = static_cast<UInt64>(flash_col->getElement(i));
if (enum_value == 0 || enum_value > enum_value_size)
throw TiFlashException("number of enum overflow enum boundary", Errors::Coprocessor::Internal);
TiDBEnum ti_enum(enum_value, enum_type->getNameForValue((const DataTypeEnum16::FieldType)enum_value));
TiDBEnum ti_enum(enum_value, enum_type->getNameForValue(static_cast<const DataTypeEnum16::FieldType>(enum_value)));
dag_column.append(ti_enum);
}
}
Expand All @@ -300,7 +299,7 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
throw TiFlashException("Flash column and TiDB column has different not null flag", Errors::Coprocessor::Internal);
}
if (type->isNullable())
type = dynamic_cast<const DataTypeNullable *>(type)->getNestedType().get();
type = static_cast<const DataTypeNullable *>(type)->getNestedType().get();

switch (tidb_column_info.tp)
{
Expand Down Expand Up @@ -457,7 +456,7 @@ const char * arrowEnumColToFlashCol(
{
if (checkNull(i, null_count, null_bitmap, col))
continue;
const auto enum_value = (Int64)toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos + offsets[i])));
const auto enum_value = static_cast<Int64>(toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos + offsets[i]))));
col.column->assumeMutable()->insert(Field(enum_value));
}
return pos + offsets[length];
Expand All @@ -479,11 +478,11 @@ const char * arrowBitColToFlashCol(
continue;
const String value = String(pos + offsets[i], pos + offsets[i + 1]);
if (value.length() == 0)
col.column->assumeMutable()->insert(Field(UInt64(0)));
col.column->assumeMutable()->insert(Field(static_cast<UInt64>(0)));
UInt64 result = 0;
for (auto & c : value)
for (const auto & c : value)
{
result = (result << 8u) | (UInt8)c;
result = (result << 8u) | static_cast<UInt8>(c);
}
col.column->assumeMutable()->insert(Field(result));
}
Expand All @@ -500,7 +499,7 @@ T toCHDecimal(UInt8 digits_int, UInt8 digits_frac, bool negative, const Int32 *
UInt8 tailing_digit = digits_frac % DIGITS_PER_WORD;

typename T::NativeType value = 0;
const int word_max = int(1e9);
const int word_max = static_cast<int>(1e9);
for (int i = 0; i < word_int; i++)
{
value = value * word_max + word_buf[i];
Expand Down Expand Up @@ -552,28 +551,28 @@ const char * arrowDecimalColToFlashCol(
pos += 1;
Int32 word_buf[MAX_WORD_BUF_LEN];
const DataTypePtr decimal_type
= col.type->isNullable() ? dynamic_cast<const DataTypeNullable *>(col.type.get())->getNestedType() : col.type;
for (int j = 0; j < MAX_WORD_BUF_LEN; j++)
= col.type->isNullable() ? static_cast<const DataTypeNullable *>(col.type.get())->getNestedType() : col.type;
for (int & j : word_buf)
{
word_buf[j] = toLittleEndian(*(reinterpret_cast<const Int32 *>(pos)));
j = toLittleEndian(*(reinterpret_cast<const Int32 *>(pos)));
pos += 4;
}
if (auto * type32 = checkDecimal<Decimal32>(*decimal_type))
if (const auto * type32 = checkDecimal<Decimal32>(*decimal_type))
{
auto res = toCHDecimal<Decimal32>(digits_int, digits_frac, negative, word_buf);
col.column->assumeMutable()->insert(DecimalField<Decimal32>(res, type32->getScale()));
}
else if (auto * type64 = checkDecimal<Decimal64>(*decimal_type))
else if (const auto * type64 = checkDecimal<Decimal64>(*decimal_type))
{
auto res = toCHDecimal<Decimal64>(digits_int, digits_frac, negative, word_buf);
col.column->assumeMutable()->insert(DecimalField<Decimal64>(res, type64->getScale()));
}
else if (auto * type128 = checkDecimal<Decimal128>(*decimal_type))
else if (const auto * type128 = checkDecimal<Decimal128>(*decimal_type))
{
auto res = toCHDecimal<Decimal128>(digits_int, digits_frac, negative, word_buf);
col.column->assumeMutable()->insert(DecimalField<Decimal128>(res, type128->getScale()));
}
else if (auto * type256 = checkDecimal<Decimal256>(*decimal_type))
else if (const auto * type256 = checkDecimal<Decimal256>(*decimal_type))
{
auto res = toCHDecimal<Decimal256>(digits_int, digits_frac, negative, word_buf);
col.column->assumeMutable()->insert(DecimalField<Decimal256>(res, type256->getScale()));
Expand All @@ -600,13 +599,13 @@ const char * arrowDateColToFlashCol(
continue;
}
UInt64 chunk_time = toLittleEndian(*(reinterpret_cast<const UInt64 *>(pos)));
UInt16 year = (UInt16)((chunk_time & MyTimeBase::YEAR_BIT_FIELD_MASK) >> MyTimeBase::YEAR_BIT_FIELD_OFFSET);
UInt8 month = (UInt8)((chunk_time & MyTimeBase::MONTH_BIT_FIELD_MASK) >> MyTimeBase::MONTH_BIT_FIELD_OFFSET);
UInt8 day = (UInt8)((chunk_time & MyTimeBase::DAY_BIT_FIELD_MASK) >> MyTimeBase::DAY_BIT_FIELD_OFFSET);
UInt16 hour = (UInt16)((chunk_time & MyTimeBase::HOUR_BIT_FIELD_MASK) >> MyTimeBase::HOUR_BIT_FIELD_OFFSET);
UInt8 minute = (UInt8)((chunk_time & MyTimeBase::MINUTE_BIT_FIELD_MASK) >> MyTimeBase::MINUTE_BIT_FIELD_OFFSET);
UInt8 second = (UInt8)((chunk_time & MyTimeBase::SECOND_BIT_FIELD_MASK) >> MyTimeBase::SECOND_BIT_FIELD_OFFSET);
UInt32 micro_second = (UInt32)((chunk_time & MyTimeBase::MICROSECOND_BIT_FIELD_MASK) >> MyTimeBase::MICROSECOND_BIT_FIELD_OFFSET);
auto year = static_cast<UInt16>((chunk_time & MyTimeBase::YEAR_BIT_FIELD_MASK) >> MyTimeBase::YEAR_BIT_FIELD_OFFSET);
auto month = static_cast<UInt8>((chunk_time & MyTimeBase::MONTH_BIT_FIELD_MASK) >> MyTimeBase::MONTH_BIT_FIELD_OFFSET);
auto day = static_cast<UInt8>((chunk_time & MyTimeBase::DAY_BIT_FIELD_MASK) >> MyTimeBase::DAY_BIT_FIELD_OFFSET);
auto hour = static_cast<UInt16>((chunk_time & MyTimeBase::HOUR_BIT_FIELD_MASK) >> MyTimeBase::HOUR_BIT_FIELD_OFFSET);
auto minute = static_cast<UInt8>((chunk_time & MyTimeBase::MINUTE_BIT_FIELD_MASK) >> MyTimeBase::MINUTE_BIT_FIELD_OFFSET);
auto second = static_cast<UInt8>((chunk_time & MyTimeBase::SECOND_BIT_FIELD_MASK) >> MyTimeBase::SECOND_BIT_FIELD_OFFSET);
auto micro_second = static_cast<UInt32>((chunk_time & MyTimeBase::MICROSECOND_BIT_FIELD_MASK) >> MyTimeBase::MICROSECOND_BIT_FIELD_OFFSET);
MyDateTime mt(year, month, day, hour, minute, second, micro_second);
pos += field_length;
col.column->assumeMutable()->insert(Field(mt.toPackedUInt()));
Expand Down Expand Up @@ -659,7 +658,7 @@ const char * arrowNumColToFlashCol(
case TiDB::TypeFloat:
u32 = toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos)));
std::memcpy(&f32, &u32, sizeof(Float32));
col.column->assumeMutable()->insert(Field((Float64)f32));
col.column->assumeMutable()->insert(Field(static_cast<Float64>(f32)));
break;
case TiDB::TypeDouble:
u64 = toLittleEndian(*(reinterpret_cast<const UInt64 *>(pos)));
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,20 @@ void DAGContext::attachBlockIO(const BlockIO & io_)
io = io_;
}

const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & DAGContext::getMPPExchangeReceiverMap() const
ExchangeReceiverPtr DAGContext::getMPPExchangeReceiver(const String & executor_id) const
{
if (!isMPPTask())
throw TiFlashException("mpp_exchange_receiver_map is used in mpp only", Errors::Coprocessor::Internal);
RUNTIME_ASSERT(mpp_exchange_receiver_map != nullptr, log, "MPPTask without exchange receiver map");
return *mpp_exchange_receiver_map;
RUNTIME_ASSERT(mpp_receiver_set != nullptr, log, "MPPTask without receiver set");
return mpp_receiver_set->getExchangeReceiver(executor_id);
}

void DAGContext::addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader)
{
if (!isMPPTask())
return;
RUNTIME_ASSERT(mpp_receiver_set != nullptr, log, "MPPTask without receiver set");
return mpp_receiver_set->addCoprocessorReader(coprocessor_reader);
}

bool DAGContext::containsRegionsInfoForTable(Int64 table_id) const
Expand Down
Loading

0 comments on commit 3ec80b2

Please sign in to comment.