Skip to content

Commit

Permalink
[Bug](jdbc) Fix memory leak for JDBC datasource (apache#13657)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 26, 2022
1 parent 0134e9d commit 3c95106
Show file tree
Hide file tree
Showing 20 changed files with 175 additions and 116 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,8 @@ CONF_String(be_node_role, "mix");
// Hide the be config page for webserver.
CONF_Bool(hide_webserver_config_page, "false");

CONF_String(jvm_max_heap_size, "1024M");

#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ ODBCConnector::~ODBCConnector() {
}
}

Status ODBCConnector::open() {
Status ODBCConnector::open(RuntimeState* state, bool read) {
if (_is_open) {
LOG(INFO) << "this scanner already opened";
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/odbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ODBCConnector : public TableConnector {
explicit ODBCConnector(const ODBCConnectorParam& param);
~ODBCConnector() override;

Status open() override;
Status open(RuntimeState* state, bool read = false) override;
// query for ODBC table
Status query() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/odbc_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Status OdbcScanNode::open(RuntimeState* state) {
}

RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(_odbc_scanner->open());
RETURN_IF_ERROR(_odbc_scanner->open(state));
RETURN_IF_ERROR(_odbc_scanner->query());
// check materialize slot num

Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/table_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TableConnector {
TableConnector(const TupleDescriptor* tuple_desc, const std::string& sql_str);
virtual ~TableConnector() = default;

virtual Status open() = 0;
virtual Status open(RuntimeState* state, bool read = false) = 0;
// exec query for table
virtual Status query() = 0;

Expand All @@ -64,6 +64,8 @@ class TableConnector {

std::u16string utf8_to_u16string(const char* first, const char* last);

virtual Status close() { return Status::OK(); }

protected:
bool _is_open;
bool _is_in_transaction;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/odbc_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Status OdbcTableSink::open(RuntimeState* state) {
RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
// create writer
_writer.reset(new ODBCConnector(_odbc_param));
RETURN_IF_ERROR(_writer->open());
RETURN_IF_ERROR(_writer->open(state));
if (_use_transaction) {
RETURN_IF_ERROR(_writer->begin_trans());
}
Expand Down
14 changes: 9 additions & 5 deletions be/src/util/jni-util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <jni_md.h>
#include <stdlib.h>

#include "common/config.h"
#include "gutil/once.h"
#include "gutil/strings/substitute.h"

Expand All @@ -39,13 +40,16 @@ void FindOrCreateJavaVM() {
if (rv == 0) {
JNIEnv* env;
JavaVMInitArgs vm_args;
JavaVMOption options[1];
char* str = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
options[0].optionString = str;
JavaVMOption options[2];
char* cp = getenv("DORIS_JNI_CLASSPATH_PARAMETER");
options[0].optionString = cp;
std::string heap_size = fmt::format("-Xmx{}", config::jvm_max_heap_size);
options[1].optionString = const_cast<char*>(heap_size.c_str());
vm_args.version = JNI_VERSION_1_8;
vm_args.options = options;
vm_args.nOptions = 1;
vm_args.ignoreUnrecognized = JNI_TRUE;
vm_args.nOptions = 2;
// Set it to JNI_FALSE because JNI_TRUE will let JVM ignore the max size config.
vm_args.ignoreUnrecognized = JNI_FALSE;

jint res = JNI_CreateJavaVM(&g_vm, (void**)&env, &vm_args);
if (JNI_OK != res) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/new_jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Status NewJdbcScanner::open(RuntimeState* state) {
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
RETURN_IF_ERROR(_jdbc_connector->open());
RETURN_IF_ERROR(_jdbc_connector->open(state, true));
RETURN_IF_ERROR(_jdbc_connector->query());
return Status::OK();
}
Expand Down Expand Up @@ -147,6 +147,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*

Status NewJdbcScanner::close(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::close(state));
RETURN_IF_ERROR(_jdbc_connector->close());
return Status::OK();
}
} // namespace doris::vectorized
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/new_odbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status NewOdbcScanner::open(RuntimeState* state) {

RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
RETURN_IF_ERROR(_odbc_connector->open());
RETURN_IF_ERROR(_odbc_connector->open(state));
RETURN_IF_ERROR(_odbc_connector->query());
// check materialize slot num

Expand Down
151 changes: 82 additions & 69 deletions be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "vec/exec/vjdbc_connector.h"
#ifdef LIBJVM
#include "common/status.h"
#include "exec/table_connector.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
Expand All @@ -30,7 +31,7 @@ namespace doris {
namespace vectorized {
const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor";
const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";
const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I";
const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
Expand All @@ -39,24 +40,50 @@ const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J";
const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";

JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
: TableConnector(param.tuple_desc, param.query_string), _conn_param(param) {}
: TableConnector(param.tuple_desc, param.query_string),
_conn_param(param),
_closed(false) {}

JdbcConnector::~JdbcConnector() {
if (!_closed) {
close();
}
}

#define GET_BASIC_JAVA_CLAZZ(JAVA_TYPE, CPP_TYPE) \
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JAVA_TYPE, &_executor_##CPP_TYPE##_clazz));

#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);

Status JdbcConnector::close() {
_closed = true;
if (!_is_open) {
return;
return Status::OK();
}
if (_is_in_transaction) {
abort_trans();
RETURN_IF_ERROR(abort_trans());
}
JNIEnv* env;
Status status;
RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
env->DeleteGlobalRef(_executor_clazz);
DELETE_BASIC_JAVA_CLAZZ_REF(object)
DELETE_BASIC_JAVA_CLAZZ_REF(uint8_t)
DELETE_BASIC_JAVA_CLAZZ_REF(int8_t)
DELETE_BASIC_JAVA_CLAZZ_REF(int16_t)
DELETE_BASIC_JAVA_CLAZZ_REF(int32_t)
DELETE_BASIC_JAVA_CLAZZ_REF(int64_t)
DELETE_BASIC_JAVA_CLAZZ_REF(float)
DELETE_BASIC_JAVA_CLAZZ_REF(double)
DELETE_BASIC_JAVA_CLAZZ_REF(string)
DELETE_BASIC_JAVA_CLAZZ_REF(list)
#undef DELETE_BASIC_JAVA_CLAZZ_REF
env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id);
RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
env->DeleteGlobalRef(_executor_obj);
return Status::OK();
}

Status JdbcConnector::open() {
Status JdbcConnector::open(RuntimeState* state, bool read) {
if (_is_open) {
LOG(INFO) << "this scanner of jdbc already opened";
return Status::OK();
Expand All @@ -65,16 +92,19 @@ Status JdbcConnector::open() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", &_executor_int8_t_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_double_clazz));
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/String", &_executor_string_clazz));

GET_BASIC_JAVA_CLAZZ("java/util/List", list)
GET_BASIC_JAVA_CLAZZ("java/lang/Object", object)
GET_BASIC_JAVA_CLAZZ("java/lang/Boolean", uint8_t)
GET_BASIC_JAVA_CLAZZ("java/lang/Byte", int8_t)
GET_BASIC_JAVA_CLAZZ("java/lang/Short", int16_t)
GET_BASIC_JAVA_CLAZZ("java/lang/Integer", int32_t)
GET_BASIC_JAVA_CLAZZ("java/lang/Long", int64_t)
GET_BASIC_JAVA_CLAZZ("java/lang/Float", float)
GET_BASIC_JAVA_CLAZZ("java/lang/Float", double)
GET_BASIC_JAVA_CLAZZ("java/lang/String", string)

#undef GET_BASIC_JAVA_CLAZZ
RETURN_IF_ERROR(_register_func_id(env));

// Add a scoped cleanup jni reference object. This cleans up local refs made below.
Expand All @@ -87,17 +117,23 @@ Status JdbcConnector::open() {
std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
TJdbcExecutorCtorParams ctor_params;
ctor_params.__set_jar_location_path(local_location);
ctor_params.__set_statement(_sql_str);
ctor_params.__set_jdbc_url(_conn_param.jdbc_url);
ctor_params.__set_jdbc_user(_conn_param.user);
ctor_params.__set_jdbc_password(_conn_param.passwd);
ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
ctor_params.__set_batch_size(read ? state->batch_size() : 0);
ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);

jbyteArray ctor_params_bytes;
// Pushed frame will be popped when jni_frame goes out-of-scope.
RETURN_IF_ERROR(jni_frame.push(env));
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
_executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);

jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
env->DeleteLocalRef(ctor_params_bytes);
}
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj));
Expand All @@ -119,10 +155,8 @@ Status JdbcConnector::query() {

JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
jstring query_sql = env->NewStringUTF(_sql_str.c_str());
jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
_executor_query_id, query_sql);
env->DeleteLocalRef(query_sql);
jint colunm_count =
env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));

if (colunm_count != materialize_num) {
Expand Down Expand Up @@ -164,10 +198,14 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
for (int row = 0; row < num_rows; ++row) {
jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
_convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
env->DeleteLocalRef(cur_data);
}
env->DeleteLocalRef(column_data);

materialized_column_index++;
}
// All Java objects returned by JNI functions are local references.
env->DeleteLocalRef(block_obj);
return JniUtil::GetJniExceptionMsg(env);
}

Expand All @@ -186,8 +224,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {

RETURN_IF_ERROR(register_id(_executor_clazz, "<init>", JDBC_EXECUTOR_CTOR_SIGNATURE,
_executor_ctor_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "querySQL", JDBC_EXECUTOR_QUERYSQL_SIGNATURE,
_executor_query_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "write", JDBC_EXECUTOR_WRITE_SIGNATURE,
_executor_write_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "read", "()I", _executor_read_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "close", JDBC_EXECUTOR_CLOSE_SIGNATURE,
_executor_close_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE,
Expand Down Expand Up @@ -232,48 +271,20 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
}

switch (slot_desc->type().type) {
case TYPE_BOOLEAN: {
uint8_t num = _jobject_to_uint8_t(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
(uint8_t)num);
break;
}
case TYPE_TINYINT: {
int8_t num = _jobject_to_int8_t(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
break;
#define M(TYPE, CPP_TYPE, COLUMN_TYPE) \
case TYPE: { \
CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj); \
reinterpret_cast<COLUMN_TYPE*>(col_ptr)->insert_value(num); \
break; \
}
case TYPE_SMALLINT: {
int16_t num = _jobject_to_int16_t(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
break;
}

case TYPE_INT: {
int32_t num = _jobject_to_int32_t(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
break;
}

case TYPE_BIGINT: {
int64_t num = _jobject_to_int64_t(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
break;
}

case TYPE_FLOAT: {
float num = _jobject_to_float(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
num);
break;
}
case TYPE_DOUBLE: {
double num = _jobject_to_double(env, jobj);
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
num);
break;
}

M(TYPE_BOOLEAN, uint8_t, vectorized::ColumnVector<vectorized::UInt8>)
M(TYPE_TINYINT, int8_t, vectorized::ColumnVector<vectorized::Int8>)
M(TYPE_SMALLINT, int16_t, vectorized::ColumnVector<vectorized::Int16>)
M(TYPE_INT, int32_t, vectorized::ColumnVector<vectorized::Int32>)
M(TYPE_BIGINT, int64_t, vectorized::ColumnVector<vectorized::Int64>)
M(TYPE_FLOAT, float, vectorized::ColumnVector<vectorized::Float32>)
M(TYPE_DOUBLE, double, vectorized::ColumnVector<vectorized::Float64>)
#undef M
case TYPE_STRING:
case TYPE_CHAR:
case TYPE_VARCHAR: {
Expand Down Expand Up @@ -317,21 +328,23 @@ Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
jstring query_sql = env->NewString((const jchar*)insert_stmt.c_str(), insert_stmt.size());
env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_query_id, query_sql);
env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_write_id, query_sql);
env->DeleteLocalRef(query_sql);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
return Status::OK();
}

std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) {
jobject jstr = env->CallObjectMethod(jobj, _to_string_id);
const jbyteArray stringJbytes =
(jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, env->NewStringUTF("UTF-8"));
auto coding = env->NewStringUTF("UTF-8");
const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding);
size_t length = (size_t)env->GetArrayLength(stringJbytes);
jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr);
std::string str = std::string((char*)pBytes, length);
env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT);
env->DeleteLocalRef(stringJbytes);
env->DeleteLocalRef(jstr);
env->DeleteLocalRef(coding);
return str;
}

Expand Down
Loading

0 comments on commit 3c95106

Please sign in to comment.