Skip to content

Commit

Permalink
[fix](cloud) fix be core when using stream load record in cloud mode (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored and dataroaring committed Jul 22, 2024
1 parent b77441e commit 5be89b9
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 69 deletions.
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/thrift_server.h"

Expand Down Expand Up @@ -186,4 +188,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
response.status = t_status;
}

// Cloud-mode entry point for fetching stream load records. Forwards to the
// three-argument BaseBackendService overload, supplying the recorder held by
// the cloud storage engine.
void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
                                                 int64_t last_stream_record_time) {
    const auto& recorder = _engine.get_stream_load_recorder();
    BaseBackendService::get_stream_load_record(result, last_stream_record_time, recorder);
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class CloudBackendService final : public BaseBackendService {
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) override;

// Fills `result` with stream load records, using the stream load recorder
// obtained from the CloudStorageEngine referenced by `_engine`.
void get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) override;

private:
CloudStorageEngine& _engine;
};
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ Status CloudStorageEngine::open() {

_tablet_hotspot = std::make_unique<TabletHotspot>();

RETURN_NOT_OK_STATUS_WITH_WARN(
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
"init StreamLoadRecorder failed");

return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
Expand Down
8 changes: 5 additions & 3 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <rapidjson/prettywriter.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
Expand Down Expand Up @@ -119,7 +120,7 @@ void HttpStreamAction::handle(HttpRequest* req) {
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
if (config::enable_stream_load_record && !config::is_cloud_mode()) {
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
Expand Down Expand Up @@ -364,8 +365,9 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,

void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str) {
auto stream_load_recorder =
ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();

if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
Expand Down
8 changes: 5 additions & 3 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <stdexcept>
#include <utility>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
Expand Down Expand Up @@ -217,7 +218,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
if (config::enable_stream_load_record && !config::is_cloud_mode()) {
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
Expand Down Expand Up @@ -705,8 +706,9 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa

void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str) {
auto stream_load_recorder =
ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();

if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
Expand Down
41 changes: 20 additions & 21 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
config::memory_limitation_per_thread_for_schema_change_bytes);
}

// Creates the StreamLoadRecorder for `stream_load_record_path`, stores it in
// `_stream_load_recorder` and initializes it.
//
// Returns:
//   - Status::OK() when the recorder was created and init() succeeded;
//   - MemoryAllocFailed when create_shared() yields a null pointer;
//   - IOError when init() fails. The message includes the status reported by
//     init() so the underlying cause is visible to the caller and in the log.
Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) {
    LOG(INFO) << "stream load record path: " << stream_load_record_path;
    // init stream load record rocksdb
    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
    if (_stream_load_recorder == nullptr) {
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
                "new StreamLoadRecorder failed");
    }
    const auto st = _stream_load_recorder->init();
    if (!st.ok()) {
        // Keep the IOError code callers already see, but do not throw away the
        // reason init() reported.
        RETURN_NOT_OK_STATUS_WITH_WARN(
                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}, reason={}",
                                stream_load_record_path, st.to_string()),
                "init StreamLoadRecorder failed");
    }
    return Status::OK();
}

static Status _validate_options(const EngineOptions& options) {
if (options.store_paths.empty()) {
return Status::InternalError("store paths is empty");
Expand All @@ -158,7 +177,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
_txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
_default_rowset_type(BETA_ROWSET),
_stream_load_recorder(nullptr),
_create_tablet_idx_lru_cache(
new CreateTabletIdxCache(config::partition_disk_index_lru_size)),
_snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
Expand Down Expand Up @@ -274,31 +292,12 @@ Status StorageEngine::_init_store_map() {
return Status::InternalError("init path failed, error={}", error_msg);
}

RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(_options.store_paths[0].path),
RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path),
"init StreamLoadRecorder failed");

return Status::OK();
}

Status StorageEngine::_init_stream_load_recorder(const std::string& stream_load_record_path) {
LOG(INFO) << "stream load record path: " << stream_load_record_path;
// init stream load record rocksdb
_stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
if (_stream_load_recorder == nullptr) {
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
"new StreamLoadRecorder failed");
}
auto st = _stream_load_recorder->init();
if (!st.ok()) {
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::IOError("open StreamLoadRecorder rocksdb failed, path={}",
stream_load_record_path),
"init StreamLoadRecorder failed");
}
return Status::OK();
}

void StorageEngine::_update_storage_medium_type_count() {
set<TStorageMedium::type> available_storage_medium_types;

Expand Down
16 changes: 8 additions & 8 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class BaseStorageEngine {

int get_disk_num() { return _disk_num; }

// Creates the StreamLoadRecorder for `stream_load_record_path` and initializes it.
// Returns a non-OK Status if the recorder cannot be created or its init() fails.
Status init_stream_load_recorder(const std::string& stream_load_record_path);

const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
return _stream_load_recorder;
}

protected:
void _evict_querying_rowset();
void _evict_quring_rowset_thread_callback();
Expand All @@ -157,6 +163,8 @@ class BaseStorageEngine {
int64_t _memory_limitation_bytes_for_schema_change;

int _disk_num {-1};

std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
};

class StorageEngine final : public BaseStorageEngine {
Expand Down Expand Up @@ -246,10 +254,6 @@ class StorageEngine final : public BaseStorageEngine {

bool should_fetch_from_peer(int64_t tablet_id);

const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
return _stream_load_recorder;
}

Status get_compaction_status_json(std::string* result);

// check cumulative compaction config
Expand Down Expand Up @@ -349,8 +353,6 @@ class StorageEngine final : public BaseStorageEngine {
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
CompactionType compaction_type);

Status _init_stream_load_recorder(const std::string& stream_load_record_path);

Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
bool force);

Expand Down Expand Up @@ -470,8 +472,6 @@ class StorageEngine final : public BaseStorageEngine {
std::mutex _compaction_producer_sleep_mutex;
std::condition_variable _compaction_producer_sleep_cv;

std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;

// we use unordered_map to store all cumulative compaction policy sharded ptr
std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>
_cumulative_compaction_policies;
Expand Down
48 changes: 27 additions & 21 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,27 +923,8 @@ void BaseBackendService::close_scanner(TScanCloseResult& result_, const TScanClo

void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) {
auto stream_load_recorder = _engine.get_stream_load_recorder();
if (stream_load_recorder != nullptr) {
std::map<std::string, std::string> records;
auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
config::stream_load_record_batch_size, &records);
if (st.ok()) {
LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
<< records.size()
<< ", last_stream_load_timestamp: " << last_stream_record_time;
std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
auto it = records.begin();
for (; it != records.end(); ++it) {
TStreamLoadRecord stream_load_item;
StreamLoadContext::parse_stream_load_record(it->second, stream_load_item);
stream_load_record_batch.emplace(it->first.c_str(), stream_load_item);
}
result.__set_stream_load_record(stream_load_record_batch);
}
} else {
LOG(WARNING) << "stream_load_recorder is null.";
}
BaseBackendService::get_stream_load_record(result, last_stream_record_time,
_engine.get_stream_load_recorder());
}

void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
Expand Down Expand Up @@ -1199,6 +1180,31 @@ void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
LOG(ERROR) << "get_stream_load_record is not implemented";
}

// Shared implementation used by the get_stream_load_record() overrides.
//
// Calls get_batch() on `stream_load_recorder` with the key
// std::to_string(last_stream_record_time) and the limit
// config::stream_load_record_batch_size, parses every returned value into a
// TStreamLoadRecord and stores the resulting map in `result`.
//
// `result` is left untouched when the recorder is null or when get_batch()
// fails; both cases are logged as warnings.
void BaseBackendService::get_stream_load_record(
        TStreamLoadRecordResult& result, int64_t last_stream_record_time,
        std::shared_ptr<StreamLoadRecorder> stream_load_recorder) {
    if (stream_load_recorder == nullptr) {
        LOG(WARNING) << "stream_load_recorder is null.";
        return;
    }

    std::map<std::string, std::string> records;
    auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
                                              config::stream_load_record_batch_size, &records);
    if (!st.ok()) {
        // Surface the failure instead of returning an empty result with no trace.
        LOG(WARNING) << "get_batch stream_load_record rocksdb failed. "
                     << "last_stream_load_timestamp: " << last_stream_record_time
                     << ", status: " << st.to_string();
        return;
    }

    LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
              << records.size() << ", last_stream_load_timestamp: " << last_stream_record_time;

    std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
    for (const auto& record : records) {
        TStreamLoadRecord stream_load_item;
        StreamLoadContext::parse_stream_load_record(record.second, stream_load_item);
        // Use the key as-is; going through c_str() would rebuild the string and
        // cut it at the first embedded NUL.
        stream_load_record_batch.emplace(record.first, stream_load_item);
    }
    result.__set_stream_load_record(stream_load_record_batch);
}

void BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) {
LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "agent/agent_server.h"
#include "agent/topic_subscriber.h"
#include "common/status.h"
#include "runtime/stream_load/stream_load_recorder.h"

namespace doris {

Expand Down Expand Up @@ -165,6 +166,9 @@ class BaseBackendService : public BackendServiceIf {
protected:
Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);

// Shared helper for the get_stream_load_record() overrides: reads a batch of
// records from `stream_load_recorder` and stores them in `result`. A null
// recorder is logged and leaves `result` unchanged.
void get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time,
std::shared_ptr<StreamLoadRecorder> stream_load_recorder);

ExecEnv* _exec_env = nullptr;
std::unique_ptr<AgentServer> _agent_server;
std::unique_ptr<ThreadPool> _ingest_binlog_workers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,19 @@ public void addStreamLoadRecord(long dbId, String label, StreamLoadRecord stream
}

/**
 * Returns a new list containing the current contents of {@code streamLoadRecordHeap}.
 * The returned list is a copy, so modifying it does not affect the heap.
 */
public List<StreamLoadItem> getStreamLoadRecords() {
    // Dumping the whole heap is diagnostic output only; keep it at debug level
    // and behind the guard so normal runs neither log it nor build the string.
    if (LOG.isDebugEnabled()) {
        LOG.debug("get stream load records: {}", streamLoadRecordHeap);
    }
    return new ArrayList<>(streamLoadRecordHeap);
}

public List<List<Comparable>> getStreamLoadRecordByDb(
long dbId, String label, boolean accurateMatch, StreamLoadState state) {
LinkedList<List<Comparable>> streamLoadRecords = new LinkedList<List<Comparable>>();
LOG.info("test log: {}", dbId);

readLock();
try {
if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
LOG.info("test log: {}", dbId);
return streamLoadRecords;
}

Expand Down Expand Up @@ -202,6 +205,7 @@ public List<List<Comparable>> getStreamLoadRecordByDb(
}

}
LOG.info("test log: {}", streamLoadRecords);
return streamLoadRecords;
} finally {
readUnlock();
Expand Down Expand Up @@ -263,19 +267,17 @@ protected void runAfterCatalogReady() {
TimeUtils.getDatetimeMsFormatWithTimeZone());
String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(),
TimeUtils.getDatetimeMsFormatWithTimeZone());
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load record info from backend: {}."
+ " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
+ " status: {}, message: {}, error_url: {},"
+ " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
+ " load_bytes: {}, start_time: {}, finish_time: {}.",
backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
streamLoadItem.getLoadBytes(), startTime, finishTime);
}
LOG.info("receive stream load record info from backend: {}."
+ " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
+ " status: {}, message: {}, error_url: {},"
+ " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
+ " load_bytes: {}, start_time: {}, finish_time: {}.",
backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
streamLoadItem.getLoadBytes(), startTime, finishTime);

AuditEvent auditEvent =
new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
Expand Down
2 changes: 2 additions & 0 deletions regression-test/pipeline/cloud_p0/conf/be_custom.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ file_cache_path = [{"path":"/data/doris_cloud/file_cache","total_size":104857600
tmp_file_dirs = [{"path":"/data/doris_cloud/tmp","max_cache_bytes":104857600,"max_upload_bytes":104857600}]
thrift_rpc_timeout_ms = 360000
save_load_error_log_to_s3 = true
enable_stream_load_record = true
stream_load_record_batch_size = 500
webserver_num_workers = 128
1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ max_garbage_sweep_interval=180

log_buffer_level = -1
enable_stream_load_record = true
stream_load_record_batch_size = 500
storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1;/mnt/ssd01/cluster_storage/doris.SSD
disable_auto_compaction=true
priority_networks=172.19.0.0/24
Expand Down
Loading

0 comments on commit 5be89b9

Please sign in to comment.