diff --git a/.gitignore b/.gitignore index bf91800594652..57422b701c114 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,7 @@ target/ # Python *.pyc +.python-version # Perf tools *.hgrm diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 56f71239a3622..942ec8ffc0944 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -461,8 +461,7 @@ def __init__(self, service_url, conf.concurrent_lookup_requests(concurrent_lookup_requests) if log_conf_file_path: conf.log_conf_file_path(log_conf_file_path) - if logger: - conf.set_logger(logger) + conf.set_logger(self._prepare_logger(logger) if logger else None) if listener_name: conf.listener_name(listener_name) if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): @@ -476,6 +475,16 @@ def __init__(self, service_url, self._client = _pulsar.Client(service_url, conf) self._consumers = [] + @staticmethod + def _prepare_logger(logger): + import logging + def log(level, message): + old_threads = logging.logThreads + logging.logThreads = False + logger.log(logging.getLevelName(level), message) + logging.logThreads = old_threads + return log + def create_producer(self, topic, producer_name=None, schema=schema.BytesSchema(), diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 582514fb923c9..5f46edc4365e6 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -19,6 +19,8 @@ # +import threading +import logging from unittest import TestCase, main import time import os @@ -1249,6 +1251,35 @@ def test_json_schema_encode(self): second_encode = schema.encode(record) self.assertEqual(first_encode, second_encode) + def test_logger_thread_leaks(self): + def _do_connect(close): + logger = logging.getLogger(str(threading.current_thread().ident)) + logger.setLevel(logging.INFO) + client = pulsar.Client( + service_url="pulsar://localhost:6650", + io_threads=4, + message_listener_threads=4, + operation_timeout_seconds=1, + log_conf_file_path=None, + authentication=None, + logger=logger, + ) + client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test") + if close: + client.close() + + for should_close in (True, False): + self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close)) + _do_connect(should_close) + self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close)) + threads = [] + for _ in range(10): + threads.append(threading.Thread(target=_do_connect, args=(should_close))) + threads[-1].start() + for thread in threads: + thread.join() + assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close) + def test_chunking(self): client = Client(self.serviceUrl) data_size = 10 * 1024 * 1024 diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index b4ee06e0a2411..fed9c283f6a4c 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -93,27 +93,21 @@ static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfigu return conf; } -class LoggerWrapper : public Logger { - PyObject* const _pyLogger; - const int _pythonLogLevel; +class LoggerWrapper : public Logger, public CaptivePythonObjectMixin { const std::unique_ptr _fallbackLogger; - static constexpr int _getLogLevelValue(Level level) { return 10 + (level * 10); } - public: - LoggerWrapper(PyObject* pyLogger, int pythonLogLevel, Logger* fallbackLogger) - : _pyLogger(pyLogger), _pythonLogLevel(pythonLogLevel), _fallbackLogger(fallbackLogger) { - Py_XINCREF(_pyLogger); - } + LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger) + : CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {} LoggerWrapper(const LoggerWrapper&) = delete; LoggerWrapper(LoggerWrapper&&) noexcept = delete; LoggerWrapper& operator=(const LoggerWrapper&) = delete; LoggerWrapper& operator=(LoggerWrapper&&) = delete; - virtual ~LoggerWrapper() { Py_XDECREF(_pyLogger); } - - bool isEnabled(Level level) { return _getLogLevelValue(level) >= _pythonLogLevel; } + bool isEnabled(Level level) { + return true; // Python loggers are always enabled; they decide internally whether or not to log. + } void log(Level level, int line, const std::string& message) { if (!Py_IsInitialized()) { @@ -121,66 +115,45 @@ class LoggerWrapper : public Logger { _fallbackLogger->log(level, line, message); } else { PyGILState_STATE state = PyGILState_Ensure(); - + PyObject *type, *value, *traceback; + PyErr_Fetch(&type, &value, &traceback); try { switch (level) { case Logger::LEVEL_DEBUG: - py::call_method(_pyLogger, "debug", message.c_str()); + py::call(_captive, "DEBUG", message.c_str()); break; case Logger::LEVEL_INFO: - py::call_method(_pyLogger, "info", message.c_str()); + py::call(_captive, "INFO", message.c_str()); break; case Logger::LEVEL_WARN: - py::call_method(_pyLogger, "warning", message.c_str()); + py::call(_captive, "WARNING", message.c_str()); break; case Logger::LEVEL_ERROR: - py::call_method(_pyLogger, "error", message.c_str()); + py::call(_captive, "ERROR", message.c_str()); break; } - } catch (const py::error_already_set& e) { + PyErr_Print(); _fallbackLogger->log(level, line, message); } - + PyErr_Restore(type, value, traceback); PyGILState_Release(state); } } }; -class LoggerWrapperFactory : public LoggerFactory { +class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin { std::unique_ptr _fallbackLoggerFactory{new ConsoleLoggerFactory}; - PyObject* _pyLogger; - Optional _pythonLogLevel{Optional::empty()}; - - void initializePythonLogLevel() { - PyGILState_STATE state = PyGILState_Ensure(); - - try { - int level = py::call_method(_pyLogger, "getEffectiveLevel"); - _pythonLogLevel = Optional::of(level); - } catch (const py::error_already_set& e) { - // Failed to get log level from _pyLogger, set it to empty to fallback to _fallbackLogger - _pythonLogLevel = Optional::empty(); - } - - PyGILState_Release(state); - } public: - LoggerWrapperFactory(py::object pyLogger) { - _pyLogger = pyLogger.ptr(); - Py_XINCREF(_pyLogger); - initializePythonLogLevel(); - } - - virtual ~LoggerWrapperFactory() { Py_XDECREF(_pyLogger); } + LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {} Logger* getLogger(const std::string& fileName) { const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName); - if (_pythonLogLevel.is_present()) { - return new LoggerWrapper(_pyLogger, _pythonLogLevel.value(), fallbackLogger); - } else { + if (_captive == py::object().ptr()) { return fallbackLogger; + } else { + return new LoggerWrapper(_captive, fallbackLogger); } } }; diff --git a/pulsar-client-cpp/python/src/utils.h b/pulsar-client-cpp/python/src/utils.h index d544792b75eb2..4b69ff82e6c45 100644 --- a/pulsar-client-cpp/python/src/utils.h +++ b/pulsar-client-cpp/python/src/utils.h @@ -82,3 +82,23 @@ struct CryptoKeyReaderWrapper { CryptoKeyReaderWrapper(); CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath); }; + +class CaptivePythonObjectMixin { + protected: + PyObject* _captive; + + CaptivePythonObjectMixin(PyObject* captive) { + _captive = captive; + PyGILState_STATE state = PyGILState_Ensure(); + Py_XINCREF(_captive); + PyGILState_Release(state); + } + + ~CaptivePythonObjectMixin() { + if (Py_IsInitialized()) { + PyGILState_STATE state = PyGILState_Ensure(); + Py_XDECREF(_captive); + PyGILState_Release(state); + } + } +};