Skip to content

Commit

Permalink
[BugFix] Fix ES::close block pipeline poller
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Dec 10, 2024
1 parent 815b720 commit 69da718
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include "exec/es/es_scroll_parser.h"
#include "exec/es/es_scroll_query.h"
#include "fmt/compile.h"
#include "runtime/exec_env.h"
#include "util/priority_thread_pool.hpp"

namespace starrocks {

Expand Down Expand Up @@ -214,21 +216,32 @@ Status ESScanReader::close() {
}

std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH;
RETURN_IF_ERROR(_network_client.init(scratch_target));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_method(DELETE);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(5 * 1000);
if (_ssl_enabled) {
_network_client.trust_all_ssl();
}
std::string response;
RETURN_IF_ERROR(_network_client.execute_delete_request(ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id),
&response));
if (_network_client.get_http_status() == 200) {
return Status::OK();
} else {
return Status::InternalError("es_scan_reader delete scroll context failure");
std::function<void()> send_del_request = [user_name = _user_name, passwd = _passwd, enable_ssl = _ssl_enabled,
scroll_id = _scroll_id, scratch_target]() {
HttpClient client;
RETURN_IF(client.init(scratch_target).ok(), (void)0);
client.set_basic_auth(user_name, passwd);
client.set_method(DELETE);
client.set_content_type("application/json");
client.set_timeout_ms(5 * 1000);
if (enable_ssl) {
client.trust_all_ssl();
}
std::string response;
auto payload = ESScrollQueryBuilder::build_clear_scroll_body(scroll_id);
auto st = client.execute_delete_request(payload, &response);
if (!st.ok()) {
LOG(WARNING) << "es delete scroll id failed";
return;
}
if (client.get_http_status() != 200) {
LOG(WARNING) << "es_scan_reader delete scroll context failure";
}
};
auto* thread_pool = ExecEnv::GetInstance()->pipeline_sink_io_pool();
if (!thread_pool->try_offer(send_del_request)) {
LOG(WARNING) << "try to delete scroll id failed";
}
return Status::OK();
}
} // namespace starrocks

0 comments on commit 69da718

Please sign in to comment.