Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: schrodinger bank2 fail #521

Merged
merged 11 commits into from
Mar 17, 2020
2 changes: 1 addition & 1 deletion dbms/src/Core/ColumnWithTypeAndName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ bool ColumnWithTypeAndName::operator==(const ColumnWithTypeAndName & other) cons

void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
{
out << name;
out << name << ' ' << column_id;

if (type)
out << ' ' << type->getName();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Parsers/ASTSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
partition_expression_list->formatImpl(s, state, frame);
}

// Print the SEGMENT clause only when a segment expression list is present.
// The guard must test `segment_expression_list` (the pointer dereferenced below),
// not `partition_expression_list`: otherwise a query with PARTITION but no SEGMENT
// dereferences a null pointer, and a query with SEGMENT but no PARTITION loses the clause.
if (segment_expression_list)
{
    s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "SEGMENT " << (s.hilite ? hilite_none : "");
    segment_expression_list->formatImpl(s, state, frame);
}

if (prewhere_expression)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "PREWHERE " << (s.hilite ? hilite_none : "");
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ASTSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ASTSelectQuery : public IAST
ASTPtr select_expression_list;
ASTPtr tables;
ASTPtr partition_expression_list;
ASTPtr segment_expression_list;
ASTPtr prewhere_expression;
ASTPtr where_expression;
ASTPtr group_expression_list;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ExpressionElementParsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ const char * ParserAliasBase::restricted_keywords[] =
"UNION",
"INTO",
"PARTITION",
"SEGMENT",
nullptr
};

Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Parsers/ParserSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_distinct("DISTINCT");
ParserKeyword s_from("FROM");
ParserKeyword s_partition("PARTITION");
ParserKeyword s_segment("SEGMENT");
ParserKeyword s_prewhere("PREWHERE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
Expand Down Expand Up @@ -90,6 +91,13 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}

/// SEGMENT p or SEGMENT (p1, p2, ...)
if (s_segment.ignore(pos, expected))
{
if (!ParserPartition().parse(pos, select_query->segment_expression_list, expected))
return false;
}

/// PREWHERE expr
if (s_prewhere.ignore(pos, expected))
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/CompactDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ bool DeltaValueSpace::compact(DMContext & context)
{
if (!pack->isSaved())
break;
if ((unlikely(pack->isMutable())))
throw Exception("Saved pack is mutable", ErrorCodes::LOGICAL_ERROR);
if ((unlikely(pack->dataFlushable())))
throw Exception("Saved pack is data flushable", ErrorCodes::LOGICAL_ERROR);

bool small_pack = !pack->isDeleteRange() && pack->rows < context.delta_small_pack_rows;
bool schema_ok = task.to_compact.empty() || pack->schema == task.to_compact.back()->schema;
Expand Down
29 changes: 17 additions & 12 deletions dbms/src/Storages/DeltaMerge/Delta/FlushDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ struct FlushPackTask
{
FlushPackTask(const PackPtr & pack_) : pack(pack_) {}

PackPtr pack;
PageId data_page = 0;
ConstPackPtr pack;

PageId data_page = 0;
};
using FlushPackTasks = std::vector<FlushPackTask>;

Expand Down Expand Up @@ -53,7 +54,7 @@ bool DeltaValueSpace::flush(DMContext & context)
auto & task = tasks.emplace_back(pack);
// We only write the pack's data if it is not a delete range and its data hasn't been saved yet.
// Otherwise, simply saving its metadata is enough.
if (pack->isMutable())
if (pack->dataFlushable())
{
if (unlikely(!pack->cache))
throw Exception("Mutable pack does not have cache", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -64,13 +65,13 @@ bool DeltaValueSpace::flush(DMContext & context)
}
total_rows += pack->rows;
total_deletes += pack->isDeleteRange();

// Stop other threads appending to this pack.
pack->appendable = false;
}

if (unlikely(flush_rows != unsaved_rows || flush_deletes != unsaved_deletes || total_rows != rows || total_deletes != deletes))
throw Exception("Rows and deletes check failed", ErrorCodes::LOGICAL_ERROR);

// Must remove the last_cache, so that later append operations won't append to last pack which we are flushing.
last_cache = {};
}

// No update, return successfully.
Expand Down Expand Up @@ -143,12 +144,6 @@ bool DeltaValueSpace::flush(DMContext & context)
// If its data has been updated, use the new page info.
if (task.data_page != 0)
shadow->data_page = task.data_page;
if (task.pack->rows >= context.delta_small_pack_rows)
{
// This pack is too large to use cache.
task.pack->cache = {};
task.pack->cache_offset = 0;
}

packs_copy.push_back(shadow);
}
Expand Down Expand Up @@ -188,6 +183,16 @@ bool DeltaValueSpace::flush(DMContext & context)
/// Commit updates in memory.
packs.swap(packs_copy);

for (auto & pack : packs)
{
if (pack->cache && pack->data_page != 0 && pack->rows >= context.delta_small_pack_rows)
{
// This pack is too large to use cache.
pack->cache = {};
pack->cache_offset = 0;
}
}

unsaved_rows -= flush_rows;
unsaved_deletes -= flush_deletes;

Expand Down
33 changes: 27 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Pack.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#include <common/logger_useful.h>

#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Storages/DeltaMerge/Delta/Pack.h>
#include <Storages/Page/PageStorage.h>


namespace DB::DM
{
using PageReadFields = PageStorage::PageReadFields;
Expand Down Expand Up @@ -63,8 +64,9 @@ inline void serializePack(const Pack & pack, const BlockPtr & schema, WriteBuffe

inline PackPtr deserializePack(ReadBuffer & buf)
{
auto pack = std::make_shared<Pack>();
pack->saved = true; // Must be true, otherwise it should not be here.
auto pack = std::make_shared<Pack>();
pack->saved = true; // Must be true, otherwise it should not be here.
pack->appendable = false; // Must be false, otherwise it should not be here.
readIntBinary(pack->rows, buf);
readIntBinary(pack->bytes, buf);
readPODBinary(pack->delete_range, buf);
Expand Down Expand Up @@ -173,6 +175,13 @@ Block readPackFromCache(const PackPtr & pack)

Columns readPackFromCache(const PackPtr & pack, const ColumnDefines & column_defines, size_t col_start, size_t col_end)
{
if (unlikely(!(pack->cache)))
{
String msg = " Not a cache pack: " + pack->toString();
LOG_ERROR(&Logger::get(__FUNCTION__), msg);
throw Exception(msg);
}

// TODO: should be able to use cache data directly, without copy.
std::scoped_lock lock(pack->cache->mutex);

Expand All @@ -189,9 +198,21 @@ Columns readPackFromCache(const PackPtr & pack, const ColumnDefines & column_def
}
else
{
auto col_offset = it->second;
auto col_data = col.type->createColumn();
col_data->insertRangeFrom(*cache_block.getByPosition(col_offset).column, pack->cache_offset, pack->rows);
auto col_offset = it->second;
auto col_data = col.type->createColumn();
auto & cache_col = cache_block.getByPosition(col_offset).column;
if (unlikely(col_offset >= cache_block.columns() || !cache_col))
{
String msg = "read column at " + DB::toString(col_offset) //
+ ", cache block: columns=" + DB::toString(cache_block.columns()) //
+ ", rows=" + DB::toString(cache_block.rows()) //
+ ", read col_id: " + DB::toString(col.id) //
+ ", pack: " + pack->toString();
LOG_ERROR(&Logger::get(__FUNCTION__), msg);
throw Exception(msg);
}

col_data->insertRangeFrom(*cache_col, pack->cache_offset, pack->rows);
columns.push_back(std::move(col_data));
}
}
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool is_u
{
if (!is_update || pack->isSaved())
{
auto pack_copy = pack->isMutable() ? std::make_shared<Pack>(*pack) : pack;
auto pack_copy = pack->isAppendable() ? std::make_shared<Pack>(*pack) : pack;
snap->packs.push_back(std::move(pack_copy));

check_rows += pack->rows;
Expand Down Expand Up @@ -175,9 +175,14 @@ const Columns & DeltaValueSpace::Snapshot::getColumnsOfPack(size_t pack_index, s
size_t col_start = columns.size();
size_t col_end = col_num;

auto read_columns = packs[pack_index]->isCached()
? readPackFromCache(packs[pack_index], column_defines, col_start, col_end)
: readPackFromDisk(packs[pack_index], storage_snap->log_reader, column_defines, col_start, col_end);
auto & pack = packs[pack_index];
Columns read_columns;
if (pack->isCached())
read_columns = readPackFromCache(packs[pack_index], column_defines, col_start, col_end);
else if (pack->data_page != 0)
read_columns = readPackFromDisk(packs[pack_index], storage_snap->log_reader, column_defines, col_start, col_end);
else
throw Exception("Pack is in illegal status: " + pack->toString(), ErrorCodes::LOGICAL_ERROR);

columns.insert(columns.end(), read_columns.begin(), read_columns.end());
}
Expand Down
24 changes: 18 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc
size_t num_read = 0;

Handle last_handle = N_INF_HANDLE;
UInt64 last_version = 0;
size_t last_handle_pos = 0;
size_t last_handle_read_num = 0;

Expand Down Expand Up @@ -208,19 +209,30 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc
{
if constexpr (DM_RUN_CHECK)
{
// In some cases like Segment::getSplitPointSlow, only handle column in block.
if (block.columns() < 2 //
|| block.getByPosition(0).column_id != EXTRA_HANDLE_COLUMN_ID //
|| block.getByPosition(1).column_id != VERSION_COLUMN_ID)
return;

++num_read;

auto & handle_column = toColumnVectorData<Handle>(block.getByPosition(0).column);
auto & handle_column = toColumnVectorData<Handle>(block.getByPosition(0).column);
auto & version_column = toColumnVectorData<UInt64>(block.getByPosition(1).column);
for (size_t i = 0; i < handle_column.size(); ++i)
{
if (handle_column[i] < last_handle)
auto handle = handle_column[i];
auto version = version_column[i];
if (handle < last_handle || (handle == last_handle && version < last_version))
{
throw Exception("DeltaMerge return wrong result, current handle [" + DB::toString(handle_column[i]) + "]@read["
+ DB::toString(num_read) + "]@pos[" + DB::toString(i) + "] is expected >= last handle ["
+ DB::toString(last_handle) + "]@read[" + DB::toString(last_handle_read_num) + "]@pos["
throw Exception("DeltaMerge return wrong result, current handle[" + DB::toString(handle) + "]version["
+ DB::toString(version) + "]@read[" + DB::toString(num_read) + "]@pos[" + DB::toString(i)
+ "] is expected >= last_handle[" + DB::toString(last_handle) + "]last_version["
+ DB::toString(last_version) + "]@read[" + DB::toString(last_handle_read_num) + "]@pos["
+ DB::toString(last_handle_pos) + "]");
}
last_handle = handle_column[i];
last_handle = handle;
last_version = version;
last_handle_pos = i;
last_handle_read_num = num_read;
}
Expand Down
59 changes: 52 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
storage_pool,
hash_salt,
store_columns,
/* min_version */ 0,
latest_gc_safe_point,
settings.not_compress_columns,
db_settings);
return DMContextPtr(ctx);
Expand Down Expand Up @@ -527,7 +527,8 @@ void DeltaMergeStore::compact(const Context & db_context, const HandleRange & ra
BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
size_t num_streams)
size_t num_streams,
const SegmentIdSet & read_segments)
{
SegmentReadTasks tasks;

Expand All @@ -539,7 +540,9 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
for (const auto & [handle, segment] : segments)
{
(void)handle;
tasks.push(std::make_shared<SegmentReadTask>(segment, segment->createSnapshot(*dm_context), HandleRanges{segment->getRange()}));
if (read_segments.empty() || read_segments.count(segment->segmentId()))
tasks.push(
std::make_shared<SegmentReadTask>(segment, segment->createSnapshot(*dm_context), HandleRanges{segment->getRange()}));
}
}

Expand Down Expand Up @@ -574,7 +577,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
size_t num_streams,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size)
size_t expected_block_size,
const SegmentIdSet & read_segments)
{
LOG_DEBUG(log, "Read with " << sorted_ranges.size() << " ranges");

Expand All @@ -599,7 +603,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
{
auto & req_range = *range_it;
auto & seg_range = seg_it->second->getRange();
if (req_range.intersect(seg_range))
if (req_range.intersect(seg_range) && (read_segments.empty() || read_segments.count(seg_it->second->segmentId())))
{
if (tasks.empty() || tasks.back()->segment != seg_it->second)
{
Expand Down Expand Up @@ -934,6 +938,10 @@ bool DeltaMergeStore::handleBackgroundTask()
/* ignore_cache= */ false,
global_context.getSettingsRef().safe_point_update_interval_seconds);

LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << safe_point);

// Foreground tasks don't fetch the GC safe point from remote, so we had better keep it as up to date as possible here.
latest_gc_safe_point = safe_point;
task.dm_context->min_version = safe_point;
}

Expand All @@ -955,13 +963,15 @@ bool DeltaMergeStore::handleBackgroundTask()
left = segmentMergeDelta(*task.dm_context, task.segment, false);
type = ThreadType::BG_MergeDelta;
break;
case Compact: {
case Compact:
{
task.segment->getDelta()->compact(*task.dm_context);
left = task.segment;
type = ThreadType::BG_Compact;
break;
}
case Flush: {
case Flush:
{
task.segment->getDelta()->flush(*task.dm_context);
left = task.segment;
type = ThreadType::BG_Flush;
Expand Down Expand Up @@ -1411,5 +1421,40 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()

return stat;
}

/// Builds one SegmentStat entry for every segment in `segments`.
///
/// A shared lock on `read_write_mutex` is held for the whole traversal.
/// The averaged / ratio fields are reported as 0 when their denominator is 0
/// (e.g. a delta or stable layer without packs, or a segment with no rows),
/// instead of producing inf/NaN from a division by zero.
///
/// @return the collected per-segment statistics, in iteration order of `segments`.
SegmentStats DeltaMergeStore::getSegmentStats()
{
    std::shared_lock lock(read_write_mutex);

    // Division helper that yields 0 rather than inf/NaN for a zero denominator.
    const auto ratio_or_zero = [](Float64 numerator, Float64 denominator) -> Float64 {
        return denominator != 0 ? numerator / denominator : 0;
    };

    SegmentStats stats;
    for (const auto & [handle, segment] : segments)
    {
        (void)handle;

        SegmentStat stat;
        auto & delta  = segment->getDelta();
        auto & stable = segment->getStable();

        stat.segment_id = segment->segmentId();
        stat.range      = segment->getRange();

        stat.rows          = segment->getEstimatedRows();
        stat.size          = delta->getBytes() + stable->getBytes();
        stat.delete_ranges = delta->getDeletes();

        stat.delta_pack_count  = delta->getPackCount();
        stat.stable_pack_count = stable->getPacks();

        const auto delta_rows  = static_cast<Float64>(delta->getRows());
        const auto stable_rows = static_cast<Float64>(stable->getRows());

        stat.avg_delta_pack_rows  = ratio_or_zero(delta_rows, static_cast<Float64>(stat.delta_pack_count));
        stat.avg_stable_pack_rows = ratio_or_zero(stable_rows, static_cast<Float64>(stat.stable_pack_count));

        // Share of the segment's estimated rows that currently live in the delta layer.
        stat.delta_rate       = ratio_or_zero(delta_rows, static_cast<Float64>(stat.rows));
        stat.delta_cache_size = delta->getTotalCacheBytes();

        stats.push_back(stat);
    }
    return stats;
}

} // namespace DM
} // namespace DB
Loading