Skip to content

Commit

Permalink
[Enhancement](jdbc catalog) Add a property to test the connection whe…
Browse files Browse the repository at this point in the history
…n creating a Jdbc catalog (#32125)
  • Loading branch information
zy-kkk authored Mar 13, 2024
1 parent f3fdb30 commit 684766e
Show file tree
Hide file tree
Showing 18 changed files with 331 additions and 5 deletions.
60 changes: 60 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <stddef.h>
#include <stdint.h>
#include <sys/stat.h>
#include <vec/exec/vjdbc_connector.h>

#include <algorithm>
#include <exception>
Expand Down Expand Up @@ -776,6 +777,65 @@ void PInternalService::tablet_fetch_data(google::protobuf::RpcController* contro
}
}

// Tests a JDBC connection described by the thrift-serialized TTableDescriptor in
// `request`. The work runs on the heavy work pool; the outcome is reported through
// `result->status`. A failed offer to the pool is reported via offer_failed().
void PInternalService::test_jdbc_connection(google::protobuf::RpcController* controller,
                                            const PJdbcTestConnectionRequest* request,
                                            PJdbcTestConnectionResult* result,
                                            google::protobuf::Closure* done) {
    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
        VLOG_RPC << "test jdbc connection";
        brpc::ClosureGuard closure_guard(done);
        TTableDescriptor table_desc;
        vectorized::JdbcConnectorParam jdbc_param;
        Status st = Status::OK();
        {
            // The request carries a thrift-serialized table descriptor; decode it first.
            const uint8_t* buf = reinterpret_cast<const uint8_t*>(request->jdbc_table().data());
            uint32_t len = request->jdbc_table().size();
            st = deserialize_thrift_msg(buf, &len, false, &table_desc);
            if (!st.ok()) {
                LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
                st.to_protobuf(result->mutable_status());
                return;
            }
        }
        // Bind to a const reference to avoid copying the whole TJdbcTable struct.
        const TJdbcTable& jdbc_table = table_desc.jdbcTable;
        jdbc_param.catalog_id = jdbc_table.catalog_id;
        jdbc_param.driver_class = jdbc_table.jdbc_driver_class;
        jdbc_param.driver_path = jdbc_table.jdbc_driver_url;
        jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum;
        jdbc_param.jdbc_url = jdbc_table.jdbc_url;
        jdbc_param.user = jdbc_table.jdbc_user;
        jdbc_param.passwd = jdbc_table.jdbc_password;
        jdbc_param.query_string = request->query_str();
        jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type());
        jdbc_param.use_transaction = false;
        jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size;
        jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size;
        jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time;
        jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time;
        jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive;

        // new (std::nothrow) yields nullptr on allocation failure; the previous code
        // dereferenced the pointer unconditionally. Report the failure instead.
        std::unique_ptr<vectorized::JdbcConnector> jdbc_connector(
                new (std::nothrow) vectorized::JdbcConnector(jdbc_param));
        if (jdbc_connector == nullptr) {
            Status alloc_st = Status::InternalError("failed to allocate JdbcConnector");
            alloc_st.to_protobuf(result->mutable_status());
            return;
        }

        st = jdbc_connector->test_connection();
        st.to_protobuf(result->mutable_status());

        // Best-effort cleanup: failures here are logged but must not override the
        // test result already written to `result`.
        Status clean_st = jdbc_connector->clean_datasource();
        if (!clean_st.ok()) {
            LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg();
        }
        Status close_st = jdbc_connector->close();
        if (!close_st.ok()) {
            LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg();
        }
    });

    if (!ret) {
        offer_failed(result, done, _heavy_work_pool);
        return;
    }
}

void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
const PFetchColIdsRequest* request,
PFetchColIdsResponse* response,
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ class PInternalService : public PBackendService {
PTabletKeyLookupResponse* response,
google::protobuf::Closure* done) override;

// Tests a JDBC connection described by the thrift-serialized table descriptor in
// `request`; executed asynchronously on the heavy work pool, with the outcome
// reported through `result->status`.
void test_jdbc_connection(google::protobuf::RpcController* controller,
                          const PJdbcTestConnectionRequest* request,
                          PJdbcTestConnectionResult* result,
                          google::protobuf::Closure* done) override;

private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
const PExecPlanFragmentRequest* request,
Expand Down
27 changes: 26 additions & 1 deletion be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
ctor_params.__set_jdbc_password(_conn_param.passwd);
ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
ctor_params.__set_driver_path(local_location);
ctor_params.__set_batch_size(read ? state->batch_size() : 0);
if (state == nullptr) {
ctor_params.__set_batch_size(read ? 1 : 0);
} else {
ctor_params.__set_batch_size(read ? state->batch_size() : 0);
}
ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
ctor_params.__set_table_type(_conn_param.table_type);
ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
Expand Down Expand Up @@ -195,6 +199,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
return Status::OK();
}

// Opens a probe connection (no RuntimeState — open() substitutes a batch size of 1
// when `state` is nullptr) and invokes the Java executor's testConnection() method.
// Any pending Java exception is translated into a non-OK Status. Callers are
// expected to invoke clean_datasource()/close() afterwards.
Status JdbcConnector::test_connection() {
    RETURN_IF_ERROR(open(nullptr, true));

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));

    // Method id registered in _register_func_id() as "testConnection", signature "()V".
    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_test_connection_id);
    return JniUtil::GetJniExceptionMsg(env);
}

// Invokes the Java executor's cleanDataSource() method via JNI, which closes the
// pooled datasource and removes it from the executor-side cache. Any pending Java
// exception is translated into a non-OK Status.
Status JdbcConnector::clean_datasource() {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_clean_datasource_id);
    return JniUtil::GetJniExceptionMsg(env);
}

Status JdbcConnector::query() {
if (!_is_open) {
return Status::InternalError("Query before open of JdbcConnector.");
Expand Down Expand Up @@ -380,6 +401,10 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames",
JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id));
RETURN_IF_ERROR(
register_id(_executor_clazz, "testConnection", "()V", _executor_test_connection_id));
RETURN_IF_ERROR(
register_id(_executor_clazz, "cleanDataSource", "()V", _executor_clean_datasource_id));
return Status::OK();
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class JdbcConnector : public TableConnector {

Status close(Status s = Status::OK()) override;

// Opens a probe connection and runs the configured test query via the Java executor.
Status test_connection();
// Closes the executor-side pooled datasource and drops it from the cache.
Status clean_datasource();

protected:
JdbcConnectorParam _conn_param;

Expand Down Expand Up @@ -155,6 +158,8 @@ class JdbcConnector : public TableConnector {
jmethodID _executor_begin_trans_id;
jmethodID _executor_finish_trans_id;
jmethodID _executor_abort_trans_id;
jmethodID _executor_test_connection_id;  // JdbcExecutor.testConnection()
jmethodID _executor_clean_datasource_id; // JdbcExecutor.cleanDataSource()

std::map<int, int> _map_column_idx_to_cast_idx_hll;
std::vector<DataTypePtr> _input_hll_string_types;
Expand Down
2 changes: 2 additions & 0 deletions docs/en/docs/lakehouse/multi-catalog/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ PROPERTIES ("key" = "value", ...)
| `meta_names_mapping` | No | | When the JDBC external data source has the same name but different case, e.g. DORIS and doris, Doris reports an error when querying the catalog due to ambiguity. In this case, the `meta_names_mapping` parameter needs to be specified to resolve the conflict. |
| `include_database_list` | No | | When `only_specified_database = true`,only synchronize the specified databases. Separate with `,`. Database name is case sensitive. |
| `exclude_database_list` | No | | When `only_specified_database = true`,do not synchronize the specified databases. Separate with `,`. Database name is case sensitive. |
| `test_connection` | No | "false" | Whether to test the connection when creating the catalog. If set to `true`, the connection is tested at catalog creation time, and creation of the catalog is refused if the connection test fails. If set to `false`, the connection is not tested. |


### Driver path

Expand Down
3 changes: 2 additions & 1 deletion docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ PROPERTIES ("key"="value", ...)

## 参数说明

| 参数 | 必须 | 默认值 | 说明 |
| 参数 | 必须 | 默认值 | 说明 |
|---------------------------|-----|---------|-----------------------------------------------------------------------|
| `user` || | 对应数据库的用户名 |
| `password` || | 对应数据库的密码 |
Expand All @@ -56,6 +56,7 @@ PROPERTIES ("key"="value", ...)
| `only_specified_database` || "false" | 指定是否只同步指定的 database |
| `include_database_list` || "" | 当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。 |
| `exclude_database_list` || "" | 当only_specified_database=true时,指定不需要同步的多个database,以','分割。db名称是大小写敏感的。 |
| `test_connection` || "false" | 是否在创建 Catalog 时测试连接。如果设置为 `true`,则会在创建 Catalog 时测试连接,如果连接失败,则会拒绝创建 Catalog。如果设置为 `false`,则不会测试连接。 |

### 驱动包路径

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,26 @@ protected boolean abortReadConnection(Connection connection, ResultSet resultSet
return false;
}

/**
 * Closes the Druid connection pool backing this executor (if one was created)
 * and evicts its entry from the shared datasource cache.
 */
public void cleanDataSource() {
    if (druidDataSource == null) {
        return;
    }
    druidDataSource.close();
    JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
    druidDataSource = null;
}

/**
 * Executes the prepared probe query and verifies it yields at least one row.
 *
 * @throws UdfRuntimeException if the query fails or returns an empty result set
 */
public void testConnection() throws UdfRuntimeException {
    try {
        resultSet = ((PreparedStatement) stmt).executeQuery();
        if (resultSet.next()) {
            return;
        }
        throw new UdfRuntimeException(
                "Failed to test connection in BE: query executed but returned no results.");
    } catch (SQLException e) {
        throw new UdfRuntimeException("Failed to test connection in BE: ", e);
    }
}

public int read() throws UdfRuntimeException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@ public boolean abortReadConnection(Connection connection, ResultSet resultSet, T
return false;
}

/**
 * Closes the Druid connection pool backing this executor (if one was created)
 * and evicts its entry from the shared datasource cache.
 */
public void cleanDataSource() {
    if (druidDataSource == null) {
        return;
    }
    druidDataSource.close();
    JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
    druidDataSource = null;
}

/**
 * Executes the prepared probe query and verifies it yields at least one row.
 *
 * @throws UdfRuntimeException if the query fails or returns an empty result set
 */
public void testConnection() throws UdfRuntimeException {
    try {
        resultSet = ((PreparedStatement) stmt).executeQuery();
        if (resultSet.next()) {
            return;
        }
        throw new UdfRuntimeException(
                "Failed to test connection in BE: query executed but returned no results.");
    } catch (SQLException e) {
        throw new UdfRuntimeException("Failed to test connection in BE: ", e);
    }
}

public int read() throws UdfRuntimeException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class JdbcResource extends Resource {
public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive";
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
public static final String TEST_CONNECTION = "test_connection";

private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
USER,
Expand All @@ -122,7 +124,8 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
CONNECTION_POOL_KEEP_ALIVE
CONNECTION_POOL_KEEP_ALIVE,
TEST_CONNECTION
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
Expand All @@ -134,7 +137,8 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
CONNECTION_POOL_KEEP_ALIVE
CONNECTION_POOL_KEEP_ALIVE,
TEST_CONNECTION
).build();

// The default value of optional properties
Expand All @@ -152,6 +156,7 @@ public class JdbcResource extends Resource {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "false");
}

// timeout for both connection and read. 10 seconds is long enough.
Expand Down
Loading

0 comments on commit 684766e

Please sign in to comment.