diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index f576b60045d54d..d91e9e416b81a1 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -29,6 +29,8 @@ #include "common/status.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_recorder.h" #include "util/brpc_client_cache.h" // BrpcClientCache #include "util/thrift_server.h" @@ -186,4 +188,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon response.status = t_status; } +void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result, + int64_t last_stream_record_time) { + BaseBackendService::get_stream_load_record(result, last_stream_record_time, + _engine.get_stream_load_recorder()); +} + } // namespace doris diff --git a/be/src/cloud/cloud_backend_service.h b/be/src/cloud/cloud_backend_service.h index 88f0099fe73f09..358cb4d1f0b2ec 100644 --- a/be/src/cloud/cloud_backend_service.h +++ b/be/src/cloud/cloud_backend_service.h @@ -53,6 +53,9 @@ class CloudBackendService final : public BaseBackendService { void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response, const TCheckWarmUpCacheAsyncRequest& request) override; + void get_stream_load_record(TStreamLoadRecordResult& result, + int64_t last_stream_record_time) override; + private: CloudStorageEngine& _engine; }; diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index b1b455d2007e1f..de4bbac7b3ef6c 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -196,6 +196,10 @@ Status CloudStorageEngine::open() { _tablet_hotspot = std::make_unique(); + RETURN_NOT_OK_STATUS_WITH_WARN( + init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path), + "init StreamLoadRecorder failed"); + return ThreadPoolBuilder("SyncLoadForTabletsThreadPool") .set_max_threads(config::sync_load_for_tablets_thread) .set_min_threads(config::sync_load_for_tablets_thread) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index a3439969e60ba2..87cc2f694eb102 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -30,6 +30,7 @@ #include #include +#include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "common/config.h" #include "common/consts.h" @@ -119,7 +120,7 @@ void HttpStreamAction::handle(HttpRequest* req) { // add new line at end str = str + '\n'; HttpChannel::send_reply(req, str); - if (config::enable_stream_load_record && !config::is_cloud_mode()) { + if (config::enable_stream_load_record) { str = ctx->prepare_stream_load_record(str); _save_stream_load_record(ctx, str); } @@ -364,8 +365,9 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, void HttpStreamAction::_save_stream_load_record(std::shared_ptr ctx, const std::string& str) { - auto stream_load_recorder = - ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder(); + std::shared_ptr stream_load_recorder = + ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { std::string key = std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 93fde511898dd3..2b6a0803e81e76 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -39,6 +39,7 @@ #include #include +#include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "common/config.h" #include "common/consts.h" @@ -217,7 +218,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); #ifndef BE_TEST - if (config::enable_stream_load_record && !config::is_cloud_mode()) { + if (config::enable_stream_load_record) { str = ctx->prepare_stream_load_record(str); _save_stream_load_record(ctx, str); } @@ -705,8 +706,9 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa void StreamLoadAction::_save_stream_load_record(std::shared_ptr ctx, const std::string& str) { - auto stream_load_recorder = - ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder(); + std::shared_ptr stream_load_recorder = + ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { std::string key = std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 43093d3183e438..f9fe26bb9342fc 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -134,6 +134,25 @@ int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change( config::memory_limitation_per_thread_for_schema_change_bytes); } +Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) { + LOG(INFO) << "stream load record path: " << stream_load_record_path; + // init stream load record rocksdb + _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path); + if (_stream_load_recorder == nullptr) { + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"), + "new StreamLoadRecorder failed"); + } + auto st = _stream_load_recorder->init(); + if (!st.ok()) { + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::IOError("open StreamLoadRecorder rocksdb failed, path={}", + stream_load_record_path), + "init StreamLoadRecorder failed"); + } + return Status::OK(); +} + static Status _validate_options(const EngineOptions& options) { if (options.store_paths.empty()) { return Status::InternalError("store paths is empty"); @@ -158,7 +177,6 @@ StorageEngine::StorageEngine(const EngineOptions& options) _tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)), _txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)), _default_rowset_type(BETA_ROWSET), - _stream_load_recorder(nullptr), _create_tablet_idx_lru_cache( new CreateTabletIdxCache(config::partition_disk_index_lru_size)), _snapshot_mgr(std::make_unique(*this)) { @@ -274,31 +292,12 @@ Status StorageEngine::_init_store_map() { return Status::InternalError("init path failed, error={}", error_msg); } - RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(_options.store_paths[0].path), + RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path), "init StreamLoadRecorder failed"); return Status::OK(); } -Status StorageEngine::_init_stream_load_recorder(const std::string& stream_load_record_path) { - LOG(INFO) << "stream load record path: " << stream_load_record_path; - // init stream load record rocksdb - _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path); - if (_stream_load_recorder == nullptr) { - RETURN_NOT_OK_STATUS_WITH_WARN( - Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"), - "new StreamLoadRecorder failed"); - } - auto st = _stream_load_recorder->init(); - if (!st.ok()) { - RETURN_NOT_OK_STATUS_WITH_WARN( - Status::IOError("open StreamLoadRecorder rocksdb failed, path={}", - stream_load_record_path), - "init StreamLoadRecorder failed"); - } - return Status::OK(); -} - void StorageEngine::_update_storage_medium_type_count() { set available_storage_medium_types; diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 6e8fb7bbb7facd..b1f30e5db8c0de 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -133,6 +133,12 @@ class BaseStorageEngine { int get_disk_num() { return _disk_num; } + Status init_stream_load_recorder(const std::string& stream_load_record_path); + + const std::shared_ptr& get_stream_load_recorder() { + return _stream_load_recorder; + } + protected: void _evict_querying_rowset(); void _evict_quring_rowset_thread_callback(); @@ -157,6 +163,8 @@ class BaseStorageEngine { int64_t _memory_limitation_bytes_for_schema_change; int _disk_num {-1}; + + std::shared_ptr _stream_load_recorder; }; class StorageEngine final : public BaseStorageEngine { @@ -246,10 +254,6 @@ class StorageEngine final : public BaseStorageEngine { bool should_fetch_from_peer(int64_t tablet_id); - const std::shared_ptr& get_stream_load_recorder() { - return _stream_load_recorder; - } - Status get_compaction_status_json(std::string* result); // check cumulative compaction config @@ -349,8 +353,6 @@ class StorageEngine final : public BaseStorageEngine { void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type); - Status _init_stream_load_recorder(const std::string& stream_load_record_path); - Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, bool force); @@ -470,8 +472,6 @@ class StorageEngine final : public BaseStorageEngine { std::mutex _compaction_producer_sleep_mutex; std::condition_variable _compaction_producer_sleep_cv; - std::shared_ptr _stream_load_recorder; - // we use unordered_map to store all cumulative compaction policy sharded ptr std::unordered_map> _cumulative_compaction_policies; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 4effc225110a7a..d686c12609a5ed 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -923,27 +923,8 @@ void BaseBackendService::close_scanner(TScanCloseResult& result_, const TScanClo void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time) { - auto stream_load_recorder = _engine.get_stream_load_recorder(); - if (stream_load_recorder != nullptr) { - std::map records; - auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time), - config::stream_load_record_batch_size, &records); - if (st.ok()) { - LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " - << records.size() - << ", last_stream_load_timestamp: " << last_stream_record_time; - std::map stream_load_record_batch; - auto it = records.begin(); - for (; it != records.end(); ++it) { - TStreamLoadRecord stream_load_item; - StreamLoadContext::parse_stream_load_record(it->second, stream_load_item); - stream_load_record_batch.emplace(it->first.c_str(), stream_load_item); - } - result.__set_stream_load_record(stream_load_record_batch); - } - } else { - LOG(WARNING) << "stream_load_recorder is null."; - } + BaseBackendService::get_stream_load_record(result, last_stream_record_time, + _engine.get_stream_load_recorder()); } void BackendService::check_storage_format(TCheckStorageFormatResult& result) { @@ -1199,6 +1180,31 @@ void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result, LOG(ERROR) << "get_stream_load_record is not implemented"; } +void BaseBackendService::get_stream_load_record( + TStreamLoadRecordResult& result, int64_t last_stream_record_time, + std::shared_ptr stream_load_recorder) { + if (stream_load_recorder != nullptr) { + std::map records; + auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time), + config::stream_load_record_batch_size, &records); + if (st.ok()) { + LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " + << records.size() + << ", last_stream_load_timestamp: " << last_stream_record_time; + std::map stream_load_record_batch; + auto it = records.begin(); + for (; it != records.end(); ++it) { + TStreamLoadRecord stream_load_item; + StreamLoadContext::parse_stream_load_record(it->second, stream_load_item); + stream_load_record_batch.emplace(it->first.c_str(), stream_load_item); + } + result.__set_stream_load_record(stream_load_record_batch); + } + } else { + LOG(WARNING) << "stream_load_recorder is null."; + } +} + void BaseBackendService::get_disk_trash_used_capacity(std::vector& diskTrashInfos) { LOG(ERROR) << "get_disk_trash_used_capacity is not implemented"; } diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 0ada1bf5393c8b..f0e06094560ae3 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -26,6 +26,7 @@ #include "agent/agent_server.h" #include "agent/topic_subscriber.h" #include "common/status.h" +#include "runtime/stream_load/stream_load_recorder.h" namespace doris { @@ -165,6 +166,9 @@ class BaseBackendService : public BackendServiceIf { protected: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); + void get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time, + std::shared_ptr stream_load_recorder); + ExecEnv* _exec_env = nullptr; std::unique_ptr _agent_server; std::unique_ptr _ingest_binlog_workers; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index f44e8b785f6ae1..6c53f354af89b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -150,16 +150,19 @@ public void addStreamLoadRecord(long dbId, String label, StreamLoadRecord stream } public List getStreamLoadRecords() { + LOG.info("test log: {}", streamLoadRecordHeap); return new ArrayList<>(streamLoadRecordHeap); } public List> getStreamLoadRecordByDb( long dbId, String label, boolean accurateMatch, StreamLoadState state) { LinkedList> streamLoadRecords = new LinkedList>(); + LOG.info("test log: {}", dbId); readLock(); try { if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) { + LOG.info("test log: {}", dbId); return streamLoadRecords; } @@ -202,6 +205,7 @@ public List> getStreamLoadRecordByDb( } } + LOG.info("test log: {}", streamLoadRecords); return streamLoadRecords; } finally { readUnlock(); @@ -263,19 +267,17 @@ protected void runAfterCatalogReady() { TimeUtils.getDatetimeMsFormatWithTimeZone()); String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(), TimeUtils.getDatetimeMsFormatWithTimeZone()); - if (LOG.isDebugEnabled()) { - LOG.debug("receive stream load record info from backend: {}." - + " label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," - + " status: {}, message: {}, error_url: {}," - + " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {}," - + " load_bytes: {}, start_time: {}, finish_time: {}.", - backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), - streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), - streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), - streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), - streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), - streamLoadItem.getLoadBytes(), startTime, finishTime); - } + LOG.info("receive stream load record info from backend: {}." + + " label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + + " status: {}, message: {}, error_url: {}," + + " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {}," + + " load_bytes: {}, start_time: {}, finish_time: {}.", + backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), + streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), + streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), + streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), + streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), + streamLoadItem.getLoadBytes(), startTime, finishTime); AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH) diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf index a2478b47269184..9f2967b1972c11 100644 --- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf @@ -30,4 +30,6 @@ file_cache_path = [{"path":"/data/doris_cloud/file_cache","total_size":104857600 tmp_file_dirs = [{"path":"/data/doris_cloud/tmp","max_cache_bytes":104857600,"max_upload_bytes":104857600}] thrift_rpc_timeout_ms = 360000 save_load_error_log_to_s3 = true +enable_stream_load_record = true +stream_load_record_batch_size = 500 webserver_num_workers = 128 diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 32c7b81f934418..a072ac7ad50aca 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -47,6 +47,7 @@ max_garbage_sweep_interval=180 log_buffer_level = -1 enable_stream_load_record = true +stream_load_record_batch_size = 500 storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1;/mnt/ssd01/cluster_storage/doris.SSD disable_auto_compaction=true priority_networks=172.19.0.0/24 diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy new file mode 100644 index 00000000000000..96a4fff9c538ff --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_record", "p0") { + def tableName = "test_stream_load_record" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL, + `v4` smallint(6) REPLACE_IF_NOT_NULL NULL, + `v5` int(11) REPLACE_IF_NOT_NULL NULL, + `v6` bigint(20) REPLACE_IF_NOT_NULL NULL, + `v7` largeint(40) REPLACE_IF_NOT_NULL NULL, + `v8` datetime REPLACE_IF_NOT_NULL NULL, + `v9` date REPLACE_IF_NOT_NULL NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, + `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")), + PARTITION partition_b VALUES [("100000"), ("1000000000")), + PARTITION partition_c VALUES [("1000000000"), ("10000000000")), + PARTITION partition_d VALUES [("10000000000"), (MAXVALUE))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + // test strict_mode success + streamLoad { + table "${tableName}" + + set 'column_separator', '\t' + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + } + + def count = 0 + while (true) { + sleep(1000) + def res = sql"show stream load" + log.info("Stream load result: ${res}", res) + if (res.size() > 0) { + break + } + if (count > 150) { + assertTrue(-1 > 0) + } + count++ + } +} \ No newline at end of file