Skip to content

Commit

Permalink
fix lock contension of consumeResource in storage layer (#8271) (#8280)
Browse files Browse the repository at this point in the history
close #8270
  • Loading branch information
ti-chi-bot authored Oct 31, 2023
1 parent 4effafa commit ec00c9a
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 25 deletions.
7 changes: 3 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ class DAGContext

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getResourceGroupName() { return resource_group_name; }
void enableResourceControl() { enable_resource_control = true; }
bool isResourceControlEnabled() const { return enable_resource_control; }
// For now, only called for BlockIO execution engine to disable report RU of storage layer.
void clearResourceGroupName() { resource_group_name = ""; }

RU getReadRU() const;

Expand Down Expand Up @@ -452,8 +452,7 @@ class DAGContext
// The keyspace that the DAG request from
const KeyspaceID keyspace_id = NullspaceID;

const String resource_group_name;
bool enable_resource_control = false;
String resource_group_name;

// Used to determine the execution mode
// - None: request has not been executed yet
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,7 @@ void DAGStorageInterpreter::prepare()

// Do learner read
DAGContext & dag_context = *context.getDAGContext();
auto scan_context
= std::make_shared<DM::ScanContext>(dag_context.getResourceGroupName(), dag_context.isResourceControlEnabled());
auto scan_context = std::make_shared<DM::ScanContext>(dag_context.getResourceGroupName());
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Executor/toRU.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ RU cpuTimeToRU(UInt64 cpu_time_ns)
// 1ru = 64KB
RU bytesToRU(UInt64 bytes)
{
return static_cast<double>(bytes) / 1024.0 / 64.0;
return static_cast<double>(bytes) / bytes_of_one_ru;
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Flash/Executor/toRU.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ UInt64 toCPUTimeMillisecond(UInt64 cpu_time_ns);
// Convert cpu time nanoseconds to Request Unit.
RU cpuTimeToRU(UInt64 cpu_time_ns);
RU bytesToRU(UInt64 bytes);

static constexpr UInt64 bytes_of_one_ru = 1024 * 64;
static constexpr UInt64 bytes_of_one_hundred_ru = 100 * bytes_of_one_ru;
} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -613,4 +613,45 @@ class LocalAdmissionController final : private boost::noncopyable

const LoggerPtr log = Logger::get("LocalAdmissionController");
};

// This is to reduce the calling frequence of LAC::consumeResource() to avoid lock contension.
// TODO: Need to optimize LAC::consumeResource().
// Because the lock contension still increase when the thread num of storage layer or the data to be read is very large.
class LACBytesCollector
{
public:
explicit LACBytesCollector(const std::string & name)
: resource_group_name(name)
, delta_bytes(0)
{}

~LACBytesCollector()
{
if (delta_bytes != 0)
consume();
}

void collect(uint64_t bytes)
{
delta_bytes += bytes;
// Call LAC::consumeResource() when accumulated to `bytes_of_one_hundred_ru` to avoid lock contension.
if (delta_bytes >= bytes_of_one_hundred_ru)
{
consume();
delta_bytes = 0;
}
}

private:
void consume()
{
assert(delta_bytes != 0);
if (!resource_group_name.empty())
LocalAdmissionController::global_instance->consumeResource(resource_group_name, bytesToRU(delta_bytes), 0);
}

const std::string resource_group_name;
uint64_t delta_bytes;
};
using LACBytesCollectorPtr = std::unique_ptr<LACBytesCollector>;
} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool
{
RUNTIME_ASSERT(context.getDAGContext());
auto & dag_context = *context.getDAGContext();
// Resource control should only works for pipeline model.
dag_context.clearResourceGroupName();
const auto & logger = dag_context.log;
RUNTIME_ASSERT(logger);

Expand Down Expand Up @@ -155,7 +157,6 @@ std::optional<QueryExecutorPtr> executeAsPipeline(Context & context, bool intern
}

prepareForExecute(context);
dag_context.enableResourceControl();

ProcessList::EntryPtr process_list_entry;
if (likely(!internal))
Expand Down
15 changes: 7 additions & 8 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ ColumnFileSetReader::ColumnFileSetReader(
, snapshot(snapshot_)
, col_defs(col_defs_)
, segment_range(segment_range_)
, lac_bytes_collector(context_.scan_context ? context_.scan_context->resource_group_name : "")
{
size_t total_rows = 0;
for (auto & f : snapshot->getColumnFiles())
Expand Down Expand Up @@ -185,17 +186,15 @@ size_t ColumnFileSetReader::readRows(
}
}
}

UInt64 delta_bytes = 0;
for (const auto & col : output_columns)
{
const auto delta_bytes = col->byteSize();
delta_bytes += col->byteSize();

lac_bytes_collector.collect(delta_bytes);
if (likely(context.scan_context))
context.scan_context->total_user_read_bytes += delta_bytes;

if (context.scan_context->enable_resource_control)
LocalAdmissionController::global_instance->consumeResource(
context.scan_context->resource_group_name,
bytesToRU(delta_bytes),
0);
}
return actual_read;
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ class ColumnFileSetReader

std::vector<ColumnFileReaderPtr> column_file_readers;

LACBytesCollector lac_bytes_collector;

private:
explicit ColumnFileSetReader(const DMContext & context_)
: context(context_)
, lac_bytes_collector(context_.scan_context ? context_.scan_context->resource_group_name : "")
{}

Block readPKVersion(size_t offset, size_t limit);
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ class ScanContext
std::atomic<uint64_t> total_disagg_read_cache_miss_size{0};


explicit ScanContext(const String & name = "", bool enable_resource_control_ = false)
explicit ScanContext(const String & name = "")
: resource_group_name(name)
, enable_resource_control(enable_resource_control_)
{}

void deserialize(const tipb::TiFlashScanContext & tiflash_scan_context_pb)
Expand Down Expand Up @@ -134,7 +133,6 @@ class ScanContext
}

const String resource_group_name;
const bool enable_resource_control;
};

using ScanContextPtr = std::shared_ptr<ScanContext>;
Expand Down
10 changes: 4 additions & 6 deletions dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
: rows(inputs_.size(), 0)
, precede_stream_rows(0)
, scan_context(scan_context_)
, lac_bytes_collector(scan_context_ ? scan_context_->resource_group_name : "")
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
Expand All @@ -91,6 +92,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
: rows(std::move(rows_))
, precede_stream_rows(0)
, scan_context(scan_context_)
, lac_bytes_collector(scan_context_ ? scan_context_->resource_group_name : "")
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
Expand Down Expand Up @@ -219,18 +221,14 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
if (likely(scan_context != nullptr))
{
scan_context->total_user_read_bytes += bytes;

if (scan_context->enable_resource_control)
LocalAdmissionController::global_instance->consumeResource(
scan_context->resource_group_name,
bytesToRU(bytes),
0);
lac_bytes_collector.collect(bytes);
}
}
BlockInputStreams::iterator current_stream;
std::vector<size_t> rows;
size_t precede_stream_rows;
const ScanContextPtr scan_context;
LACBytesCollector lac_bytes_collector;
};

} // namespace DM
Expand Down

0 comments on commit ec00c9a

Please sign in to comment.