From 3c95106d45163fd0c9c7df769b8d711b9c2f7cdf Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 27 Oct 2022 00:02:25 +0800 Subject: [PATCH] [Bug](jdbc) Fix memory leak for JDBC datasource (#13657) --- be/src/common/config.h | 2 + be/src/exec/odbc_connector.cpp | 2 +- be/src/exec/odbc_connector.h | 2 +- be/src/exec/odbc_scan_node.cpp | 2 +- be/src/exec/table_connector.h | 4 +- be/src/runtime/odbc_table_sink.cpp | 2 +- be/src/util/jni-util.cpp | 14 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 3 +- be/src/vec/exec/scan/new_odbc_scanner.cpp | 2 +- be/src/vec/exec/vjdbc_connector.cpp | 151 ++++++++++-------- be/src/vec/exec/vjdbc_connector.h | 8 +- be/src/vec/exec/vjdbc_scan_node.cpp | 2 +- be/src/vec/exec/vodbc_scan_node.cpp | 2 +- be/src/vec/sink/vjdbc_table_sink.cpp | 3 +- be/src/vec/sink/vodbc_table_sink.cpp | 2 +- dist/LICENSE-dist.txt | 2 +- fe/java-udf/pom.xml | 5 + .../org/apache/doris/udf/JdbcExecutor.java | 70 +++++--- fe/pom.xml | 1 + gensrc/thrift/Types.thrift | 12 +- 20 files changed, 175 insertions(+), 116 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index b30cc1b00523b8..8a690c2c1df887 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -865,6 +865,8 @@ CONF_String(be_node_role, "mix"); // Hide the be config page for webserver. CONF_Bool(hide_webserver_config_page, "false"); +CONF_String(jvm_max_heap_size, "1024M"); + #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp index 0ffedf4f9a4a6a..826d8b8bb97e38 100644 --- a/be/src/exec/odbc_connector.cpp +++ b/be/src/exec/odbc_connector.cpp @@ -70,7 +70,7 @@ ODBCConnector::~ODBCConnector() { } } -Status ODBCConnector::open() { +Status ODBCConnector::open(RuntimeState* state, bool read) { if (_is_open) { LOG(INFO) << "this scanner already opened"; return Status::OK(); diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h index 41c7280741e556..2aabcb61b6033c 100644 --- a/be/src/exec/odbc_connector.h +++ b/be/src/exec/odbc_connector.h @@ -54,7 +54,7 @@ class ODBCConnector : public TableConnector { explicit ODBCConnector(const ODBCConnectorParam& param); ~ODBCConnector() override; - Status open() override; + Status open(RuntimeState* state, bool read = false) override; // query for ODBC table Status query() override; diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp index c0cfd53bb44ae8..84194c7510d91f 100644 --- a/be/src/exec/odbc_scan_node.cpp +++ b/be/src/exec/odbc_scan_node.cpp @@ -106,7 +106,7 @@ Status OdbcScanNode::open(RuntimeState* state) { } RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(_odbc_scanner->open()); + RETURN_IF_ERROR(_odbc_scanner->open(state)); RETURN_IF_ERROR(_odbc_scanner->query()); // check materialize slot num diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h index 06bdadd3808a58..3fa9f5f5b14c5b 100644 --- a/be/src/exec/table_connector.h +++ b/be/src/exec/table_connector.h @@ -38,7 +38,7 @@ class TableConnector { TableConnector(const TupleDescriptor* tuple_desc, const std::string& sql_str); virtual ~TableConnector() = default; - virtual Status open() = 0; + virtual Status open(RuntimeState* state, bool read = false) = 0; // exec query for table virtual Status query() = 0; @@ -64,6 +64,8 @@ class TableConnector { std::u16string utf8_to_u16string(const char* first, const char* last); + virtual Status close() { return Status::OK(); } + protected: bool _is_open; bool _is_in_transaction; diff --git a/be/src/runtime/odbc_table_sink.cpp b/be/src/runtime/odbc_table_sink.cpp index 3191431138f778..a7c58d22d4d12a 100644 --- a/be/src/runtime/odbc_table_sink.cpp +++ b/be/src/runtime/odbc_table_sink.cpp @@ -65,7 +65,7 @@ Status OdbcTableSink::open(RuntimeState* state) { RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state)); // create writer _writer.reset(new ODBCConnector(_odbc_param)); - RETURN_IF_ERROR(_writer->open()); + RETURN_IF_ERROR(_writer->open(state)); if (_use_transaction) { RETURN_IF_ERROR(_writer->begin_trans()); } diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 384af3e4144596..e9a0673dc8695b 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -22,6 +22,7 @@ #include #include +#include "common/config.h" #include "gutil/once.h" #include "gutil/strings/substitute.h" @@ -39,13 +40,16 @@ void FindOrCreateJavaVM() { if (rv == 0) { JNIEnv* env; JavaVMInitArgs vm_args; - JavaVMOption options[1]; - char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); - options[0].optionString = str; + JavaVMOption options[2]; + char* cp = getenv("DORIS_JNI_CLASSPATH_PARAMETER"); + options[0].optionString = cp; + std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size); + options[1].optionString = const_cast(heap_size.c_str()); vm_args.version = JNI_VERSION_1_8; vm_args.options = options; - vm_args.nOptions = 1; - vm_args.ignoreUnrecognized = JNI_TRUE; + vm_args.nOptions = 2; + // Set it to JNI_FALSE because JNI_TRUE will let JVM ignore the max size config. + vm_args.ignoreUnrecognized = JNI_FALSE; jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args); if (JNI_OK != res) { diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 61c9ff53c57c62..2141d6b7e8befb 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -81,7 +81,7 @@ Status NewJdbcScanner::open(RuntimeState* state) { } RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VScanner::open(state)); - RETURN_IF_ERROR(_jdbc_connector->open()); + RETURN_IF_ERROR(_jdbc_connector->open(state, true)); RETURN_IF_ERROR(_jdbc_connector->query()); return Status::OK(); } @@ -147,6 +147,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* Status NewJdbcScanner::close(RuntimeState* state) { RETURN_IF_ERROR(VScanner::close(state)); + RETURN_IF_ERROR(_jdbc_connector->close()); return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp index a69ecb1f328eb6..59c94cf8874b0b 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp @@ -88,7 +88,7 @@ Status NewOdbcScanner::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VScanner::open(state)); - RETURN_IF_ERROR(_odbc_connector->open()); + RETURN_IF_ERROR(_odbc_connector->open(state)); RETURN_IF_ERROR(_odbc_connector->query()); // check materialize slot num diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 2551fd88c2a797..5cff86bf4606f8 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -17,6 +17,7 @@ #include "vec/exec/vjdbc_connector.h" #ifdef LIBJVM +#include "common/status.h" #include "exec/table_connector.h" #include "gen_cpp/Types_types.h" #include "gutil/strings/substitute.h" @@ -30,7 +31,7 @@ namespace doris { namespace vectorized { const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor"; const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; -const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I"; +const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I"; const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; @@ -39,24 +40,50 @@ const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J"; const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V"; JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) - : TableConnector(param.tuple_desc, param.query_string), _conn_param(param) {} + : TableConnector(param.tuple_desc, param.query_string), + _conn_param(param), + _closed(false) {} JdbcConnector::~JdbcConnector() { + if (!_closed) { + close(); + } +} + +#define GET_BASIC_JAVA_CLAZZ(JAVA_TYPE, CPP_TYPE) \ + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JAVA_TYPE, &_executor_##CPP_TYPE##_clazz)); + +#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz); + +Status JdbcConnector::close() { + _closed = true; if (!_is_open) { - return; + return Status::OK(); } if (_is_in_transaction) { - abort_trans(); + RETURN_IF_ERROR(abort_trans()); } JNIEnv* env; - Status status; - RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env)); + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->DeleteGlobalRef(_executor_clazz); + DELETE_BASIC_JAVA_CLAZZ_REF(object) + DELETE_BASIC_JAVA_CLAZZ_REF(uint8_t) + DELETE_BASIC_JAVA_CLAZZ_REF(int8_t) + DELETE_BASIC_JAVA_CLAZZ_REF(int16_t) + DELETE_BASIC_JAVA_CLAZZ_REF(int32_t) + DELETE_BASIC_JAVA_CLAZZ_REF(int64_t) + DELETE_BASIC_JAVA_CLAZZ_REF(float) + DELETE_BASIC_JAVA_CLAZZ_REF(double) + DELETE_BASIC_JAVA_CLAZZ_REF(string) + DELETE_BASIC_JAVA_CLAZZ_REF(list) +#undef DELETE_BASIC_JAVA_CLAZZ_REF env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id); - RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env)); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); env->DeleteGlobalRef(_executor_obj); + return Status::OK(); } -Status JdbcConnector::open() { +Status JdbcConnector::open(RuntimeState* state, bool read) { if (_is_open) { LOG(INFO) << "this scanner of jdbc already opened"; return Status::OK(); @@ -65,16 +92,19 @@ Status JdbcConnector::open() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", &_executor_int8_t_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_double_clazz)); - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/String", &_executor_string_clazz)); + + GET_BASIC_JAVA_CLAZZ("java/util/List", list) + GET_BASIC_JAVA_CLAZZ("java/lang/Object", object) + GET_BASIC_JAVA_CLAZZ("java/lang/Boolean", uint8_t) + GET_BASIC_JAVA_CLAZZ("java/lang/Byte", int8_t) + GET_BASIC_JAVA_CLAZZ("java/lang/Short", int16_t) + GET_BASIC_JAVA_CLAZZ("java/lang/Integer", int32_t) + GET_BASIC_JAVA_CLAZZ("java/lang/Long", int64_t) + GET_BASIC_JAVA_CLAZZ("java/lang/Float", float) + GET_BASIC_JAVA_CLAZZ("java/lang/Float", double) + GET_BASIC_JAVA_CLAZZ("java/lang/String", string) + +#undef GET_BASIC_JAVA_CLAZZ RETURN_IF_ERROR(_register_func_id(env)); // Add a scoped cleanup jni reference object. This cleans up local refs made below. @@ -87,17 +117,23 @@ Status JdbcConnector::open() { std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path, _conn_param.driver_checksum, &local_location)); TJdbcExecutorCtorParams ctor_params; - ctor_params.__set_jar_location_path(local_location); + ctor_params.__set_statement(_sql_str); ctor_params.__set_jdbc_url(_conn_param.jdbc_url); ctor_params.__set_jdbc_user(_conn_param.user); ctor_params.__set_jdbc_password(_conn_param.passwd); ctor_params.__set_jdbc_driver_class(_conn_param.driver_class); + ctor_params.__set_batch_size(read ? state->batch_size() : 0); + ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE); jbyteArray ctor_params_bytes; // Pushed frame will be popped when jni_frame goes out-of-scope. RETURN_IF_ERROR(jni_frame.push(env)); RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes); + + jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); + env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); + env->DeleteLocalRef(ctor_params_bytes); } RETURN_ERROR_IF_EXC(env); RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj)); @@ -119,10 +155,8 @@ Status JdbcConnector::query() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - jstring query_sql = env->NewStringUTF(_sql_str.c_str()); - jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, - _executor_query_id, query_sql); - env->DeleteLocalRef(query_sql); + jint colunm_count = + env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id); RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); if (colunm_count != materialize_num) { @@ -164,10 +198,14 @@ Status JdbcConnector::get_next(bool* eos, std::vector& columns for (int row = 0; row < num_rows; ++row) { jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row); _convert_column_data(env, cur_data, slot_desc, columns[column_index].get()); + env->DeleteLocalRef(cur_data); } + env->DeleteLocalRef(column_data); materialized_column_index++; } + // All Java objects returned by JNI functions are local references. + env->DeleteLocalRef(block_obj); return JniUtil::GetJniExceptionMsg(env); } @@ -186,8 +224,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { RETURN_IF_ERROR(register_id(_executor_clazz, "", JDBC_EXECUTOR_CTOR_SIGNATURE, _executor_ctor_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "querySQL", JDBC_EXECUTOR_QUERYSQL_SIGNATURE, - _executor_query_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE, + _executor_write_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE, _executor_close_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE, @@ -232,48 +271,20 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, } switch (slot_desc->type().type) { - case TYPE_BOOLEAN: { - uint8_t num = _jobject_to_uint8_t(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value( - (uint8_t)num); - break; - } - case TYPE_TINYINT: { - int8_t num = _jobject_to_int8_t(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value(num); - break; +#define M(TYPE, CPP_TYPE, COLUMN_TYPE) \ + case TYPE: { \ + CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj); \ + reinterpret_cast(col_ptr)->insert_value(num); \ + break; \ } - case TYPE_SMALLINT: { - int16_t num = _jobject_to_int16_t(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value(num); - break; - } - - case TYPE_INT: { - int32_t num = _jobject_to_int32_t(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value(num); - break; - } - - case TYPE_BIGINT: { - int64_t num = _jobject_to_int64_t(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value(num); - break; - } - - case TYPE_FLOAT: { - float num = _jobject_to_float(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value( - num); - break; - } - case TYPE_DOUBLE: { - double num = _jobject_to_double(env, jobj); - reinterpret_cast*>(col_ptr)->insert_value( - num); - break; - } - + M(TYPE_BOOLEAN, uint8_t, vectorized::ColumnVector) + M(TYPE_TINYINT, int8_t, vectorized::ColumnVector) + M(TYPE_SMALLINT, int16_t, vectorized::ColumnVector) + M(TYPE_INT, int32_t, vectorized::ColumnVector) + M(TYPE_BIGINT, int64_t, vectorized::ColumnVector) + M(TYPE_FLOAT, float, vectorized::ColumnVector) + M(TYPE_DOUBLE, double, vectorized::ColumnVector) +#undef M case TYPE_STRING: case TYPE_CHAR: case TYPE_VARCHAR: { @@ -317,7 +328,7 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt, JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); jstring query_sql = env->NewString((const jchar*)insert_stmt.c_str(), insert_stmt.size()); - env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_query_id, query_sql); + env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_write_id, query_sql); env->DeleteLocalRef(query_sql); RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); return Status::OK(); @@ -325,13 +336,15 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt, std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) { jobject jstr = env->CallObjectMethod(jobj, _to_string_id); - const jbyteArray stringJbytes = - (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, env->NewStringUTF("UTF-8")); + auto coding = env->NewStringUTF("UTF-8"); + const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding); size_t length = (size_t)env->GetArrayLength(stringJbytes); jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr); std::string str = std::string((char*)pBytes, length); env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT); env->DeleteLocalRef(stringJbytes); + env->DeleteLocalRef(jstr); + env->DeleteLocalRef(coding); return str; } diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 04aa8982da2f8b..e6f957746f1342 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -42,7 +42,7 @@ class JdbcConnector : public TableConnector { ~JdbcConnector() override; - Status open() override; + Status open(RuntimeState* state, bool read = false) override; Status query() override; @@ -56,6 +56,8 @@ class JdbcConnector : public TableConnector { Status abort_trans() override; // should be call after transaction abort Status finish_trans() override; // should be call after transaction commit + Status close() override; + private: Status _register_func_id(JNIEnv* env); Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, @@ -65,13 +67,15 @@ class JdbcConnector : public TableConnector { int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj); const JdbcConnectorParam& _conn_param; + bool _closed; jclass _executor_clazz; jclass _executor_list_clazz; jclass _executor_object_clazz; jclass _executor_string_clazz; jobject _executor_obj; jmethodID _executor_ctor_id; - jmethodID _executor_query_id; + jmethodID _executor_write_id; + jmethodID _executor_read_id; jmethodID _executor_has_next_id; jmethodID _executor_get_blocks_id; jmethodID _executor_close_id; diff --git a/be/src/vec/exec/vjdbc_scan_node.cpp b/be/src/vec/exec/vjdbc_scan_node.cpp index e857218e97e319..7d156039922835 100644 --- a/be/src/vec/exec/vjdbc_scan_node.cpp +++ b/be/src/vec/exec/vjdbc_scan_node.cpp @@ -91,7 +91,7 @@ Status VJdbcScanNode::open(RuntimeState* state) { } RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(_jdbc_connector->open()); + RETURN_IF_ERROR(_jdbc_connector->open(state, true)); RETURN_IF_ERROR(_jdbc_connector->query()); return Status::OK(); } diff --git a/be/src/vec/exec/vodbc_scan_node.cpp b/be/src/vec/exec/vodbc_scan_node.cpp index 7bdc44501f9bf1..994eedc926bb7f 100644 --- a/be/src/vec/exec/vodbc_scan_node.cpp +++ b/be/src/vec/exec/vodbc_scan_node.cpp @@ -100,7 +100,7 @@ Status VOdbcScanNode::open(RuntimeState* state) { } RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(_odbc_scanner->open()); + RETURN_IF_ERROR(_odbc_scanner->open(state)); RETURN_IF_ERROR(_odbc_scanner->query()); // check materialize slot num diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp index b4ad942a3d6688..4c428aa47b21ce 100644 --- a/be/src/vec/sink/vjdbc_table_sink.cpp +++ b/be/src/vec/sink/vjdbc_table_sink.cpp @@ -58,7 +58,7 @@ Status VJdbcTableSink::open(RuntimeState* state) { // create writer _writer.reset(new JdbcConnector(_jdbc_param)); - RETURN_IF_ERROR(_writer->open()); + RETURN_IF_ERROR(_writer->open(state, false)); if (_use_transaction) { RETURN_IF_ERROR(_writer->begin_trans()); } @@ -96,6 +96,7 @@ Status VJdbcTableSink::close(RuntimeState* state, Status exec_status) { if (exec_status.ok() && _use_transaction) { RETURN_IF_ERROR(_writer->finish_trans()); } + RETURN_IF_ERROR(_writer->close()); return DataSink::close(state, exec_status); } } // namespace vectorized diff --git a/be/src/vec/sink/vodbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp index 695c06ac6837b3..8d6a0596b2f61f 100644 --- a/be/src/vec/sink/vodbc_table_sink.cpp +++ b/be/src/vec/sink/vodbc_table_sink.cpp @@ -48,7 +48,7 @@ Status VOdbcTableSink::open(RuntimeState* state) { // create writer _writer.reset(new ODBCConnector(_odbc_param)); - RETURN_IF_ERROR(_writer->open()); + RETURN_IF_ERROR(_writer->open(state)); if (_use_transaction) { RETURN_IF_ERROR(_writer->begin_trans()); } diff --git a/dist/LICENSE-dist.txt b/dist/LICENSE-dist.txt index 4403b5a5198493..02888b004b2483 100644 --- a/dist/LICENSE-dist.txt +++ b/dist/LICENSE-dist.txt @@ -882,7 +882,7 @@ The Apache Software License, Version 2.0 * Hibernate Validator Engine: - org.hibernate:hibernate-validator:5.1.0.Final (http://validator.hibernate.org/hibernate-validator) * HikariCP: - - com.zaxxer:HikariCP:4.0.3 (https://github.com/brettwooldridge/HikariCP) + - com.zaxxer:HikariCP:3.4.5 (https://github.com/brettwooldridge/HikariCP) * Hive Common: - org.apache.hive:hive-common:2.3.7 (https://hive.apache.org/hive-common) * Hive Llap Client: diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index dbcebd9ced9b91..3b0595442d7c7c 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -70,6 +70,11 @@ under the License. ${junit.version} test + + com.zaxxer + HikariCP + ${hikaricp.version} + java-udf diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index 4936bc5be3f27e..49da4332c74acb 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -19,17 +19,20 @@ import org.apache.doris.thrift.TJdbcExecutorCtorParams; +import org.apache.doris.thrift.TJdbcOperation; +import com.google.common.base.Preconditions; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import java.net.MalformedURLException; -import java.net.URLClassLoader; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -43,11 +46,12 @@ public class JdbcExecutor { private static final Logger LOG = Logger.getLogger(JdbcExecutor.class); private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); - private URLClassLoader classLoader = null; private Connection conn = null; private Statement stmt = null; private ResultSet resultSet = null; private ResultSetMetaData resultSetMetaData = null; + // Use HikariDataSource to help us manage the JDBC connections. + private HikariDataSource dataSource = null; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -57,8 +61,8 @@ public JdbcExecutor(byte[] thriftParams) throws Exception { } catch (TException e) { throw new InternalException(e.getMessage()); } - init(request.jar_location_path, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user, - request.jdbc_password); + init(request.statement, request.batch_size, request.jdbc_driver_class, request.jdbc_url, request.jdbc_user, + request.jdbc_password, request.op); } public void close() throws Exception { @@ -71,12 +75,26 @@ public void close() throws Exception { if (conn != null) { conn.close(); } - if (classLoader != null) { - classLoader.close(); + if (dataSource != null) { + dataSource.close(); } + resultSet = null; + stmt = null; + conn = null; + dataSource = null; } - public int querySQL(String sql) throws UdfRuntimeException { + public int read() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + resultSetMetaData = resultSet.getMetaData(); + return resultSetMetaData.getColumnCount(); + } catch (SQLException e) { + throw new UdfRuntimeException("JDBC executor sql has error: ", e); + } + } + + public int write(String sql) throws UdfRuntimeException { try { boolean res = stmt.execute(sql); if (res) { // sql query @@ -175,28 +193,28 @@ public long convertDateTimeToLong(Object obj) { return time; } - private void init(String driverPath, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword) - throws UdfRuntimeException { + private void init(String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser, + String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException { try { - ClassLoader loader; - if (driverPath != null) { - ClassLoader parent = getClass().getClassLoader(); - classLoader = UdfUtils.getClassLoader(driverPath, parent); - loader = classLoader; + HikariConfig config = new HikariConfig(); + config.setDriverClassName(driverClass); + config.setJdbcUrl(jdbcUrl); + config.setUsername(jdbcUser); + config.setPassword(jdbcPassword); + config.setMaximumPoolSize(1); + + dataSource = new HikariDataSource(config); + conn = dataSource.getConnection(); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); + if (op == TJdbcOperation.READ) { + Preconditions.checkArgument(sql != null); + stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + stmt.setFetchSize(batchSize); } else { - loader = ClassLoader.getSystemClassLoader(); + stmt = conn.createStatement(); } - Class.forName(driverClass, true, loader); - conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); - stmt = conn.createStatement(); - } catch (MalformedURLException e) { - throw new UdfRuntimeException("MalformedURLException to load class about " + driverPath, e); - } catch (ClassNotFoundException e) { - throw new UdfRuntimeException("Loading JDBC class error ClassNotFoundException about " + driverClass, e); } catch (SQLException e) { - throw new UdfRuntimeException("Connection JDBC class error about " + jdbcUrl, e); - } catch (Exception e) { - throw new UdfRuntimeException("unable to init jdbc executor Exception ", e); + throw new UdfRuntimeException("Initialize datasource failed: ", e); } } } diff --git a/fe/pom.xml b/fe/pom.xml index 4c228dd007c758..9f9888b680b5b4 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -188,6 +188,7 @@ under the License. 2.6 1.1.1 5.8.2 + 3.4.5 0.13.0 2.18.0 4.0.2 diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index d4a4851d9b8c20..5104045f237744 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -357,9 +357,13 @@ struct TFunction { 13: optional bool vectorized = false } +enum TJdbcOperation { + READ, + WRITE +} + struct TJdbcExecutorCtorParams { - // Local path to the UDF's jar file - 1: optional string jar_location_path + 1: optional string statement // "jdbc:mysql://127.0.0.1:3307/test"; 2: optional string jdbc_url @@ -372,6 +376,10 @@ struct TJdbcExecutorCtorParams { //"com.mysql.jdbc.Driver" 5: optional string jdbc_driver_class + + 6: optional i32 batch_size + + 7: optional TJdbcOperation op } struct TJavaUdfExecutorCtorParams {