-
Notifications
You must be signed in to change notification settings - Fork 7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CLICKHOUSE-3878: ODBC-Bridge tool implementation #2828
Changes from 11 commits
dd01eb6
92f3beb
fe10ccb
b31dd7b
46e9dc1
65c6a8f
97bcdce
1bedb97
bff4bbf
6d40546
04db4dd
dde09bd
cd9a016
c3588a5
f11574c
53b23e0
6fe3f0b
a78904a
76baaf9
83d5dba
af19d41
edc2dc4
942d9a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
add_library (clickhouse-odbc-bridge-lib | ||
Handlers.cpp | ||
HandlerFactory.cpp | ||
ODBCBridge.cpp | ||
) | ||
|
||
target_link_libraries (clickhouse-odbc-bridge-lib clickhouse_common_io daemon) | ||
target_include_directories (clickhouse-odbc-bridge-lib PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include) | ||
|
||
if (CLICKHOUSE_SPLIT_BINARY) | ||
add_executable (clickhouse-odbc-bridge odbc-bridge.cpp) | ||
target_link_libraries (clickhouse-odbc-bridge clickhouse-odbc-bridge-lib) | ||
endif () |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
#include "HandlerFactory.h" | ||
#include <Common/HTMLForm.h> | ||
|
||
#include <Dictionaries/validateODBCConnectionString.h> | ||
#include <Poco/Ext/SessionPoolHelpers.h> | ||
#include <Poco/Net/HTTPServerRequest.h> | ||
#include <common/logger_useful.h> | ||
|
||
namespace DB | ||
{ | ||
Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request) | ||
{ | ||
const auto & uri = request.getURI(); | ||
LOG_TRACE(log, "Request URI: " + uri); | ||
|
||
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) | ||
return new PingHandler(keep_alive_timeout); | ||
|
||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) | ||
return new ODBCHandler(pool_map, keep_alive_timeout, context); | ||
|
||
return nullptr; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
#pragma once | ||
#include <Interpreters/Context.h> | ||
#include <Poco/Logger.h> | ||
#include <Poco/Net/HTTPRequestHandler.h> | ||
#include <Poco/Net/HTTPRequestHandlerFactory.h> | ||
#include "Handlers.h" | ||
|
||
#pragma GCC diagnostic push | ||
#pragma GCC diagnostic ignored "-Wunused-parameter" | ||
#include <Poco/Data/SessionPool.h> | ||
#pragma GCC diagnostic pop | ||
|
||
|
||
namespace DB | ||
{ | ||
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory | ||
{ | ||
public: | ||
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr<Context> context_) | ||
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_) | ||
{ | ||
pool_map = std::make_shared<ODBCHandler::PoolMap>(); | ||
} | ||
|
||
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; | ||
|
||
private: | ||
Poco::Logger * log; | ||
std::string name; | ||
size_t keep_alive_timeout; | ||
std::shared_ptr<Context> context; | ||
std::shared_ptr<ODBCHandler::PoolMap> pool_map; | ||
}; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
#include "Handlers.h" | ||
#include <Common/HTMLForm.h> | ||
|
||
#include <memory> | ||
#include <DataStreams/IBlockOutputStream.h> | ||
#include <DataStreams/copyData.h> | ||
#include <DataTypes/DataTypeFactory.h> | ||
#include <Dictionaries/ODBCBlockInputStream.h> | ||
#include <Formats/BinaryRowInputStream.h> | ||
#include <Formats/FormatFactory.h> | ||
#include <IO/ReadBufferFromIStream.h> | ||
#include <IO/WriteBufferFromHTTPServerResponse.h> | ||
#include <IO/WriteHelpers.h> | ||
#include <Interpreters/Context.h> | ||
#include <boost/algorithm/string.hpp> | ||
#include <boost/tokenizer.hpp> | ||
#include <Poco/Ext/SessionPoolHelpers.h> | ||
#include <Poco/Net/HTTPServerRequest.h> | ||
#include <Poco/Net/HTTPServerResponse.h> | ||
#include <common/logger_useful.h> | ||
namespace DB | ||
{ | ||
namespace ErrorCodes | ||
{ | ||
extern const int BAD_REQUEST_PARAMETER; | ||
} | ||
|
||
namespace | ||
{ | ||
std::unique_ptr<Block> parseColumns(std::string && column_string) | ||
{ | ||
std::unique_ptr<Block> sample_block = std::make_unique<Block>(); | ||
auto names_and_types = NamesAndTypesList::parse(column_string); | ||
for (const NameAndTypePair & column_data : names_and_types) | ||
sample_block->insert({column_data.type, column_data.name}); | ||
return sample_block; | ||
} | ||
|
||
size_t parseMaxBlockSize(const std::string & max_block_size_str, Poco::Logger * log) | ||
{ | ||
size_t max_block_size = DEFAULT_BLOCK_SIZE; | ||
if (!max_block_size_str.empty()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should throw an exception if the user specified |
||
{ | ||
try | ||
{ | ||
max_block_size = std::stoul(max_block_size_str); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't work as expected:
Better to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to check for all possibly wrong usages of |
||
} | ||
catch (...) | ||
{ | ||
tryLogCurrentException(log); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to catch exception. |
||
} | ||
} | ||
return max_block_size; | ||
} | ||
} | ||
|
||
|
||
ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str) | ||
{ | ||
if (!pool_map->count(connection_str)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And here. |
||
{ | ||
std::lock_guard lock(mutex); | ||
pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] { | ||
return std::make_shared<Poco::Data::SessionPool>("ODBC", connection_str); | ||
})); | ||
} | ||
return pool_map->at(connection_str); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why we don't lock mutex here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
} | ||
|
||
void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) | ||
{ | ||
Poco::Net::HTMLForm params(request, request.stream()); | ||
LOG_TRACE(log, "Request URI: " + request.getURI()); | ||
|
||
auto process_error = [&response, this](const std::string & message) { | ||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); | ||
if (!response.sent()) | ||
response.send() << message << std::endl; | ||
LOG_WARNING(log, message); | ||
}; | ||
|
||
if (!params.has("query")) | ||
{ | ||
process_error("ODBCBridge: No 'query' in request body"); | ||
return; | ||
} | ||
|
||
if (!params.has("columns")) | ||
{ | ||
process_error("ODBCBridge: No 'columns' in request URL"); | ||
return; | ||
} | ||
|
||
if (!params.has("connection_string")) | ||
{ | ||
process_error("ODBCBridge: No 'connection_string' in request URL"); | ||
return; | ||
} | ||
|
||
size_t max_block_size = parseMaxBlockSize(params.get("max_block_size", ""), log); | ||
|
||
std::string columns = params.get("columns"); | ||
std::unique_ptr<Block> sample_block; | ||
try | ||
{ | ||
sample_block = parseColumns(std::move(columns)); | ||
} | ||
catch (const Exception & ex) | ||
{ | ||
process_error("ODBCBridge: Invalid 'columns' parameter in request body '" + ex.message() + "'"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception stack trace is lost. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
return; | ||
} | ||
|
||
std::string format = params.get("format", "RowBinary"); | ||
std::string query = params.get("query"); | ||
LOG_TRACE(log, "ODBCBridge: Query '" << query << "'"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You put query in single quotes, but query is unescaped - that may lead to confusion, because query often contains single quotes inside. |
||
|
||
std::string connection_string = params.get("connection_string"); | ||
LOG_TRACE(log, "ODBCBridge: Connection string '" << connection_string << "'"); | ||
|
||
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); | ||
try | ||
{ | ||
|
||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context); | ||
auto pool = getPool(connection_string); | ||
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size); | ||
copyData(inp, *writer); | ||
} | ||
catch (...) | ||
{ | ||
auto message = "ODBCBridge:\n" + getCurrentExceptionMessage(true); | ||
response.setStatusAndReason( | ||
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending | ||
writeStringBinary(message, out); | ||
LOG_WARNING(log, message); | ||
} | ||
} | ||
|
||
void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response) | ||
{ | ||
try | ||
{ | ||
setResponseDefaultHeaders(response, keep_alive_timeout); | ||
const char * data = "Ok.\n"; | ||
response.sendBuffer(data, strlen(data)); | ||
} | ||
catch (...) | ||
{ | ||
tryLogCurrentException("PingHandler"); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
#pragma once | ||
|
||
#include <Interpreters/Context.h> | ||
#include <Poco/Logger.h> | ||
#include <Poco/Net/HTTPRequestHandler.h> | ||
|
||
#pragma GCC diagnostic push | ||
#pragma GCC diagnostic ignored "-Wunused-parameter" | ||
#include <Poco/Data/SessionPool.h> | ||
#pragma GCC diagnostic pop | ||
|
||
namespace DB | ||
{ | ||
class ODBCHandler : public Poco::Net::HTTPRequestHandler | ||
{ | ||
public: | ||
using PoolPtr = std::shared_ptr<Poco::Data::SessionPool>; | ||
using PoolMap = std::unordered_map<std::string, PoolPtr>; | ||
|
||
ODBCHandler(std::shared_ptr<PoolMap> pool_map_, | ||
size_t keep_alive_timeout_, | ||
std::shared_ptr<Context> context_) | ||
: log(&Poco::Logger::get("ODBCHandler")) | ||
, pool_map(pool_map_) | ||
, keep_alive_timeout(keep_alive_timeout_) | ||
, context(context_) | ||
{ | ||
} | ||
|
||
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; | ||
|
||
private: | ||
Poco::Logger * log; | ||
|
||
std::shared_ptr<PoolMap> pool_map; | ||
size_t keep_alive_timeout; | ||
std::shared_ptr<Context> context; | ||
|
||
static inline std::mutex mutex; | ||
|
||
PoolPtr getPool(const std::string & connection_str); | ||
}; | ||
|
||
class PingHandler : public Poco::Net::HTTPRequestHandler | ||
{ | ||
public: | ||
PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {} | ||
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; | ||
|
||
private: | ||
size_t keep_alive_timeout; | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this function at all.