diff --git a/contrib/kvproto b/contrib/kvproto index 41564e9f984..e166ae58810 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 41564e9f984b1c8abb8fae9b5c3efcc68c6f691d +Subproject commit e166ae588106b9deef7cc399a7e73f325361c0d2 diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index 602005a9528..829061792f7 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -125,7 +125,8 @@ class Connection : private boost::noncopyable setDescription(); } - virtual ~Connection(){}; + virtual ~Connection() = default; + ; /// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic. void setThrottler(const ThrottlerPtr & throttler_) @@ -268,7 +269,7 @@ class Connection : private boost::noncopyable class LoggerWrapper { public: - LoggerWrapper(Connection & parent_) + explicit LoggerWrapper(Connection & parent_) : log(nullptr) , parent(parent_) { diff --git a/dbms/src/Common/ExternalTable.h b/dbms/src/Common/ExternalTable.h index c747c4d5a53..b9edfe1f926 100644 --- a/dbms/src/Common/ExternalTable.h +++ b/dbms/src/Common/ExternalTable.h @@ -15,7 +15,6 @@ #pragma once #include -#include #include #include #include @@ -195,7 +194,7 @@ class ExternalTablesHandler : public Poco::Net::PartHandler , params(params_) {} - void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream) + void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream) override { /// The buffer is initialized here, not in the virtual function initReadBuffer read_buffer = std::make_unique(stream); diff --git a/dbms/src/Common/HTMLForm.h b/dbms/src/Common/HTMLForm.h deleted file mode 100644 index 610b852c72c..00000000000 --- a/dbms/src/Common/HTMLForm.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include - - -/** Somehow, in case of POST, Poco::Net::HTMLForm doesn't read parameters from URL, only from body. - * This helper allows to read parameters just from URL. - */ -struct HTMLForm : public Poco::Net::HTMLForm -{ - explicit HTMLForm(const Poco::Net::HTTPRequest & request) - { - Poco::URI uri(request.getURI()); - std::istringstream istr(uri.getRawQuery()); - readUrl(istr); - } - - explicit HTMLForm(const Poco::URI & uri) - { - std::istringstream istr(uri.getRawQuery()); - readUrl(istr); - } - - - template - T getParsed(const std::string & key, T default_value) - { - auto it = find(key); - return (it != end()) ? DB::parse(it->second) : default_value; - } - - template - T getParsed(const std::string & key) - { - return DB::parse(get(key)); - } -}; diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 25344d7cb13..11b46566c21 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -17,14 +17,13 @@ #include #include -#define DBMS_NAME "ClickHouse" +#define DBMS_NAME "TiFlash" #define DBMS_VERSION_MAJOR 1 #define DBMS_VERSION_MINOR 1 #define DBMS_DEFAULT_HOST "localhost" #define DBMS_DEFAULT_PORT 9000 #define DBMS_DEFAULT_SECURE_PORT 9440 -#define DBMS_DEFAULT_HTTP_PORT 8123 #define DBMS_DEFAULT_CONNECT_TIMEOUT_SEC 10 #define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS 50 #define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300 @@ -104,17 +103,12 @@ static constexpr UInt64 DEFAULT_DISAGG_TASK_TIMEOUT_SEC = 5 * 60; #define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500 -#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 -#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 - #define PLATFORM_NOT_SUPPORTED "The only supported platforms are x86_64 and AArch64 (work in progress)" #define DEFAULT_MARK_CACHE_SIZE (1ULL * 1024 * 1024 * 1024) #define DEFAULT_METRICS_PORT 8234 -#define DEFAULT_HTTP_PORT 8123 - #if !defined(__x86_64__) && !defined(__aarch64__) // #error PLATFORM_NOT_SUPPORTED #endif diff --git a/dbms/src/Dictionaries/DictionarySourceFactory.cpp b/dbms/src/Dictionaries/DictionarySourceFactory.cpp index 29302df6b2b..1e6f0a50d6f 100644 --- a/dbms/src/Dictionaries/DictionarySourceFactory.cpp +++ b/dbms/src/Dictionaries/DictionarySourceFactory.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -143,15 +142,6 @@ DictionarySourcePtr DictionarySourceFactory::create( return std::make_unique(dict_struct, config, config_prefix + ".executable", sample_block, context); } - else if ("http" == source_type) - { - if (dict_struct.has_expressions) - throw Exception{ - "Dictionary source of type `http` does not support attribute expressions", - ErrorCodes::LOGICAL_ERROR}; - - return std::make_unique(dict_struct, config, config_prefix + ".http", sample_block, context); - } else if ("library" == source_type) { return std::make_unique(dict_struct, config, config_prefix + ".library", sample_block, context); diff --git a/dbms/src/Dictionaries/HTTPDictionarySource.cpp b/dbms/src/Dictionaries/HTTPDictionarySource.cpp deleted file mode 100644 index 662ef2276be..00000000000 --- a/dbms/src/Dictionaries/HTTPDictionarySource.cpp +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -static const size_t max_block_size = 8192; - - -HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_struct_, - const Poco::Util::AbstractConfiguration & config, - const std::string & config_prefix, - Block & sample_block, - const Context & context) - : log(&Poco::Logger::get("HTTPDictionarySource")) - , update_time{std::chrono::system_clock::from_time_t(0)} - , dict_struct{dict_struct_} - , url{config.getString(config_prefix + ".url", "")} - , update_field{config.getString(config_prefix + ".update_field", "")} - , format{config.getString(config_prefix + ".format")} - , sample_block{sample_block} - , context(context) - , timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())) -{ - if (update_field.empty()) - return; - - /// TODO This code is totally wrong and ignorant. - /// What if URL contains fragment (#). What if update_field contains characters that must be %-encoded. - std::string::size_type option = url.find('?'); - if (option == std::string::npos) - update_field = '?' + update_field; - else - update_field = '&' + update_field; -} - -HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) - : log(&Poco::Logger::get("HTTPDictionarySource")) - , update_time{other.update_time} - , dict_struct{other.dict_struct} - , url{other.url} - , update_field{other.update_field} - , format{other.format} - , sample_block{other.sample_block} - , context(other.context) - , timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())) -{ -} - -std::string HTTPDictionarySource::getUpdateFieldAndDate() -{ - if (update_time != std::chrono::system_clock::from_time_t(0)) - { - auto tmp_time = update_time; - update_time = std::chrono::system_clock::now(); - time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1; - char buffer[80]; - struct tm * timeinfo; - timeinfo = localtime(&hr_time); - strftime(buffer, 80, "=%Y-%m-%d%%20%H:%M:%S", timeinfo); - std::string str_time(buffer); - return url + update_field + str_time; - } - else - { - update_time = std::chrono::system_clock::now(); - return url + update_field + "=0000-00-00%2000:00:00"; ///for initial load - } -} - -BlockInputStreamPtr HTTPDictionarySource::loadAll() -{ - LOG_TRACE(log, "loadAll {}", toString()); - Poco::URI uri(url); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts); - auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); - return std::make_shared>(input_stream, std::move(in_ptr)); -} - -BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() -{ - std::string url_update = getUpdateFieldAndDate(); - LOG_TRACE(log, "loadUpdatedAll {}", url_update); - Poco::URI uri(url_update); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts); - auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); - return std::make_shared>(input_stream, std::move(in_ptr)); -} - -BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & ids) -{ - LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); - - ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) { - WriteBufferFromOStream out_buffer(ostr); - auto output_stream = context.getOutputFormat(format, out_buffer, sample_block); - formatIDs(output_stream, ids); - }; - - Poco::URI uri(url); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts); - auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); - return std::make_shared>(input_stream, std::move(in_ptr)); -} - -BlockInputStreamPtr HTTPDictionarySource::loadKeys( - const Columns & key_columns, - const std::vector & requested_rows) -{ - LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); - - ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) { - WriteBufferFromOStream out_buffer(ostr); - auto output_stream = context.getOutputFormat(format, out_buffer, sample_block); - formatKeys(dict_struct, output_stream, key_columns, requested_rows); - }; - - Poco::URI uri(url); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts); - auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); - return std::make_shared>(input_stream, std::move(in_ptr)); -} - -bool HTTPDictionarySource::isModified() const -{ - return true; -} - -bool HTTPDictionarySource::supportsSelectiveLoad() const -{ - return true; -} - -bool HTTPDictionarySource::hasUpdateField() const -{ - return !update_field.empty(); -} - -DictionarySourcePtr HTTPDictionarySource::clone() const -{ - return std::make_unique(*this); -} - -std::string HTTPDictionarySource::toString() const -{ - Poco::URI uri(url); - return uri.toString(); -} - -} // namespace DB diff --git a/dbms/src/Dictionaries/HTTPDictionarySource.h b/dbms/src/Dictionaries/HTTPDictionarySource.h deleted file mode 100644 index 7a8c0d2dbf0..00000000000 --- a/dbms/src/Dictionaries/HTTPDictionarySource.h +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include - -namespace Poco { class Logger; } - - -namespace DB -{ - -/// Allows loading dictionaries from http[s] source -class HTTPDictionarySource final : public IDictionarySource -{ -public: - HTTPDictionarySource(const DictionaryStructure & dict_struct_, - const Poco::Util::AbstractConfiguration & config, - const std::string & config_prefix, - Block & sample_block, - const Context & context); - - HTTPDictionarySource(const HTTPDictionarySource & other); - - BlockInputStreamPtr loadAll() override; - - BlockInputStreamPtr loadUpdatedAll() override; - - BlockInputStreamPtr loadIds(const std::vector & ids) override; - - BlockInputStreamPtr loadKeys( - const Columns & key_columns, const std::vector & requested_rows) override; - - bool isModified() const override; - - bool supportsSelectiveLoad() const override; - - bool hasUpdateField() const override; - - DictionarySourcePtr clone() const override; - - std::string toString() const override; - -private: - std::string getUpdateFieldAndDate(); - - Poco::Logger * log; - - LocalDateTime getLastModification() const; - - std::chrono::time_point update_time; - const DictionaryStructure dict_struct; - const std::string url; - std::string update_field; - const std::string format; - Block sample_block; - const Context & context; - ConnectionTimeouts timeouts; -}; - -} diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h index f5356ae1661..abff5d51c0a 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 936bbf45bf8..6030ef79e46 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -34,8 +34,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -84,7 +86,7 @@ FlashService::FlashService() = default; void FlashService::init(Context & context_) { context = &context_; - log = &Poco::Logger::get("FlashService"); + log = Logger::get("FlashService"); manual_compact_manager = std::make_unique( context->getGlobalContext(), context->getGlobalContext().getSettingsRef()); @@ -130,7 +132,7 @@ String getClientMetaVarWithDefault(const grpc::ServerContext * grpc_context, con return default_val; } -void updateSettingsFromTiDB(const grpc::ServerContext * grpc_context, ContextPtr & context, Poco::Logger * log) +void updateSettingsFromTiDB(const grpc::ServerContext * grpc_context, ContextPtr & context, LoggerPtr log) { const static std::vector> tidb_varname_to_tiflash_varname = { std::make_pair("tidb_max_tiflash_threads", "max_threads"), @@ -816,6 +818,59 @@ grpc::Status FlashService::GetDisaggConfig(grpc::ServerContext * grpc_context, c return grpc::Status::OK; } +grpc::Status FlashService::GetTiFlashSystemTable( + grpc::ServerContext * grpc_context, + const kvrpcpb::TiFlashSystemTableRequest * request, + kvrpcpb::TiFlashSystemTableResponse * response) +{ + CPUAffinityManager::getInstance().bindSelfGrpcThread(); + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; + + try + { + ContextPtr ctx; + std::tie(ctx, std::ignore) = createDBContext(grpc_context); + ctx->setDefaultFormat("JSONCompact"); + ReadBufferFromString in_buf(request->sql()); + MemoryWriteBuffer out_buf; + executeQuery(in_buf, out_buf, false, *ctx, nullptr); + auto data_size = out_buf.count(); + auto buf = out_buf.tryGetReadBuffer(); + String data; + data.resize(data_size); + buf->readStrict(&data[0], data_size); + response->set_data(data); + } + catch (const TiFlashException & e) + { + LOG_ERROR(log, "TiFlash Exception: {}\n{}", e.displayText(), e.getStackTrace().toString()); + return grpc::Status(grpc::StatusCode::INTERNAL, e.standardText()); + } + catch (const Exception & e) + { + LOG_ERROR(log, "DB Exception: {}\n{}", e.message(), e.getStackTrace().toString()); + return grpc::Status(tiflashErrorCodeToGrpcStatusCode(e.code()), e.message()); + } + catch (const pingcap::Exception & e) + { + LOG_ERROR(log, "KV Client Exception: {}", e.message()); + return grpc::Status(grpc::StatusCode::INTERNAL, e.message()); + } + catch (const std::exception & e) + { + LOG_ERROR(log, "std exception: {}", e.what()); + return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + catch (...) + { + LOG_ERROR(log, "other exception"); + return grpc::Status(grpc::StatusCode::INTERNAL, "other exception"); + } + return grpc::Status::OK; +} + void FlashService::setMockStorage(MockStorage * mock_storage_) { mock_storage = mock_storage_; diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 896e810596f..772dc93265c 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -86,8 +86,8 @@ class FlashService : public tikvpb::Tikv::Service // For S3 Lock Service - grpc::Status tryAddLock(grpc::ServerContext * /*context*/, const disaggregated::TryAddLockRequest * request, disaggregated::TryAddLockResponse * response) override; - grpc::Status tryMarkDelete(grpc::ServerContext * /*context*/, const disaggregated::TryMarkDeleteRequest * request, disaggregated::TryMarkDeleteResponse * response) override; + grpc::Status tryAddLock(grpc::ServerContext * grpc_context, const disaggregated::TryAddLockRequest * request, disaggregated::TryAddLockResponse * response) override; + grpc::Status tryMarkDelete(grpc::ServerContext * grpc_context, const disaggregated::TryMarkDeleteRequest * request, disaggregated::TryMarkDeleteResponse * response) override; // The TiFlash read node call this RPC to build the disaggregated task // on the TiFlash write node. @@ -98,6 +98,7 @@ class FlashService : public tikvpb::Tikv::Service grpc::Status FetchDisaggPages(grpc::ServerContext * grpc_context, const disaggregated::FetchDisaggPagesRequest * request, grpc::ServerWriter * sync_writer) override; grpc::Status GetDisaggConfig(grpc::ServerContext * grpc_context, const disaggregated::GetDisaggConfigRequest * request, disaggregated::GetDisaggConfigResponse * response) override; + grpc::Status GetTiFlashSystemTable(grpc::ServerContext * grpc_context, const kvrpcpb::TiFlashSystemTableRequest * request, kvrpcpb::TiFlashSystemTableResponse * response) override; void setMockStorage(MockStorage * mock_storage_); void setMockMPPServerInfo(MockMPPServerInfo & mpp_test_info_); @@ -109,7 +110,7 @@ class FlashService : public tikvpb::Tikv::Service grpc::Status checkGrpcContext(const grpc::ServerContext * grpc_context) const; Context * context = nullptr; - Poco::Logger * log = nullptr; + LoggerPtr log; bool is_async = false; bool enable_local_tunnel = false; bool enable_async_grpc_client = false; diff --git a/dbms/src/IO/ConnectionTimeouts.h b/dbms/src/IO/ConnectionTimeouts.h index f38bb28759b..31140544937 100644 --- a/dbms/src/IO/ConnectionTimeouts.h +++ b/dbms/src/IO/ConnectionTimeouts.h @@ -63,11 +63,6 @@ struct ConnectionTimeouts { return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout); } - - static ConnectionTimeouts getHTTPTimeouts(const Settings & settings) - { - return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout); - } }; } // namespace DB diff --git a/dbms/src/IO/Progress.cpp b/dbms/src/IO/Progress.cpp index 5f94b4155a0..f5acef3a27d 100644 --- a/dbms/src/IO/Progress.cpp +++ b/dbms/src/IO/Progress.cpp @@ -46,24 +46,9 @@ void ProgressValues::write(WriteBuffer & out, UInt64 /*client_revision*/) const } -void ProgressValues::writeJSON(WriteBuffer & out) const -{ - /// Numbers are written in double quotes (as strings) to avoid loss of precision - /// of 64-bit integers after interpretation by JavaScript. - - writeCString("{\"read_rows\":\"", out); - writeText(rows, out); - writeCString("\",\"read_bytes\":\"", out); - writeText(bytes, out); - writeCString("\",\"total_rows\":\"", out); - writeText(total_rows, out); - writeCString("\"}", out); -} - - void Progress::read(ReadBuffer & in, UInt64 server_revision) { - ProgressValues values; + ProgressValues values{}; values.read(in, server_revision); rows.store(values.rows, std::memory_order_relaxed); @@ -77,10 +62,4 @@ void Progress::write(WriteBuffer & out, UInt64 client_revision) const getValues().write(out, client_revision); } - -void Progress::writeJSON(WriteBuffer & out) const -{ - getValues().writeJSON(out); -} - } // namespace DB diff --git a/dbms/src/IO/Progress.h b/dbms/src/IO/Progress.h index db91ee70450..452d95cb23b 100644 --- a/dbms/src/IO/Progress.h +++ b/dbms/src/IO/Progress.h @@ -35,7 +35,6 @@ struct ProgressValues void read(ReadBuffer & in, UInt64 server_revision); void write(WriteBuffer & out, UInt64 client_revision) const; - void writeJSON(WriteBuffer & out) const; }; @@ -54,7 +53,7 @@ struct Progress */ std::atomic total_rows{0}; - Progress() {} + Progress() = default; Progress(size_t rows_, size_t bytes_, size_t total_rows_ = 0) : rows(rows_) , bytes(bytes_) @@ -64,9 +63,6 @@ struct Progress void read(ReadBuffer & in, UInt64 server_revision); void write(WriteBuffer & out, UInt64 client_revision) const; - /// Progress in JSON format (single line, without whitespaces) is used in HTTP headers. - void writeJSON(WriteBuffer & out) const; - /// Each value separately is changed atomically (but not whole object). void incrementPiecewiseAtomically(const Progress & rhs) { @@ -84,7 +80,7 @@ struct Progress ProgressValues getValues() const { - ProgressValues res; + ProgressValues res{}; res.rows = rows.load(std::memory_order_relaxed); res.bytes = bytes.load(std::memory_order_relaxed); @@ -95,7 +91,7 @@ struct Progress ProgressValues fetchAndResetPiecewiseAtomically() { - ProgressValues res; + ProgressValues res{}; res.rows = rows.fetch_and(0); res.bytes = bytes.fetch_and(0); diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp deleted file mode 100644 index 7ad3dfca372..00000000000 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#if Poco_NetSSL_FOUND -#include -#endif - - -namespace DB -{ -namespace ErrorCodes -{ -extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; -extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS; -} // namespace ErrorCodes - - -ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri, - const std::string & method_, - OutStreamCallback out_stream_callback, - const ConnectionTimeouts & timeouts, - size_t buffer_size_) - : ReadBuffer(nullptr, 0) - , uri{uri} - , method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST - : Poco::Net::HTTPRequest::HTTP_GET} - , timeouts{timeouts} - , is_ssl{uri.getScheme() == "https"} - , session -{ - std::unique_ptr( -#if Poco_NetSSL_FOUND - is_ssl ? new Poco::Net::HTTPSClientSession : -#endif - new Poco::Net::HTTPClientSession) -} -{ - session->setHost(DNSCache::instance().resolveHost(uri.getHost()).toString()); - session->setPort(uri.getPort()); - -#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000 - session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout); -#else - session->setTimeout(timeouts.connection_timeout); -#endif - - Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); - request.setHost(uri.getHost()); // use original, not resolved host name in header - - if (out_stream_callback) - request.setChunkedTransferEncoding(true); - - Poco::Net::HTTPResponse response; - - LOG_TRACE((&Poco::Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri.toString()); - - auto & stream_out = session->sendRequest(request); - - if (out_stream_callback) - out_stream_callback(stream_out); - - istr = &session->receiveResponse(response); - - auto status = response.getStatus(); - - if (status != Poco::Net::HTTPResponse::HTTP_OK) - { - std::stringstream error_message; - error_message << "Received error from remote server " << uri.toString() << ". HTTP status code: " << status << " " - << response.getReason() << ", body: " << istr->rdbuf(); - - throw Exception(error_message.str(), - status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS - : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); - } - - impl = std::make_unique(*istr, buffer_size_); -} - - -bool ReadWriteBufferFromHTTP::nextImpl() -{ - if (!impl->next()) - return false; - internal_buffer = impl->buffer(); - working_buffer = internal_buffer; - return true; -} -} // namespace DB diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.h b/dbms/src/IO/ReadWriteBufferFromHTTP.h deleted file mode 100644 index 1242a684b9a..00000000000 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.h +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include - -#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 -#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 - -namespace DB -{ -const int HTTP_TOO_MANY_REQUESTS = 429; - - -/** Perform HTTP POST request and provide response to read. - */ -class ReadWriteBufferFromHTTP : public ReadBuffer -{ -private: - Poco::URI uri; - std::string method; - ConnectionTimeouts timeouts; - - bool is_ssl; - std::unique_ptr session; - std::istream * istr; /// owned by session - std::unique_ptr impl; - -public: - using OutStreamCallback = std::function; - - explicit ReadWriteBufferFromHTTP( - const Poco::URI & uri, - const std::string & method = {}, - OutStreamCallback out_stream_callback = {}, - const ConnectionTimeouts & timeouts = {}, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE); - - bool nextImpl() override; -}; - -} // namespace DB diff --git a/dbms/src/IO/RemoteReadBuffer.h b/dbms/src/IO/RemoteReadBuffer.h deleted file mode 100644 index 64749eff6ed..00000000000 --- a/dbms/src/IO/RemoteReadBuffer.h +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include "ReadHelpers.h" - -#define DEFAULT_REMOTE_READ_BUFFER_CONNECTION_TIMEOUT 1 -#define DEFAULT_REMOTE_READ_BUFFER_RECEIVE_TIMEOUT 1800 -#define DEFAULT_REMOTE_READ_BUFFER_SEND_TIMEOUT 1800 - -namespace DB -{ -/** Allows you to read a file from a remote server via riod. - */ -class RemoteReadBuffer : public ReadBuffer -{ -private: - std::unique_ptr impl; - -public: - RemoteReadBuffer( - const std::string & host, - int port, - const std::string & path, - bool compress = true, - size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE, - const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_CONNECTION_TIMEOUT, 0), - const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_SEND_TIMEOUT, 0), - const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_RECEIVE_TIMEOUT, 0)) - : ReadBuffer(nullptr, 0) - { - Poco::URI uri; - uri.setScheme("http"); - uri.setHost(host); - uri.setPort(port); - uri.setQueryParameters( - {std::make_pair("action", "read"), - std::make_pair("path", path), - std::make_pair("compress", (compress ? "true" : "false"))}); - - ConnectionTimeouts timeouts(connection_timeout, send_timeout, receive_timeout); - ReadWriteBufferFromHTTP::OutStreamCallback callback; - impl = std::make_unique(uri, std::string(), callback, timeouts, buffer_size); - } - - bool nextImpl() override - { - if (!impl->next()) - return false; - internal_buffer = impl->buffer(); - working_buffer = internal_buffer; - return true; - } - - /// Return the list of file names in the directory. - static std::vector listFiles( - const std::string & host, - int port, - const std::string & path, - const ConnectionTimeouts & timeouts) - { - Poco::URI uri; - uri.setScheme("http"); - uri.setHost(host); - uri.setPort(port); - uri.setQueryParameters( - {std::make_pair("action", "list"), - std::make_pair("path", path)}); - - ReadWriteBufferFromHTTP in(uri, {}, {}, timeouts); - - std::vector files; - while (!in.eof()) - { - std::string s; - readString(s, in); - skipWhitespaceIfAny(in); - files.push_back(s); - } - - return files; - } -}; - -} // namespace DB diff --git a/dbms/src/IO/RemoteWriteBuffer.h b/dbms/src/IO/RemoteWriteBuffer.h deleted file mode 100644 index 07478e1400f..00000000000 --- a/dbms/src/IO/RemoteWriteBuffer.h +++ /dev/null @@ -1,273 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT 1 -#define DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT 1800 -#define DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT 1800 - - -namespace DB -{ -namespace ErrorCodes -{ -extern const int CANNOT_WRITE_TO_OSTREAM; -extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; -} // namespace ErrorCodes - -/** Allows you to write a file to a remote server. - */ -class RemoteWriteBuffer : public WriteBuffer -{ -private: - std::string host; - int port; - std::string path; - std::string encoded_path; - std::string encoded_tmp_path; - std::string tmp_path; - std::string if_exists; - bool decompress; - unsigned connection_retries; - - std::string uri_str; - - Poco::Net::HTTPClientSession session; - std::ostream * ostr; /// this is owned by session - std::unique_ptr impl; - - /// Have sent all the data and renamed the file - bool finalized; - -public: - /** If tmp_path is not empty, it writes first the temporary file, and then renames it, - * deleting existing files, if any. - * Otherwise, if_exists parameter is used. - */ - RemoteWriteBuffer(const std::string & host_, int port_, const std::string & path_, const std::string & tmp_path_ = "", const std::string & if_exists_ = "remove", bool decompress_ = false, unsigned connection_retries_ = 3, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT, 0), const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT, 0), const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT, 0)) - : WriteBuffer(nullptr, 0) - , host(host_) - , port(port_) - , path(path_) - , tmp_path(tmp_path_) - , if_exists(if_exists_) - , decompress(decompress_) - , connection_retries(connection_retries_) - , finalized(false) - { - Poco::URI::encode(path, "&#", encoded_path); - Poco::URI::encode(tmp_path, "&#", encoded_tmp_path); - - std::stringstream uri; - uri << "http://" << host << ":" << port - << "/?action=write" - << "&path=" << (tmp_path.empty() ? encoded_path : encoded_tmp_path) - << "&if_exists=" << if_exists - << "&decompress=" << (decompress ? "true" : "false"); - - uri_str = Poco::URI(uri.str()).getPathAndQuery(); - - session.setHost(host); - session.setPort(port); - session.setKeepAlive(true); - - /// set the timeout -#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000 - session.setTimeout(connection_timeout, send_timeout, receive_timeout); -#else - session.setTimeout(connection_timeout); -#endif - - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri_str, Poco::Net::HTTPRequest::HTTP_1_1); - - request.setChunkedTransferEncoding(true); - - for (unsigned i = 0; i < connection_retries; ++i) - { - LOG_TRACE((&Poco::Logger::get("RemoteWriteBuffer")), "Sending write request to {}:{}{}", host, port, uri_str); - - try - { - ostr = &session.sendRequest(request); - } - catch (const Poco::Net::NetException & e) - { - if (i + 1 == connection_retries) - throw; - - LOG_WARNING((&Poco::Logger::get("RemoteWriteBuffer")), "{}, URL: {}:{}{}, try No {}.", e.displayText(), host, port, uri_str, i + 1); - session.reset(); - continue; - } - catch (const Poco::TimeoutException & e) - { - if (i + 1 == connection_retries) - throw; - - LOG_WARNING((&Poco::Logger::get("RemoteWriteBuffer")), "Connection timeout from {}:{}{}, try No {}.", host, port, uri_str, i + 1); - session.reset(); - continue; - } - - break; - } - - impl = std::make_unique(*ostr, buffer_size_); - - set(impl->buffer().begin(), impl->buffer().size()); - } - - void nextImpl() override - { - if (!offset() || finalized) - return; - - /// For correct work with AsynchronousWriteBuffer, which replaces buffers. - impl->set(buffer().begin(), buffer().size()); - - impl->position() = pos; - - try - { - impl->next(); - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::CANNOT_WRITE_TO_OSTREAM) - checkStatus(); /// Change the error message to a clearer one. - throw; - } - } - - void finalize() - { - if (finalized) - return; - - next(); - checkStatus(); - - /// Rename the file if necessary. - if (!tmp_path.empty()) - rename(); - - finalized = true; - } - - void cancel() - { - finalized = true; - } - - ~RemoteWriteBuffer() - { - try - { - finalize(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - - -private: - void checkStatus() - { - Poco::Net::HTTPResponse response; - std::istream & istr = session.receiveResponse(response); - Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus(); - - std::stringstream message; - message << istr.rdbuf(); - - if (status != Poco::Net::HTTPResponse::HTTP_OK || message.str() != "Ok.\n") - { - std::stringstream error_message; - error_message << "Received error from remote server " << uri_str << ", body: " << message.str(); - - throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); - } - } - - void rename() - { - std::stringstream uri; - uri << "http://" << host << ":" << port - << "/?action=rename" - << "&from=" << encoded_tmp_path - << "&to=" << encoded_path; - - uri_str = Poco::URI(uri.str()).getPathAndQuery(); - - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri_str, Poco::Net::HTTPRequest::HTTP_1_1); - - for (unsigned i = 0; i < connection_retries; ++i) - { - LOG_TRACE((&Poco::Logger::get("RemoteWriteBuffer")), "Sending rename request to {}:{}{}", host, port, uri_str); - - try - { - session.sendRequest(request); - checkStatus(); - } - catch (const Poco::Net::NetException & e) - { - if (i + 1 == connection_retries) - throw; - - LOG_WARNING((&Poco::Logger::get("RemoteWriteBuffer")), "{}, message: {}, URL: {}:{}{}, try No {}.", e.what(), e.displayText(), host, port, uri_str, i + 1); - session.reset(); - continue; - } - catch (const Poco::TimeoutException & e) - { - if (i + 1 == connection_retries) - throw; - - LOG_WARNING((&Poco::Logger::get("RemoteWriteBuffer")), "Connection timeout from {}:{}{}, try No {}.", host, port, uri_str, i + 1); - session.reset(); - continue; - } - catch (const Exception & e) - { - /// If in the last attempt we did not receive a response from the server, but the file was renamed already. - if (i != 0 && e.code() == ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER - && nullptr != strstr(e.displayText().data(), "File not found")) - { - LOG_TRACE((&Poco::Logger::get("RemoteWriteBuffer")), "File already renamed"); - } - else - throw; - } - - break; - } - } -}; - -} // namespace DB diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp deleted file mode 100644 index f71eefddf9a..00000000000 --- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -namespace ErrorCodes -{ -extern const int LOGICAL_ERROR; -} - - -void WriteBufferFromHTTPServerResponse::startSendHeaders() -{ - if (!headers_started_sending) - { - headers_started_sending = true; - - if (add_cors_header) - response.set("Access-Control-Allow-Origin", "*"); - - setResponseDefaultHeaders(response, keep_alive_timeout); - -#if POCO_CLICKHOUSE_PATCH - if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) - std::tie(response_header_ostr, response_body_ostr) = response.beginSend(); -#endif - } -} - - -void WriteBufferFromHTTPServerResponse::finishSendHeaders() -{ - if (!headers_finished_sending) - { - headers_finished_sending = true; - - if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) - { -#if POCO_CLICKHOUSE_PATCH - /// Send end of headers delimiter. - if (response_header_ostr) - *response_header_ostr << "\r\n" - << std::flush; -#else - /// Newline autosent by response.send() - /// if nothing to send in body: - if (!response_body_ostr) - response_body_ostr = &(response.send()); -#endif - } - else - { - if (!response_body_ostr) - response_body_ostr = &(response.send()); - } - } -} - - -void WriteBufferFromHTTPServerResponse::nextImpl() -{ - { - std::lock_guard lock(mutex); - - startSendHeaders(); - - if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) - { - if (compress) - { - if (compression_method == ZlibCompressionMethod::Gzip) - { -#if POCO_CLICKHOUSE_PATCH - *response_header_ostr << "Content-Encoding: gzip\r\n"; -#else - response.set("Content-Encoding", "gzip"); -#endif - } - else if (compression_method == ZlibCompressionMethod::Zlib) - { -#if POCO_CLICKHOUSE_PATCH - *response_header_ostr << "Content-Encoding: deflate\r\n"; -#else - response.set("Content-Encoding", "deflate"); -#endif - } - else - throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse", - ErrorCodes::LOGICAL_ERROR); - /// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy. - -#if !POCO_CLICKHOUSE_PATCH - response_body_ostr = &(response.send()); -#endif - - out_raw.emplace(*response_body_ostr); - deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin()); - out = &*deflating_buf; - } - else - { -#if !POCO_CLICKHOUSE_PATCH - response_body_ostr = &(response.send()); -#endif - - out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin()); - out = &*out_raw; - } - } - - finishSendHeaders(); - } - - if (out) - { - out->position() = position(); - out->next(); - } -} - - -WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse( - Poco::Net::HTTPServerRequest & request_, - Poco::Net::HTTPServerResponse & response_, - unsigned keep_alive_timeout_, - bool compress_, - ZlibCompressionMethod compression_method_, - size_t size) - : BufferWithOwnMemory(size) - , request(request_) - , response(response_) - , keep_alive_timeout(keep_alive_timeout_) - , compress(compress_) - , compression_method(compression_method_) -{ -} - - -void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress) -{ - std::lock_guard lock(mutex); - - /// Cannot add new headers if body was started to send. - if (headers_finished_sending) - return; - - accumulated_progress.incrementPiecewiseAtomically(progress); - - if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000) - { - progress_watch.restart(); - - /// Send all common headers before our special progress headers. - startSendHeaders(); - - WriteBufferFromOwnString progress_string_writer; - accumulated_progress.writeJSON(progress_string_writer); - -#if POCO_CLICKHOUSE_PATCH - *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" - << std::flush; -#endif - } -} - - -void WriteBufferFromHTTPServerResponse::finalize() -{ - if (offset()) - { - next(); - } - else - { - /// If no remaining data, just send headers. - std::lock_guard lock(mutex); - startSendHeaders(); - finishSendHeaders(); - } -} - - -WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse() -{ - try - { - finalize(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } -} - -} // namespace DB diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.h b/dbms/src/IO/WriteBufferFromHTTPServerResponse.h deleted file mode 100644 index ece67dfd3ed..00000000000 --- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.h +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - - -namespace Poco -{ -namespace Net -{ -class HTTPServerResponse; -} -} // namespace Poco - - -namespace DB -{ -/// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream -/// (using response.send()) only after data is flushed for the first time. This is needed in HTTP -/// servers to change some HTTP headers (e.g. response code) before any data is sent to the client -/// (headers can't be changed after response.send() is called). -/// -/// In short, it allows delaying the call to response.send(). -/// -/// Additionally, supports HTTP response compression (in this case corresponding Content-Encoding -/// header will be set). -/// -/// Also this class write and flush special X-ClickHouse-Progress HTTP headers -/// if no data was sent at the time of progress notification. -/// This allows to implement progress bar in HTTP clients. -class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory -{ -private: - Poco::Net::HTTPServerRequest & request; - Poco::Net::HTTPServerResponse & response; - - bool add_cors_header = false; - unsigned keep_alive_timeout = 0; - bool compress = false; - ZlibCompressionMethod compression_method; - int compression_level = Z_DEFAULT_COMPRESSION; - - std::ostream * response_body_ostr = nullptr; - -#if POCO_CLICKHOUSE_PATCH - std::ostream * response_header_ostr = nullptr; -#endif - - std::optional out_raw; - std::optional deflating_buf; - - WriteBuffer * out = nullptr; /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf. - - bool headers_started_sending = false; - bool headers_finished_sending = false; /// If true, you could not add any headers. - - Progress accumulated_progress; - size_t send_progress_interval_ms = 100; - Stopwatch progress_watch; - - std::mutex mutex; /// progress callback could be called from different threads. - - - /// Must be called under locked mutex. - /// This method send headers, if this was not done already, - /// but not finish them with \r\n, allowing to send more headers subsequently. - void startSendHeaders(); - - /// This method finish headers with \r\n, allowing to start to send body. - void finishSendHeaders(); - - void nextImpl() override; - -public: - WriteBufferFromHTTPServerResponse( - Poco::Net::HTTPServerRequest & request_, - Poco::Net::HTTPServerResponse & response_, - unsigned keep_alive_timeout_, - bool compress_ = false, /// If true - set Content-Encoding header and compress the result. - ZlibCompressionMethod compression_method_ = ZlibCompressionMethod::Gzip, - size_t size = DBMS_DEFAULT_BUFFER_SIZE); - - /// Writes progess in repeating HTTP headers. - void onProgress(const Progress & progress); - - /// Send at least HTTP headers if no data has been sent yet. - /// Use after the data has possibly been sent and no error happened (and thus you do not plan - /// to change response HTTP code. - /// This method is idempotent. - void finalize(); - - /// Turn compression on or off. - /// The setting has any effect only if HTTP headers haven't been sent yet. - void setCompression(bool enable_compression) - { - compress = enable_compression; - } - - /// Set compression level if the compression is turned on. - /// The setting has any effect only if HTTP headers haven't been sent yet. - void setCompressionLevel(int level) - { - compression_level = level; - } - - /// Turn CORS on or off. - /// The setting has any effect only if HTTP headers haven't been sent yet. - void addHeaderCORS(bool enable_cors) - { - add_cors_header = enable_cors; - } - - /// Don't send HTTP headers with progress more frequently. - void setSendProgressInterval(size_t send_progress_interval_ms_) - { - send_progress_interval_ms = send_progress_interval_ms_; - } - - ~WriteBufferFromHTTPServerResponse(); -}; - -} // namespace DB diff --git a/dbms/src/Interpreters/ClientInfo.cpp b/dbms/src/Interpreters/ClientInfo.cpp index 4e7fa9666bb..fd9d0fdbb34 100644 --- a/dbms/src/Interpreters/ClientInfo.cpp +++ b/dbms/src/Interpreters/ClientInfo.cpp @@ -55,11 +55,6 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) writeVarUInt(client_version_minor, out); writeVarUInt(client_revision, out); } - else if (interface == Interface::HTTP) - { - writeBinary(UInt8(http_method), out); - writeBinary(http_user_agent, out); - } if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) writeBinary(quota_key, out); @@ -97,14 +92,6 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) readVarUInt(client_version_minor, in); readVarUInt(client_revision, in); } - else if (interface == Interface::HTTP) - { - UInt8 read_http_method = 0; - readBinary(read_http_method, in); - http_method = HTTPMethod(read_http_method); - - readBinary(http_user_agent, in); - } if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) readBinary(quota_key, in); diff --git a/dbms/src/Interpreters/ClientInfo.h b/dbms/src/Interpreters/ClientInfo.h index 29847d9ced3..6167bb9a39a 100644 --- a/dbms/src/Interpreters/ClientInfo.h +++ b/dbms/src/Interpreters/ClientInfo.h @@ -36,15 +36,7 @@ class ClientInfo enum class Interface : UInt8 { TCP = 1, - HTTP = 2, - GRPC = 3, - }; - - enum class HTTPMethod : UInt8 - { - UNKNOWN = 0, - GET = 1, - POST = 2, + GRPC = 2, }; enum class QueryKind : UInt8 @@ -81,10 +73,6 @@ class ClientInfo UInt64 client_version_minor = 0; unsigned client_revision = 0; - /// For http - HTTPMethod http_method = HTTPMethod::UNKNOWN; - String http_user_agent; - /// Common String quota_key; @@ -94,8 +82,8 @@ class ClientInfo * Only values that are not calculated automatically or passed separately are serialized. * Revisions are passed to use format that server will understand or client was used. */ - void write(WriteBuffer & out, const UInt64 server_protocol_revision) const; - void read(ReadBuffer & in, const UInt64 client_protocol_revision); + void write(WriteBuffer & out, UInt64 server_protocol_revision) const; + void read(ReadBuffer & in, UInt64 client_protocol_revision); void fillOSUserHostNameAndVersionInfo(); }; diff --git a/dbms/src/Interpreters/InterpreterFactory.cpp b/dbms/src/Interpreters/InterpreterFactory.cpp index 269eaed8330..ff747e7e0f3 100644 --- a/dbms/src/Interpreters/InterpreterFactory.cpp +++ b/dbms/src/Interpreters/InterpreterFactory.cpp @@ -58,13 +58,7 @@ static void throwIfReadOnly(Context & context) { if (context.getSettingsRef().readonly) { - const auto & client_info = context.getClientInfo(); - if (client_info.interface == ClientInfo::Interface::HTTP && client_info.http_method == ClientInfo::HTTPMethod::GET) - throw Exception("Cannot execute query in readonly mode. " - "For queries over HTTP, method GET implies readonly. You should use method POST for modifying queries.", - ErrorCodes::READONLY); - else - throw Exception("Cannot execute query in readonly mode", ErrorCodes::READONLY); + throw Exception("Cannot execute query in readonly mode", ErrorCodes::READONLY); } } diff --git a/dbms/src/Interpreters/QueryLog.cpp b/dbms/src/Interpreters/QueryLog.cpp index 51288b8a212..8acb96213a2 100644 --- a/dbms/src/Interpreters/QueryLog.cpp +++ b/dbms/src/Interpreters/QueryLog.cpp @@ -84,7 +84,7 @@ Block QueryLogElement::createBlock() static std::array IPv6ToBinary(const Poco::Net::IPAddress & address) { - std::array res; + std::array res{}; if (Poco::Net::IPAddress::IPv6 == address.family()) { @@ -151,9 +151,6 @@ void QueryLogElement::appendToBlock(Block & block) const columns[i++]->insert(client_info.client_name); columns[i++]->insert(UInt64(client_info.client_revision)); - columns[i++]->insert(UInt64(client_info.http_method)); - columns[i++]->insert(client_info.http_user_agent); - columns[i++]->insert(client_info.quota_key); columns[i++]->insert(UInt64(ClickHouseRevision::get())); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index c12a10c86c5..ad63ac6032c 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -115,17 +115,10 @@ struct Settings \ M(SettingInt64, memory_tracker_accuracy_diff_for_test, 0, "For testing of the accuracy of the memory tracker - throw an exception when real_rss is much larger than tracked amount.") \ \ - M(SettingBool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.") \ - M(SettingInt64, http_zlib_compression_level, 3, "Compression level - used if the client on HTTP said that it understands data compressed by gzip or deflate.") \ - \ - M(SettingBool, http_native_compression_disable_checksumming_on_decompress, 0, "If you uncompress the POST data from the client compressed by the native format, do not check the checksum.") \ - \ M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)") \ \ M(SettingBool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.") \ \ - M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \ - \ M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats).") \ \ M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \ @@ -138,17 +131,9 @@ struct Settings \ M(SettingBool, use_client_time_zone, false, "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.") \ \ - M(SettingBool, send_progress_in_http_headers, false, "Send progress notifications using X-ClickHouse-Progress headers. Some clients do not support high amount of HTTP headers " \ - "(Python requests in particular), so it is disabled by default.") \ - \ - M(SettingUInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.") \ - \ M(SettingBool, fsync_metadata, 1, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server " \ "with high load of DDL queries and high load of disk subsystem.") \ \ - M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \ - M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \ - M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \ M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \ \ M(SettingUInt64, max_bytes_before_external_group_by, 0, "") \ diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt index 41b26e767c4..7e0704e32e7 100644 --- a/dbms/src/Server/CMakeLists.txt +++ b/dbms/src/Server/CMakeLists.txt @@ -35,13 +35,9 @@ target_link_libraries (server_for_test PUBLIC tiflash_common_io tiflash_storages target_include_directories (server_for_test PRIVATE ${TiFlash_SOURCE_DIR}/contrib/grpc/) add_library (tiflash-server-lib - HTTPHandler.cpp CertificateReloader.cpp MetricsTransmitter.cpp MetricsPrometheus.cpp - NotFoundHandler.cpp - PingRequestHandler.cpp - RootRequestHandler.cpp ServerInfo.cpp Server.cpp StatusFile.cpp diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp deleted file mode 100644 index fe7236fbc18..00000000000 --- a/dbms/src/Server/HTTPHandler.cpp +++ /dev/null @@ -1,649 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "HTTPHandler.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -namespace DB -{ -namespace ErrorCodes -{ -extern const int READONLY; -extern const int UNKNOWN_COMPRESSION_METHOD; - -extern const int CANNOT_PARSE_TEXT; -extern const int CANNOT_PARSE_ESCAPE_SEQUENCE; -extern const int CANNOT_PARSE_QUOTED_STRING; -extern const int CANNOT_PARSE_DATE; -extern const int CANNOT_PARSE_DATETIME; -extern const int CANNOT_PARSE_NUMBER; -extern const int CANNOT_OPEN_FILE; - -extern const int UNKNOWN_ELEMENT_IN_AST; -extern const int UNKNOWN_TYPE_OF_AST_NODE; -extern const int TOO_DEEP_AST; -extern const int TOO_BIG_AST; -extern const int UNEXPECTED_AST_STRUCTURE; - -extern const int UNKNOWN_TABLE; -extern const int UNKNOWN_FUNCTION; -extern const int UNKNOWN_IDENTIFIER; -extern const int UNKNOWN_TYPE; -extern const int UNKNOWN_STORAGE; -extern const int UNKNOWN_DATABASE; -extern const int UNKNOWN_SETTING; -extern const int UNKNOWN_DIRECTION_OF_SORTING; -extern const int UNKNOWN_AGGREGATE_FUNCTION; -extern const int UNKNOWN_FORMAT; -extern const int UNKNOWN_DATABASE_ENGINE; -extern const int UNKNOWN_TYPE_OF_QUERY; - -extern const int QUERY_IS_TOO_LARGE; - -extern const int NOT_IMPLEMENTED; -extern const int SOCKET_TIMEOUT; - -extern const int UNKNOWN_USER; -extern const int WRONG_PASSWORD; -extern const int REQUIRED_PASSWORD; - -extern const int INVALID_SESSION_TIMEOUT; -extern const int HTTP_LENGTH_REQUIRED; -} // namespace ErrorCodes - - -static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int exception_code) -{ - using namespace Poco::Net; - - if (exception_code == ErrorCodes::REQUIRED_PASSWORD) - return HTTPResponse::HTTP_UNAUTHORIZED; - else if (exception_code == ErrorCodes::CANNOT_PARSE_TEXT || exception_code == ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE || exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING || exception_code == ErrorCodes::CANNOT_PARSE_DATE || exception_code == ErrorCodes::CANNOT_PARSE_DATETIME || exception_code == ErrorCodes::CANNOT_PARSE_NUMBER) - return HTTPResponse::HTTP_BAD_REQUEST; - else if (exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST || exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE || exception_code == ErrorCodes::TOO_DEEP_AST || exception_code == ErrorCodes::TOO_BIG_AST || exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE) - return HTTPResponse::HTTP_BAD_REQUEST; - else if (exception_code == ErrorCodes::UNKNOWN_TABLE || exception_code == ErrorCodes::UNKNOWN_FUNCTION || exception_code == ErrorCodes::UNKNOWN_IDENTIFIER || exception_code == ErrorCodes::UNKNOWN_TYPE || exception_code == ErrorCodes::UNKNOWN_STORAGE || exception_code == ErrorCodes::UNKNOWN_DATABASE || exception_code == ErrorCodes::UNKNOWN_SETTING || exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING || exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION || exception_code == ErrorCodes::UNKNOWN_FORMAT || exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE) - return HTTPResponse::HTTP_NOT_FOUND; - else if (exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY) - return HTTPResponse::HTTP_NOT_FOUND; - else if (exception_code == ErrorCodes::QUERY_IS_TOO_LARGE) - return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE; - else if (exception_code == ErrorCodes::NOT_IMPLEMENTED) - return HTTPResponse::HTTP_NOT_IMPLEMENTED; - else if (exception_code == ErrorCodes::SOCKET_TIMEOUT || exception_code == ErrorCodes::CANNOT_OPEN_FILE) - return HTTPResponse::HTTP_SERVICE_UNAVAILABLE; - else if (exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED) - return HTTPResponse::HTTP_LENGTH_REQUIRED; - - return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR; -} - - -static std::chrono::steady_clock::duration parseSessionTimeout( - const Poco::Util::AbstractConfiguration & config, - const HTMLForm & params) -{ - unsigned session_timeout = config.getInt("default_session_timeout", 60); - - if (params.has("session_timeout")) - { - unsigned max_session_timeout = config.getUInt("max_session_timeout", 3600); - std::string session_timeout_str = params.get("session_timeout"); - - ReadBufferFromString buf(session_timeout_str); - if (!tryReadIntText(session_timeout, buf) || !buf.eof()) - throw Exception("Invalid session timeout: '" + session_timeout_str + "'", ErrorCodes::INVALID_SESSION_TIMEOUT); - - if (session_timeout > max_session_timeout) - throw Exception("Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout) - + ". Maximum session timeout could be modified in configuration file.", - ErrorCodes::INVALID_SESSION_TIMEOUT); - } - - return std::chrono::seconds(session_timeout); -} - - -void HTTPHandler::pushDelayedResults(Output & used_output) -{ - std::vector write_buffers; - std::vector read_buffers; - std::vector read_buffers_raw_ptr; - - auto * cascade_buffer = typeid_cast(used_output.out_maybe_delayed_and_compressed.get()); - if (!cascade_buffer) - throw Exception("Expected CascadeWriteBuffer", ErrorCodes::LOGICAL_ERROR); - - cascade_buffer->getResultBuffers(write_buffers); - - if (write_buffers.empty()) - throw Exception("At least one buffer is expected to overwrite result into HTTP response", ErrorCodes::LOGICAL_ERROR); - - for (auto & write_buf : write_buffers) - { - IReadableWriteBuffer * write_buf_concrete; - ReadBufferPtr reread_buf; - - if (write_buf - && (write_buf_concrete = dynamic_cast(write_buf.get())) - && (reread_buf = write_buf_concrete->tryGetReadBuffer())) - { - read_buffers.emplace_back(reread_buf); - read_buffers_raw_ptr.emplace_back(reread_buf.get()); - } - } - - ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); - copyData(concat_read_buffer, *used_output.out_maybe_compressed); -} - - -HTTPHandler::HTTPHandler(IServer & server_) - : server(server_) - , log(&Poco::Logger::get("HTTPHandler")) -{ - server_display_name = server.config().getString("display_name", "TiFlash"); -} - - -void HTTPHandler::processQuery( - Poco::Net::HTTPServerRequest & request, - HTMLForm & params, - Poco::Net::HTTPServerResponse & response, - Output & used_output) -{ - LOG_TRACE(log, "Request URI: {}", request.getURI()); - - std::istream & istr = request.stream(); - - /// Part of the query can be passed in the 'query' parameter and the rest in the request body - /// (http method need not necessarily be POST). In this case the entire query consists of the - /// contents of the 'query' parameter, a line break and the request body. - std::string query_param = params.get("query", ""); - if (!query_param.empty()) - query_param += '\n'; - - /// The user and password can be passed by headers (similar to X-Auth-*), - /// which is used by load balancers to pass authentication information. - std::string user = request.get("X-ClickHouse-User", ""); - std::string password = request.get("X-ClickHouse-Key", ""); - std::string quota_key = request.get("X-ClickHouse-Quota", ""); - - if (user.empty() && password.empty() && quota_key.empty()) - { - /// User name and password can be passed using query parameters - /// or using HTTP Basic auth (both methods are insecure). - if (request.hasCredentials()) - { - Poco::Net::HTTPBasicCredentials credentials(request); - - user = credentials.getUsername(); - password = credentials.getPassword(); - } - else - { - user = params.get("user", "default"); - password = params.get("password", ""); - } - - quota_key = params.get("quota_key", ""); - } - else - { - /// It is prohibited to mix different authorization schemes. - if (request.hasCredentials() - || params.has("user") - || params.has("password") - || params.has("quota_key")) - { - throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously", ErrorCodes::REQUIRED_PASSWORD); - } - } - - std::string query_id = params.get("query_id", ""); - - const auto & config = server.config(); - - Context context = server.context(); - context.setGlobalContext(server.context()); - - context.setUser(user, password, request.clientAddress(), quota_key); - context.setCurrentQueryId(query_id); - - /// The user could specify session identifier and session timeout. - /// It allows to modify settings, create temporary tables and reuse them in subsequent requests. - - std::shared_ptr session; - String session_id; - std::chrono::steady_clock::duration session_timeout; - bool session_is_set = params.has("session_id"); - - if (session_is_set) - { - session_id = params.get("session_id"); - session_timeout = parseSessionTimeout(config, params); - std::string session_check = params.get("session_check", ""); - - session = context.acquireSession(session_id, session_timeout, session_check == "1"); - - context = *session; - context.setSessionContext(*session); - } - - SCOPE_EXIT({ - if (session_is_set) - session->releaseSession(session_id, session_timeout); - }); - - /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). - String http_response_compression_methods = request.get("Accept-Encoding", ""); - bool client_supports_http_compression = false; - ZlibCompressionMethod http_response_compression_method{}; - - if (!http_response_compression_methods.empty()) - { - /// Both gzip and deflate are supported. If the client supports both, gzip is preferred. - /// NOTE parsing of the list of methods is slightly incorrect. - if (std::string::npos != http_response_compression_methods.find("gzip")) - { - client_supports_http_compression = true; - http_response_compression_method = ZlibCompressionMethod::Gzip; - } - else if (std::string::npos != http_response_compression_methods.find("deflate")) - { - client_supports_http_compression = true; - http_response_compression_method = ZlibCompressionMethod::Zlib; - } - } - - /// Client can pass a 'compress' flag in the query string. In this case the query result is - /// compressed using internal algorithm. This is not reflected in HTTP headers. - bool internal_compression = params.getParsed("compress", false); - - /// At least, we should postpone sending of first buffer_size result bytes - size_t buffer_size_total = std::max( - params.getParsed("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), - static_cast(DBMS_DEFAULT_BUFFER_SIZE)); - - /// If it is specified, the whole result will be buffered. - /// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file. - bool buffer_until_eof = params.getParsed("wait_end_of_query", false); - - size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE; - size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0; - - unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10); - - used_output.out = std::make_shared( - request, - response, - keep_alive_timeout, - client_supports_http_compression, - http_response_compression_method, - buffer_size_http); - if (internal_compression) - used_output.out_maybe_compressed = std::make_shared>(*used_output.out); - else - used_output.out_maybe_compressed = used_output.out; - - if (buffer_size_memory > 0 || buffer_until_eof) - { - CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1; - CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2; - - if (buffer_size_memory > 0) - cascade_buffer1.emplace_back(std::make_shared(buffer_size_memory)); - - if (buffer_until_eof) - { - std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/"; - - auto create_tmp_disk_buffer = [tmp_path_template](const WriteBufferPtr &) { - return WriteBufferFromTemporaryFile::create(tmp_path_template); - }; - - cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer)); - } - else - { - auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed](const WriteBufferPtr & prev_buf) { - auto * prev_memory_buffer = typeid_cast(prev_buf.get()); - if (!prev_memory_buffer) - throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR); - - auto rdbuf = prev_memory_buffer->tryGetReadBuffer(); - copyData(*rdbuf, *next_buffer); - - return next_buffer; - }; - - cascade_buffer2.emplace_back(push_memory_buffer_and_continue); - } - - used_output.out_maybe_delayed_and_compressed = std::make_shared( - std::move(cascade_buffer1), - std::move(cascade_buffer2)); - } - else - { - used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed; - } - - std::unique_ptr in_param = std::make_unique(query_param); - - std::unique_ptr in_post_raw = std::make_unique(istr); - - /// Request body can be compressed using algorithm specified in the Content-Encoding header. - std::unique_ptr in_post; - String http_request_compression_method_str = request.get("Content-Encoding", ""); - if (!http_request_compression_method_str.empty()) - { - ZlibCompressionMethod method; - if (http_request_compression_method_str == "gzip") - { - method = ZlibCompressionMethod::Gzip; - } - else if (http_request_compression_method_str == "deflate") - { - method = ZlibCompressionMethod::Zlib; - } - else - throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str, - ErrorCodes::UNKNOWN_COMPRESSION_METHOD); - in_post = std::make_unique(*in_post_raw, method); - } - else - in_post = std::move(in_post_raw); - - /// The data can also be compressed using incompatible internal algorithm. This is indicated by - /// 'decompress' query parameter. - std::unique_ptr in_post_maybe_compressed; - bool in_post_compressed = false; - if (params.getParsed("decompress", false)) - { - in_post_maybe_compressed = std::make_unique>(*in_post); - in_post_compressed = true; - } - else - in_post_maybe_compressed = std::move(in_post); - - std::unique_ptr in; - - /// Support for "external data for query processing". - if (startsWith(request.getContentType(), "multipart/form-data")) - { - in = std::move(in_param); - ExternalTablesHandler handler(context, params); - - params.load(request, istr, handler); - - /// Erase unneeded parameters to avoid confusing them later with context settings or query - /// parameters. - for (const auto & it : handler.names) - { - params.erase(it + "_format"); - params.erase(it + "_types"); - params.erase(it + "_structure"); - } - } - else - in = std::make_unique(*in_param, *in_post_maybe_compressed); - - /// Settings can be overridden in the query. - /// Some parameters (database, default_format, everything used in the code above) do not - /// belong to the Settings class. - - /// 'readonly' setting values mean: - /// readonly = 0 - any query is allowed, client can change any setting. - /// readonly = 1 - only readonly queries are allowed, client can't change settings. - /// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'. - - /// In theory if initially readonly = 0, the client can change any setting and then set readonly - /// to some other value. - auto & settings = context.getSettingsRef(); - - /// Only readonly queries are allowed for HTTP GET requests. - if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) - { - if (settings.readonly == 0) - settings.readonly = 2; - } - - auto readonly_before_query = settings.readonly; - - NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"}; - - for (const auto & param : params) - { - if (param.first == "database") - { - context.setCurrentDatabase(param.second); - } - else if (param.first == "default_format") - { - context.setDefaultFormat(param.second); - } - else if (reserved_param_names.find(param.first) != reserved_param_names.end()) - { - } - else - { - /// All other query parameters are treated as settings. - String value; - /// Setting is skipped if value wasn't changed. - if (!settings.tryGet(param.first, value) || param.second != value) - { - if (readonly_before_query == 1) - throw Exception("Cannot override setting (" + param.first + ") in readonly mode", ErrorCodes::READONLY); - - if (readonly_before_query && param.first == "readonly") - throw Exception("Setting 'readonly' cannot be overrided in readonly mode", ErrorCodes::READONLY); - - context.setSetting(param.first, param.second); - } - } - } - - /// HTTP response compression is turned on only if the client signalled that they support it - /// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on. - used_output.out->setCompression(client_supports_http_compression && settings.enable_http_compression); - if (client_supports_http_compression) - used_output.out->setCompressionLevel(settings.http_zlib_compression_level); - - used_output.out->setSendProgressInterval(settings.http_headers_progress_interval_ms); - - /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on, - /// checksums of client data compressed with internal algorithm are not checked. - if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress) - static_cast &>(*in_post_maybe_compressed).disableChecksumming(); - - /// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed - /// Origin header. - used_output.out->addHeaderCORS(settings.add_http_cors_header && !request.get("Origin", "").empty()); - - ClientInfo & client_info = context.getClientInfo(); - client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; - client_info.interface = ClientInfo::Interface::HTTP; - - /// Query sent through HTTP interface is initial. - client_info.initial_user = client_info.current_user; - client_info.initial_query_id = client_info.current_query_id; - client_info.initial_address = client_info.current_address; - - ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN; - if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) - http_method = ClientInfo::HTTPMethod::GET; - else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST) - http_method = ClientInfo::HTTPMethod::POST; - - client_info.http_method = http_method; - client_info.http_user_agent = request.get("User-Agent", ""); - - /// While still no data has been sent, we will report about query execution progress by sending HTTP headers. - if (settings.send_progress_in_http_headers) - context.setProgressCallback([&used_output](const Progress & progress) { used_output.out->onProgress(progress); }); - - executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context, [&response](const String & content_type) { response.setContentType(content_type); }); - - if (used_output.hasDelayed()) - { - /// TODO: set Content-Length if possible - pushDelayedResults(used_output); - } - - /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to - /// the client. - used_output.out->finalize(); -} - -void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output) -{ - try - { - /// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body - /// to avoid reading part of the current request body in the next request. - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST - && response.getKeepAlive() - && !request.stream().eof() - && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED) - { - request.stream().ignore(std::numeric_limits::max()); - } - - bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER || exception_code == ErrorCodes::WRONG_PASSWORD || exception_code == ErrorCodes::REQUIRED_PASSWORD; - - if (auth_fail) - { - response.requireAuthentication("ClickHouse server HTTP API"); - } - else - { - response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code)); - } - - if (!response.sent() && !used_output.out_maybe_compressed) - { - /// If nothing was sent yet and we don't even know if we must compress the response. - response.send() << s << std::endl; - } - else if (used_output.out_maybe_compressed) - { - /// Destroy CascadeBuffer to actualize buffers' positions and reset extra references - if (used_output.hasDelayed()) - used_output.out_maybe_delayed_and_compressed.reset(); - - /// Send the error message into already used (and possibly compressed) stream. - /// Note that the error message will possibly be sent after some data. - /// Also HTTP code 200 could have already been sent. - - /// If buffer has data, and that data wasn't sent yet, then no need to send that data - bool data_sent = used_output.out->count() != used_output.out->offset(); - - if (!data_sent) - { - used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin(); - used_output.out->position() = used_output.out->buffer().begin(); - } - - writeString(s, *used_output.out_maybe_compressed); - writeChar('\n', *used_output.out_maybe_compressed); - - used_output.out_maybe_compressed->next(); - used_output.out->next(); - used_output.out->finalize(); - } - } - catch (...) - { - tryLogCurrentException(log, "Cannot send exception to client"); - } -} - - -void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) -{ - Output used_output; - - /// In case of exception, send stack trace to client. - bool with_stacktrace = false; - - try - { - response.setContentType("text/plain; charset=UTF-8"); - response.set("X-ClickHouse-Server-Display-Name", server_display_name); - /// For keep-alive to work. - if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) - response.setChunkedTransferEncoding(true); - - HTMLForm params(request); - with_stacktrace = params.getParsed("stacktrace", false); - - /// Workaround. Poco does not detect 411 Length Required case. - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && !request.hasContentLength()) - { - throw Exception("There is neither Transfer-Encoding header nor Content-Length header", ErrorCodes::HTTP_LENGTH_REQUIRED); - } - - processQuery(request, params, response, used_output); - LOG_INFO(log, "Done processing query"); - } - catch (...) - { - tryLogCurrentException(log); - - /** If exception is received from remote server, then stack trace is embedded in message. - * If exception is thrown on local server, then stack trace is in separate field. - */ - std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true); - int exception_code = getCurrentExceptionCode(); - - trySendExceptionToClient(exception_message, exception_code, request, response, used_output); - } -} - - -} // namespace DB diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h deleted file mode 100644 index 7a32eb16b32..00000000000 --- a/dbms/src/Server/HTTPHandler.h +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include - -#include "IServer.h" - - -namespace Poco -{ -class Logger; -} - -namespace DB -{ -class WriteBufferFromHTTPServerResponse; - - -class HTTPHandler : public Poco::Net::HTTPRequestHandler -{ -public: - explicit HTTPHandler(IServer & server_); - - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; - -private: - struct Output - { - /* Raw data - * ↓ - * CascadeWriteBuffer out_maybe_delayed_and_compressed (optional) - * ↓ (forwards data if an overflow is occur or explicitly via pushDelayedResults) - * CompressedWriteBuffer out_maybe_compressed (optional) - * ↓ - * WriteBufferFromHTTPServerResponse out - */ - - std::shared_ptr out; - /// Points to 'out' or to CompressedWriteBuffer(*out), depending on settings. - std::shared_ptr out_maybe_compressed; - /// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer. - std::shared_ptr out_maybe_delayed_and_compressed; - - inline bool hasDelayed() const - { - return out_maybe_delayed_and_compressed != out_maybe_compressed; - } - }; - - IServer & server; - Poco::Logger * log; - - /// It is the name of the server that will be sent in an http-header X-ClickHouse-Server-Display-Name. - String server_display_name; - - /// Also initializes 'used_output'. - void processQuery( - Poco::Net::HTTPServerRequest & request, - HTMLForm & params, - Poco::Net::HTTPServerResponse & response, - Output & used_output); - - void trySendExceptionToClient( - const std::string & s, - int exception_code, - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response, - Output & used_output); - - static void pushDelayedResults(Output & used_output); -}; - -} // namespace DB diff --git a/dbms/src/Server/HTTPHandlerFactory.h b/dbms/src/Server/HTTPHandlerFactory.h deleted file mode 100644 index 51366a4c065..00000000000 --- a/dbms/src/Server/HTTPHandlerFactory.h +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include - -#include "HTTPHandler.h" -#include "IServer.h" -#include "NotFoundHandler.h" -#include "PingRequestHandler.h" -#include "RootRequestHandler.h" - - -namespace DB -{ -template -class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory -{ -private: - IServer & server; - Poco::Logger * log; - std::string name; - -public: - HTTPRequestHandlerFactory(IServer & server_, const std::string & name_) - : server(server_) - , log(&Poco::Logger::get(name_)) - , name(name_) - { - } - - Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override - { - LOG_TRACE( - log, - "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}", - name, - request.getMethod(), - request.clientAddress().toString(), - (request.has("User-Agent") ? request.get("User-Agent") : "none")); - - const auto & uri = request.getURI(); - - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD) - { - if (uri == "/") - return new RootRequestHandler(server); - if (uri == "/ping") - return new PingRequestHandler(server); - } - - if (uri.find('?') != std::string::npos || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) - { - return new HandlerType(server); - } - - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD - || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) - { - return new NotFoundHandler; - } - - return nullptr; - } -}; - -using HTTPHandlerFactory = HTTPRequestHandlerFactory; - -} // namespace DB diff --git a/dbms/src/Server/NotFoundHandler.cpp b/dbms/src/Server/NotFoundHandler.cpp deleted file mode 100644 index 39c4ee18eaa..00000000000 --- a/dbms/src/Server/NotFoundHandler.cpp +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "NotFoundHandler.h" - -#include - -#include - -#include -#include - -namespace DB -{ - -void NotFoundHandler::handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) -{ - try - { - response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); - response.send() << "There is no handle " << request.getURI() << "\n\n" - << "Use / or /ping for health checks.\n" - << "Or /replicas_status for more sophisticated health checks.\n\n" - << "Send queries from your program with POST method or GET /?query=...\n\n" - << "Use clickhouse-client:\n\n" - << "For interactive data analysis:\n" - << " clickhouse-client\n\n" - << "For batch query processing:\n" - << " clickhouse-client --query='SELECT 1' > result\n" - << " clickhouse-client < query > result\n"; - } - catch (...) - { - tryLogCurrentException("NotFoundHandler"); - } -} - -} diff --git a/dbms/src/Server/NotFoundHandler.h b/dbms/src/Server/NotFoundHandler.h deleted file mode 100644 index a2e30312112..00000000000 --- a/dbms/src/Server/NotFoundHandler.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - - -namespace DB -{ - -/// Response with 404 and verbose description. -class NotFoundHandler : public Poco::Net::HTTPRequestHandler -{ -public: - void handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) override; -}; - -} diff --git a/dbms/src/Server/PingRequestHandler.cpp b/dbms/src/Server/PingRequestHandler.cpp deleted file mode 100644 index 664b375dfcd..00000000000 --- a/dbms/src/Server/PingRequestHandler.cpp +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "PingRequestHandler.h" - -#include - -#include - -#include -#include - -namespace DB -{ - -void PingRequestHandler::handleRequest( - Poco::Net::HTTPServerRequest &, - Poco::Net::HTTPServerResponse & response) -{ - try - { - const auto & config = server.config(); - setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", 10)); - - const char * data = "Ok.\n"; - response.sendBuffer(data, strlen(data)); - } - catch (...) - { - tryLogCurrentException("PingRequestHandler"); - } -} - -} diff --git a/dbms/src/Server/PingRequestHandler.h b/dbms/src/Server/PingRequestHandler.h deleted file mode 100644 index 975025026c5..00000000000 --- a/dbms/src/Server/PingRequestHandler.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "IServer.h" - -#include - - -namespace DB -{ - -/// Response with "Ok.\n". Used for availability checks. -class PingRequestHandler : public Poco::Net::HTTPRequestHandler -{ -private: - IServer & server; - -public: - explicit PingRequestHandler(IServer & server_) : server(server_) - { - } - - void handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) override; -}; - -} diff --git a/dbms/src/Server/RootRequestHandler.cpp b/dbms/src/Server/RootRequestHandler.cpp deleted file mode 100644 index c75bdf58442..00000000000 --- a/dbms/src/Server/RootRequestHandler.cpp +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "RootRequestHandler.h" - -#include - -#include - -#include -#include - -namespace DB -{ - -void RootRequestHandler::handleRequest( - Poco::Net::HTTPServerRequest &, - Poco::Net::HTTPServerResponse & response) -{ - try - { - const auto & config = server.config(); - setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", 10)); - - response.setContentType("text/html; charset=UTF-8"); - - const std::string data = config.getString("http_server_default_response", "Ok.\n"); - response.sendBuffer(data.data(), data.size()); - } - catch (...) - { - tryLogCurrentException("RootRequestHandler"); - } -} - -} diff --git a/dbms/src/Server/RootRequestHandler.h b/dbms/src/Server/RootRequestHandler.h deleted file mode 100644 index 8551131eae8..00000000000 --- a/dbms/src/Server/RootRequestHandler.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "IServer.h" - -#include - - -namespace DB -{ - -/// Response with custom string. Can be used for browser. -class RootRequestHandler : public Poco::Net::HTTPRequestHandler -{ -private: - IServer & server; - -public: - explicit RootRequestHandler(IServer & server_) : server(server_) - { - } - - void handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) override; -}; - -} diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 94b5375fdea..239ec3e795b 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -57,14 +57,12 @@ #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include @@ -364,20 +362,6 @@ void printGRPCLog(gpr_log_func_args * args) } } -struct HTTPServer : Poco::Net::HTTPServer -{ - HTTPServer(Poco::Net::HTTPRequestHandlerFactory::Ptr pFactory, Poco::ThreadPool & threadPool, const Poco::Net::ServerSocket & socket, Poco::Net::HTTPServerParams::Ptr pParams) - : Poco::Net::HTTPServer(pFactory, threadPool, socket, pParams) - {} - -protected: - void run() override - { - setThreadName("HTTPServer"); - Poco::Net::HTTPServer::run(); - } -}; - struct TCPServer : Poco::Net::TCPServer { TCPServer(Poco::Net::TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool & threadPool, const Poco::Net::ServerSocket & socket, Poco::Net::TCPServerParams::Ptr pParams) @@ -602,57 +586,6 @@ class Server::TcpHttpServersHolder /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file. try { - /// HTTPS - if (config.has("https_port")) - { -#if Poco_NetSSL_FOUND - if (!security_config->hasTlsConfig()) - { - LOG_ERROR(log, "https_port is set but tls config is not set"); - } - auto [ca_path, cert_path, key_path] = security_config->getPaths(); - Poco::Net::Context::Ptr context = new Poco::Net::Context(Poco::Net::Context::TLSV1_2_SERVER_USE, - key_path, - cert_path, - ca_path, - Poco::Net::Context::VerificationMode::VERIFY_STRICT); - auto check_common_name = [&](const Poco::Crypto::X509Certificate & cert) { - return server.global_context->getSecurityConfig()->checkCommonName(cert); - }; - context->setAdhocVerification(check_common_name); - - Poco::Net::SecureServerSocket socket(context); - CertificateReloader::initSSLCallback(context, server.global_context.get()); - auto address = socket_bind_listen(socket, listen_host, config.getInt("https_port"), /* secure = */ true); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back( - new HTTPServer(new HTTPHandlerFactory(server, "HTTPSHandler-factory"), server_pool, socket, http_params)); - - LOG_INFO(log, "Listening https://{}", address.toString()); -#else - throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - } - else - { - /// HTTP - if (security_config->hasTlsConfig()) - { - throw Exception("tls config is set but https_port is not set ", ErrorCodes::INVALID_CONFIG_PARAMETER); - } - Poco::Net::ServerSocket socket; - auto address = socket_bind_listen(socket, listen_host, config.getInt("http_port", DEFAULT_HTTP_PORT)); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back( - new HTTPServer(new HTTPHandlerFactory(server, "HTTPHandler-factory"), server_pool, socket, http_params)); - - LOG_INFO(log, "Listening http://{}", address.toString()); - } - - /// TCP if (config.has("tcp_port")) { @@ -703,7 +636,7 @@ class Server::TcpHttpServersHolder LOG_INFO(log, "tcp_port_secure is closed because tls config is set"); } - /// At least one of TCP and HTTP servers must be created. + /// TCP servers must be created. if (servers.empty()) throw Exception("No 'tcp_port' and 'http_port' is specified in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG); } diff --git a/dbms/src/Server/Server.h b/dbms/src/Server/Server.h index c4567a09e81..fc8141dbd9d 100644 --- a/dbms/src/Server/Server.h +++ b/dbms/src/Server/Server.h @@ -19,9 +19,8 @@ #include #include -/** Server provides three interfaces: - * 1. HTTP - simple interface for any applications. - * 2. TCP - interface for native clickhouse-client and for server to server internal communications. +/** Server provides the following interfaces: + * 1. TCP - interface for native clickhouse-client and for server to server internal communications. * More rich and efficient, but less compatible * - data is transferred by columns; * - data is transferred compressed; diff --git a/dbms/src/Storages/System/StorageSystemProcesses.cpp b/dbms/src/Storages/System/StorageSystemProcesses.cpp index e8fccd3dd07..41f40579854 100644 --- a/dbms/src/Storages/System/StorageSystemProcesses.cpp +++ b/dbms/src/Storages/System/StorageSystemProcesses.cpp @@ -13,12 +13,12 @@ // limitations under the License. #include +#include #include #include -#include +#include #include #include -#include namespace DB @@ -29,42 +29,39 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_) : name(name_) { setColumns(ColumnsDescription({ - { "is_initial_query", std::make_shared() }, - - { "user", std::make_shared() }, - { "query_id", std::make_shared() }, - { "address", std::make_shared() }, - { "port", std::make_shared() }, - - { "initial_user", std::make_shared() }, - { "initial_query_id", std::make_shared() }, - { "initial_address", std::make_shared() }, - { "initial_port", std::make_shared() }, - - { "interface", std::make_shared() }, - - { "os_user", std::make_shared() }, - { "client_hostname", std::make_shared() }, - { "client_name", std::make_shared() }, - { "client_version_major", std::make_shared() }, - { "client_version_minor", std::make_shared() }, - { "client_revision", std::make_shared() }, - - { "http_method", std::make_shared() }, - { "http_user_agent", std::make_shared() }, - - { "quota_key", std::make_shared() }, - - { "elapsed", std::make_shared() }, - { "is_cancelled", std::make_shared() }, - { "read_rows", std::make_shared() }, - { "read_bytes", std::make_shared() }, - { "total_rows_approx", std::make_shared() }, - { "written_rows", std::make_shared() }, - { "written_bytes", std::make_shared() }, - { "memory_usage", std::make_shared() }, - { "peak_memory_usage", std::make_shared() }, - { "query", std::make_shared() }, + {"is_initial_query", std::make_shared()}, + + {"user", std::make_shared()}, + {"query_id", std::make_shared()}, + {"address", std::make_shared()}, + {"port", std::make_shared()}, + + {"initial_user", std::make_shared()}, + {"initial_query_id", std::make_shared()}, + {"initial_address", std::make_shared()}, + {"initial_port", std::make_shared()}, + + {"interface", std::make_shared()}, + + {"os_user", std::make_shared()}, + {"client_hostname", std::make_shared()}, + {"client_name", std::make_shared()}, + {"client_version_major", std::make_shared()}, + {"client_version_minor", std::make_shared()}, + {"client_revision", std::make_shared()}, + + {"quota_key", std::make_shared()}, + + {"elapsed", std::make_shared()}, + {"is_cancelled", std::make_shared()}, + {"read_rows", std::make_shared()}, + {"read_bytes", std::make_shared()}, + {"total_rows_approx", std::make_shared()}, + {"written_rows", std::make_shared()}, + {"written_bytes", std::make_shared()}, + {"memory_usage", std::make_shared()}, + {"peak_memory_usage", std::make_shared()}, + {"query", std::make_shared()}, })); } @@ -103,8 +100,6 @@ BlockInputStreams StorageSystemProcesses::read( res_columns[i++]->insert(process.client_info.client_version_major); res_columns[i++]->insert(process.client_info.client_version_minor); res_columns[i++]->insert(UInt64(process.client_info.client_revision)); - res_columns[i++]->insert(UInt64(process.client_info.http_method)); - res_columns[i++]->insert(process.client_info.http_user_agent); res_columns[i++]->insert(process.client_info.quota_key); res_columns[i++]->insert(process.elapsed_seconds); res_columns[i++]->insert(UInt64(process.is_cancelled)); @@ -122,4 +117,4 @@ BlockInputStreams StorageSystemProcesses::read( } -} +} // namespace DB diff --git a/etc/config-template.toml b/etc/config-template.toml index 4f8ae3763b3..5da94d94e28 100644 --- a/etc/config-template.toml +++ b/etc/config-template.toml @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -## The listening host for supporting services such as TPC/HTTP. It is recommended to configure it as "0.0.0.0", which means to listen on all IP addresses of this machine. +## The listening host for supporting TCP service. It is recommended to configure it as "0.0.0.0", which means to listen on all IP addresses of this machine. # listen_host = "0.0.0.0" ## The TiFlash TCP service port. # tcp_port = 9000 -## The TiFlash HTTP service port. -# http_port = 8123 ## The cache size limit of the metadata of a data block. Generally, you do not need to change this value. # mark_cache_size = 1073741824 ## The cache size limit of the min-max index of a data block. Generally, you do not need to change this value. diff --git a/tests/fullstack-test/system-table/read_system_table.test b/tests/fullstack-test/system-table/read_system_table.test new file mode 100644 index 00000000000..b08ac2f1960 --- /dev/null +++ b/tests/fullstack-test/system-table/read_system_table.test @@ -0,0 +1,14 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +