Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ingest write amplification after snapshot apply optimization #1913

Merged
merged 2 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaPack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
#include <Storages/DeltaMerge/RowKeyFilter.h>

namespace ProfileEvents
{
extern const Event DMWriteBytes;
}

namespace DB
{
namespace DM
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,7 @@ bool DeltaValueSpace::appendDeleteRange(DMContext & /*context*/, const RowKeyRan
return true;
}

bool DeltaValueSpace::appendRegionSnapshot(DMContext & /*context*/,
const RowKeyRange & range,
const DeltaPacks & packs,
bool clear_data_in_range)
bool DeltaValueSpace::ingestPacks(DMContext & /*context*/, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range)
{
std::scoped_lock lock(mutex);
if (abandoned.load(std::memory_order_relaxed))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>, pr

bool appendDeleteRange(DMContext & context, const RowKeyRange & delete_range);

bool appendRegionSnapshot(DMContext & context, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range);
bool ingestPacks(DMContext & context, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range);

/// Flush the data of packs which haven't been written to disk yet, and also save the metadata of packs.
bool flush(DMContext & context);
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ void DMFile::readMetadata(const FileProviderPtr & file_provider)
readPackStat(file_provider, footer.meta_pack_info);
}

void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider,
const RateLimiterPtr & rate_limiter)
void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter)
{
writeMetadata(file_provider, rate_limiter);
if (unlikely(status != Status::WRITING))
Expand All @@ -395,8 +394,8 @@ void DMFile::finalizeForSingleFileMode(WriteBuffer & buffer)
{
Footer footer;
std::tie(footer.meta_pack_info.pack_property_offset, footer.meta_pack_info.pack_property_size) = writePackPropertyToBuffer(buffer);
std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMetaToBuffer(buffer);
std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePackStatToBuffer(buffer);
std::tie(footer.meta_pack_info.meta_offset, footer.meta_pack_info.meta_size) = writeMetaToBuffer(buffer);
std::tie(footer.meta_pack_info.pack_stat_offset, footer.meta_pack_info.pack_stat_size) = writePackStatToBuffer(buffer);

footer.sub_file_stat_offset = buffer.count();
footer.sub_file_num = sub_file_stats.size();
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <Common/ProfileEvents.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
Expand All @@ -13,6 +14,11 @@
#include <Storages/Transaction/TMTContext.h>
#include <common/logger_useful.h>

namespace ProfileEvents
{
extern const Event DMWriteBytes;
}

namespace DB
{

Expand Down Expand Up @@ -58,8 +64,13 @@ void SSTFilesToDTFilesOutputStream::writeSuffix()
assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested.
// Add the DTFile to StoragePathPool so that we can restore it later
auto [ingest_storage, _schema_snap] = child->ingestingInfo();
(void)_schema_snap;
ingest_storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), dt_file->getBytesOnDisk());
std::ignore = _schema_snap;
const auto bytes_written = dt_file->getBytesOnDisk();
ingest_storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);

// Report DMWriteBytes for calculating write amplification
ProfileEvents::increment(ProfileEvents::DMWriteBytes, bytes_written);

dt_stream.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ bool Segment::ingestPacks(DMContext & dm_context, const RowKeyRange & range, con
auto new_range = range.shrink(rowkey_range);
LOG_TRACE(log, "Segment [" << segment_id << "] write region snapshot: " << new_range.toDebugString());

return delta->appendRegionSnapshot(dm_context, range, packs, clear_data_in_range);
return delta->ingestPacks(dm_context, range, packs, clear_data_in_range);
}

SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool for_update) const
Expand Down
29 changes: 27 additions & 2 deletions metrics/grafana/tiflash_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,16 @@
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"seriesOverrides": [
{
"alias": "/5min-write/",
"yaxis": 2
},
{
"alias": "/5min-all/",
"yaxis": 2
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
Expand Down Expand Up @@ -2389,6 +2398,22 @@
"intervalFactor": 1,
"legendFormat": "30min-{{instance}}",
"refId": "D"
},
{
"refId": "E",
"expr": "sum((rate(tiflash_system_profile_event_PSMWriteBytes{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[5m]) + rate(tiflash_system_profile_event_WriteBufferFromFileDescriptorWriteBytes{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[5m]) + rate(tiflash_system_profile_event_WriteBufferAIOWriteBytes{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[5m]))) by (instance)",
"hide": true,
"intervalFactor": 1,
"format": "time_series",
"legendFormat": "5min-all-{{instance}}"
},
{
"refId": "F",
"expr": "sum(rate(tiflash_system_profile_event_DMWriteBytes{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[5m])) by (instance)",
"hide": true,
"intervalFactor": 1,
"format": "time_series",
"legendFormat": "5min-write-{{instance}}"
}
],
"thresholds": [],
Expand Down Expand Up @@ -2420,7 +2445,7 @@
"show": true
},
{
"format": "none",
"format": "Bps",
"label": null,
"logBase": 1,
"max": null,
Expand Down
7 changes: 7 additions & 0 deletions tests/delta-merge-test/raft/schema/drop_on_restart.test
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,10 @@
=> DBGInvoke __refresh_schemas()
# db4 and db5 should be tombstoned
=> select name from system.databases where name like '%db%' and is_tombstone = 0

# Physical cleanup so that it won't cause trouble for other tests
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
=> drop database if exists db3
=> drop database if exists db4
=> drop database if exists db5