Skip to content

Commit

Permalink
ci
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Dec 16, 2022
1 parent 13307d8 commit d9f64e7
Show file tree
Hide file tree
Showing 12 changed files with 18 additions and 154 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ std::pair<size_t, size_t> ColumnFileBigReader::readRowsRepeatedly(MutableColumns
if (actual_read == 0 && rows > 0)
{
// First successful read of data, update `actual_offset`.
auto offset_before_block_index = block_index == 0 ? 0 : cached_block_rows_end[block_index - 1];
actual_offset = offset_before_block_index + offset;
auto rows_before_block_index = block_index == 0 ? 0 : cached_block_rows_end[block_index - 1];
actual_offset = rows_before_block_index + offset;
}
actual_read += rows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t off
row_ids->resize(row_ids->size() + read_rows);
for (size_t i = 0; i < read_rows; ++i)
{
(*row_ids)[row_ids_offset + i] =start_row_id + i;
(*row_ids)[row_ids_offset + i] = start_row_id + i;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
}
else
{
// `DMVersionFilterBlockInputStream` is the last stage for generating segment row id.
// In the way we use it, the other columns are not used subsequently.
res.setSegmentRowIdCol(cur_raw_block.segmentRowIdCol()->filter(filter, passed_count));
}
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
}
else
{
// `DMVersionFilterBlockInputStream` is the last stage for generating segment row id.
// In the way we use it, the other columns are not used subsequently.
Block res;
res.setSegmentRowIdCol(block.segmentRowIdCol());
return res;
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,6 @@ class DeltaValueReader
UInt64 max_version);
};

// `DeltaValueInputStream` will read ALL data in delta.
// Blocks are not filtered.
// `segment_range_` is unsued.
class DeltaValueInputStream : public IBlockInputStream
{
private:
Expand Down
11 changes: 5 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream
ColumnUInt32::MutablePtr seg_row_id_col;
// `stable_rows` is the total rows of the underlying DMFiles, includes not valid rows.
UInt64 stable_rows;
// `delta_row_ids` is used to return the row id of delta.
std::vector<UInt32> delta_row_ids;

public:
Expand Down Expand Up @@ -166,7 +167,6 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream

Block read() override
{
// TODO(jinhelin):???
if constexpr (skippable_place)
{
if (sk_call_status == 0)
Expand Down Expand Up @@ -227,11 +227,10 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream

if constexpr (need_row_id)
{
if (block.rows() != block.segmentRowIdCol()->size())
{
auto s = fmt::format("Build bitmap error: block.rows {} != segmentRowId.size() {}", block.rows(), block.segmentRowIdCol()->size());
throw Exception(s);
}
RUNTIME_CHECK_MSG(block.rows() == block.segmentRowIdCol()->size(),
"Build bitmap error: block.rows {} != segmentRowId.size() {}",
block.rows(),
block.segmentRowIdCol()->size());
}
}
}
Expand Down
12 changes: 3 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,8 @@ class DeltaMergeStore : private boost::noncopyable
StoreStats getStoreStats();
SegmentsStats getSegmentsStats();

bool isCommonHandle() const
{
return is_common_handle;
}
size_t getRowKeyColumnSize() const
{
return rowkey_column_size;
}
bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }

public:
/// Methods mainly used by region split.
Expand Down Expand Up @@ -622,6 +616,7 @@ class DeltaMergeStore : private boost::noncopyable

BackgroundProcessingPool & blockable_background_pool;
BackgroundProcessingPool::TaskHandle blockable_background_pool_handle;

/// end of range -> segment
SegmentSortedMap segments;
/// Mainly for debug.
Expand All @@ -633,7 +628,6 @@ class DeltaMergeStore : private boost::noncopyable

RowKeyValue next_gc_check_key;

RowKeyValue next_update_bitmap_filter_key;
// Synchronize between write threads and read threads.
mutable std::shared_mutex read_write_mutex;

Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,11 +538,7 @@ class Segment
const SegmentSnapshotPtr & segment_snap,
const ColumnDefines & columns_to_read);

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
ReadInfo getReadInfo(
const DMContext & dm_context,
const ColumnDefines & read_columns,
Expand Down
126 changes: 0 additions & 126 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,13 @@
#include <TestUtils/TiFlashTestEnv.h>
#include <common/logger_useful.h>
#include <common/types.h>
#include <fmt/format.h>
#include <gtest/gtest.h>

#include <algorithm>
#include <future>
#include <iterator>
#include <random>

using namespace std::chrono_literals;

namespace DB
{
namespace FailPoints
Expand Down Expand Up @@ -3406,129 +3403,6 @@ try
}
CATCH

TEST_P(DeltaMergeStoreRWTest, ReadWithRange)
try
{
const ColumnDefine col_i8_define(2, "i8", std::make_shared<DataTypeInt8>());
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
table_column_defines->emplace_back(col_i8_define);
store = reload(table_column_defines);
}

auto create_block = [&](Int64 a, Int8 b) {
auto block = DMTestEnv::prepareSimpleWriteBlock(a, a + 1, false);
block.insert(DB::tests::createColumn<Int8>(
createSignedNumbers(b, b + 1),
col_i8_define.name,
col_i8_define.id));
return block;
};

auto b1 = create_block(std::numeric_limits<Int64>::min(), 1);
store->write(*db_context, db_context->getSettingsRef(), b1);

auto b3 = create_block(0L, 3);
store->write(*db_context, db_context->getSettingsRef(), b3);

auto b2 = create_block(std::numeric_limits<Int64>::max() - 1, 2);
store->write(*db_context, db_context->getSettingsRef(), b2);

//while (!store->mergeDeltaAll(*db_context))
{
std::this_thread::sleep_for(10ms);
}
auto getIntHandleKey = [](Int64 i) {
WriteBufferFromOwnString ss;
DB::EncodeInt64(i, ss);
return std::make_shared<String>(ss.releaseStr());
};

{
Int64 start_key = 0;
RowKeyValue start(false, getIntHandleKey(start_key), start_key);
RowKeyValue end = RowKeyValue::INT_HANDLE_MAX_KEY;
RowKeyRange range(start, end, false, 1);
// read all columns from store
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{range},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 1024)[0];
auto block = in->read();
const auto & col = block.getByPosition(0);
auto * ids = toColumnVectorDataPtr<Int64>(col.column);
std::vector<Int64> v(ids->data(), ids->data() + col.column->size());
std::cout << fmt::format("{}", v) << std::endl;
}
}
CATCH

TEST_P(DeltaMergeStoreRWTest, TestComplex)
try
{
if (mode == TestMode::V1_BlockOnly)
{
// Seems V1 not support ingest files.
return;
}

constexpr size_t num_write_rows = 128;

UInt64 tso1 = 1;
UInt64 tso2 = 100;
// [0, 128) --> tso: 1
Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false, tso1);
// [128, 256) --> tso: 1
Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false, tso1);
// [64, 192) --> tso: 100
Block block3 = DMTestEnv::prepareSimpleWriteBlock(num_write_rows / 2, num_write_rows / 2 + num_write_rows, false, tso2);

// Write: [128, 256) --> tso: 1
store->write(*db_context, db_context->getSettingsRef(), block2);

// Ingest files.
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
store->ingestFiles(dm_context, range1, file_ids1, false);
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
store->ingestFiles(dm_context, range3, file_ids3, false);
//store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
//store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

// Delete range [0, 64)
const size_t num_deleted_rows = 64;
HandleRange delete_range(0, num_deleted_rows);
store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(delete_range));

const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ static_cast<UInt64>(1),
EMPTY_FILTER,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ false,
/* expected_block_size= */ 1024)[0];
// Data is not guaranteed to be returned in order.
ASSERT_UNORDERED_INPUTSTREAM_COLS_UR(
in,
Strings({DMTestEnv::pk_name}),
createColumns({createColumn<Int64>(createNumbers<Int64>(num_write_rows / 2, 2 * num_write_rows))}));
}
CATCH


} // namespace tests
} // namespace DM
} // namespace DB
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file expected in compliance with the License.
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,4 @@ void SegmentTestBasic::writeSegmentWithDeleteRange(PageId segment_id, Int64 begi
auto segment = segments[segment_id];
RUNTIME_CHECK(segment->write(*dm_context, range));
}
} // namespace DB::DM::tests
} // namespace DB::DM::tests
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/InputStreamTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,4 @@ ::testing::AssertionResult UnorderedInputStreamVSBlockUnrestrictlyCompareColumns
}

} // namespace tests
} // namespace DB
} // namespace DB

0 comments on commit d9f64e7

Please sign in to comment.