From 1e744c62b3560258a11ccf59e339f2f742cc0d5a Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Tue, 26 Dec 2023 01:43:53 +0200 Subject: [PATCH 01/12] Add ECS support --- logstash_async/formatter.py | 260 +++++++++++++++++++------ setup.py | 3 + tests/formatter_test.py | 365 +++++++++++++++++++++++++++++++++++- 3 files changed, 569 insertions(+), 59 deletions(-) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 5c46b82..f06ef26 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -21,10 +21,40 @@ import simplejson as json +class _LogstashMessageSchema: + TIMESTAMP = '@timestamp' + VERSION = '@version' + METADATA = '@metadata' + HOST = 'host' + LOG_LEVEL = 'level' + LOG_SOURCE = 'logsource' + LOGGER_NAME = 'logger_name' + LINE = 'line' + MESSAGE = 'message' + MESSAGE_TYPE = 'type' + FUNC_NAME = 'func_name' + THREAD_NAME = 'thread_name' + PROCESS_NAME = 'process_name' + INTERPRETER = 'interpreter' + INTERPRETER_VERSION = 'interpreter_version' + PATH = 'path' + PID = 'pid' + PROGRAM = 'program' + STACK_TRACE = 'stack_trace' + ERROR_TYPE = 'error_type' + TAGS = 'tags' + LOGSTASH_ASYNC_VERSION = 'logstash_async_version' + + class LogstashFormatter(logging.Formatter): _basic_data_types = (type(None), bool, str, int, float) + formatter_record_field_skip_list = constants.FORMATTER_RECORD_FIELD_SKIP_LIST + formatter_logstash_message_field_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST + + MessageSchema = _LogstashMessageSchema + # ---------------------------------------------------------------------- # pylint: disable=too-many-arguments def __init__( @@ -90,21 +120,22 @@ def _prefetch_program_name(self): # ---------------------------------------------------------------------- def format(self, record): + Schema = self.MessageSchema message = { - '@timestamp': self._format_timestamp(record.created), - '@version': '1', - 'host': self._host, - 'level': record.levelname, - 'logsource': self._logsource, - 'message': record.getMessage(), - 'pid': record.process, - 'program': self._program_name, - 'type': self._message_type, + Schema.TIMESTAMP: self._format_timestamp(record.created), + Schema.VERSION: '1', + Schema.HOST: self._host, + Schema.LOG_LEVEL: record.levelname, + Schema.LOG_SOURCE: self._logsource, + Schema.MESSAGE: record.getMessage(), + Schema.PID: record.process, + Schema.PROGRAM: self._program_name, + Schema.MESSAGE_TYPE: self._message_type, } if self._metadata: - message['@metadata'] = self._metadata + message[Schema.METADATA] = self._metadata if self._tags: - message['tags'] = self._tags + message[Schema.TAGS] = self._tags # record fields record_fields = self._get_record_fields(record) @@ -113,14 +144,11 @@ def format(self, record): extra_fields = self._get_extra_fields(record) # remove all fields to be excluded self._remove_excluded_fields(message, extra_fields) - # wrap extra fields in configurable namespace - if self._extra_prefix: - message[self._extra_prefix] = extra_fields - else: - message.update(extra_fields) + message.update(extra_fields) # move existing extra record fields into the configured prefix self._move_extra_record_fields_to_prefix(message) + self._post_process_message(message) return self._serialize(message) @@ -152,23 +180,25 @@ def _value_repr(self, value): # ---------------------------------------------------------------------- def _get_extra_fields(self, record): + Schema = self.MessageSchema extra_fields = { - 'func_name': record.funcName, - 'interpreter': self._interpreter, - 'interpreter_version': self._interpreter_version, - 'line': record.lineno, - 'logger_name': record.name, - 'logstash_async_version': logstash_async.__version__, - 'path': record.pathname, - 'process_name': record.processName, - 'thread_name': record.threadName, + Schema.FUNC_NAME: record.funcName, + Schema.INTERPRETER: self._interpreter, + Schema.INTERPRETER_VERSION: self._interpreter_version, + Schema.LINE: record.lineno, + Schema.LOGGER_NAME: record.name, + Schema.LOGSTASH_ASYNC_VERSION: logstash_async.__version__, + Schema.PATH: record.pathname, + Schema.PROCESS_NAME: record.processName, + Schema.THREAD_NAME: record.threadName, } # static extra fields if self._extra: extra_fields.update(self._extra) # exceptions if record.exc_info: - extra_fields['stack_trace'] = self._format_exception(record.exc_info) + extra_fields[Schema.ERROR_TYPE] = record.exc_info[0].__name__ + extra_fields[Schema.STACK_TRACE] = self._format_exception(record.exc_info) return extra_fields # ---------------------------------------------------------------------- @@ -185,7 +215,7 @@ def _format_exception(self, exc_info): def _remove_excluded_fields(self, message, extra_fields): for fields in (message, extra_fields): for field_name in list(fields): - if field_name in constants.FORMATTER_RECORD_FIELD_SKIP_LIST: + if field_name in self.formatter_record_field_skip_list: del fields[field_name] # ---------------------------------------------------------------------- @@ -199,17 +229,68 @@ def _move_extra_record_fields_to_prefix(self, message): if not self._extra_prefix: return # early out if no prefix is configured - field_skip_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST + [self._extra_prefix] + message.setdefault(self._extra_prefix, {}) + field_skip_list = self.formatter_logstash_message_field_list + [self._extra_prefix] for key in list(message): if key not in field_skip_list: message[self._extra_prefix][key] = message.pop(key) + # ---------------------------------------------------------------------- + def _post_process_message(self, message): + """Override when needed""" + # ---------------------------------------------------------------------- def _serialize(self, message): return json.dumps(message, ensure_ascii=self._ensure_ascii) +class LogstashEcsFormatter(LogstashFormatter): + ecs_version = '8.11.0' + __schema_dict = { + 'ECS_VERSION': 'ecs.version', + 'MESSAGE_TYPE': 'event.module', + 'HOST': 'host.hostname', + 'LOG_LEVEL': 'log.level', + 'LOGGER_NAME': 'log.logger', + 'LOG_SOURCE': 'log.syslog.hostname', + 'LINE': 'log.origin.file.line', + 'PATH': 'log.origin.file.name', + 'FUNC_NAME': 'log.origin.function', + 'STACK_TRACE': 'error.stack_trace', + 'ERROR_TYPE': 'error.type', + 'PROGRAM': 'process.executable', + 'PROCESS_NAME': 'process.name', + 'PID': 'process.pid', + 'THREAD_NAME': 'process.thread.name', + } + + formatter_logstash_message_field_list = (LogstashFormatter.formatter_logstash_message_field_list + + list(__schema_dict.values())) + MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) + + def _post_process_message(self, message): + super()._post_process_message(message) + Schema = self.MessageSchema + message[Schema.ECS_VERSION] = self.ecs_version + + class DjangoLogstashFormatter(LogstashFormatter): + class MessageSchema(LogstashFormatter.MessageSchema): + DJANGO_VERSION = 'django_version' + RESP_STATUS_CODE = 'status_code' + REQ_USER_AGENT = 'req_useragent' + REQ_REMOTE_ADDRESS = 'req_remote_address' + REQ_HOST = 'req_host' + REQ_URI = 'req_uri' + REQ_USER = 'req_user' + REQ_METHOD = 'req_method' + REQ_REFERER = 'req_referer' + REQ_FORWARDED_PROTO = 'req_forwarded_proto' + REQ_FORWARDED_FOR = 'req_forwarded_for' + TMPL_NAME = 'tmpl_name' + TMPL_LINE = 'tmpl_line' + TMPL_MESSAGE = 'tmpl_message' + TMPL_DURING = 'tmpl_during' # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): @@ -225,9 +306,10 @@ def _fetch_django_version(self): # ---------------------------------------------------------------------- def _get_extra_fields(self, record): extra_fields = super()._get_extra_fields(record) + Schema = self.MessageSchema if hasattr(record, 'status_code'): - extra_fields['status_code'] = record.status_code + extra_fields[Schema.RESP_STATUS_CODE] = record.status_code # Django's runserver command passes socketobject and WSGIRequest instances as "request". # Hence the check for the META attribute. @@ -236,34 +318,34 @@ def _get_extra_fields(self, record): request = record.request request_user = self._get_attribute_with_default(request, 'user', '') - extra_fields['django_version'] = self._django_version - extra_fields['req_useragent'] = request.META.get('HTTP_USER_AGENT', '') - extra_fields['req_remote_address'] = request.META.get('REMOTE_ADDR', '') - extra_fields['req_host'] = self._try_to_get_host_from_remote(request) - extra_fields['req_uri'] = self._try_to_get_full_request_uri(request) - extra_fields['req_user'] = str(request_user) - extra_fields['req_method'] = request.META.get('REQUEST_METHOD', '') - extra_fields['req_referer'] = request.META.get('HTTP_REFERER', '') + extra_fields[Schema.DJANGO_VERSION] = self._django_version + extra_fields[Schema.REQ_USER_AGENT] = request.META.get('HTTP_USER_AGENT', '') + extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.META.get('REMOTE_ADDR', '') + extra_fields[Schema.REQ_HOST] = self._try_to_get_host_from_remote(request) + extra_fields[Schema.REQ_URI] = self._try_to_get_full_request_uri(request) + extra_fields[Schema.REQ_USER] = str(request_user) + extra_fields[Schema.REQ_METHOD] = request.META.get('REQUEST_METHOD', '') + extra_fields[Schema.REQ_REFERER] = request.META.get('HTTP_REFERER', '') forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO', None) if forwarded_proto is not None: - extra_fields['req_forwarded_proto'] = forwarded_proto + extra_fields[Schema.REQ_FORWARDED_PROTO] = forwarded_proto forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None) if forwarded_for is not None: # make it a list forwarded_for_list = forwarded_for.replace(' ', '').split(',') - extra_fields['req_forwarded_for'] = forwarded_for_list + extra_fields[Schema.REQ_FORWARDED_FOR] = forwarded_for_list # template debug if isinstance(record.exc_info, tuple): exc_value = record.exc_info[1] template_info = getattr(exc_value, 'template_debug', None) if template_info: - extra_fields['tmpl_name'] = template_info['name'] - extra_fields['tmpl_line'] = template_info['line'] - extra_fields['tmpl_message'] = template_info['message'] - extra_fields['tmpl_during'] = template_info['during'] + extra_fields[Schema.TMPL_NAME] = template_info['name'] + extra_fields[Schema.TMPL_LINE] = template_info['line'] + extra_fields[Schema.TMPL_MESSAGE] = template_info['message'] + extra_fields[Schema.TMPL_DURING] = template_info['during'] return extra_fields @@ -299,12 +381,49 @@ def _try_to_get_full_request_uri(self, request): return None +class DjangoLogstashEcsFormatter(DjangoLogstashFormatter, LogstashEcsFormatter): + __schema_dict = { + 'RESP_STATUS_CODE': 'http.response.status_code', + 'REQ_USER_AGENT': 'user_agent.original', + 'REQ_REMOTE_ADDRESS': 'client.ip', + 'REQ_HOST': 'client.domain', + 'REQ_URI': 'url.original', + 'REQ_USER': 'user.name', + 'REQ_METHOD': 'http.request.method', + 'REQ_REFERER': 'http.request.referrer', + } + + formatter_logstash_message_field_list = (LogstashEcsFormatter.formatter_logstash_message_field_list + + list(__schema_dict.values())) + MessageSchema = type( + 'MessageSchema', + (DjangoLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), + __schema_dict, + ) + + def _remove_excluded_fields(self, message, extra_fields): + message.pop('status_code', None) + super()._remove_excluded_fields(message, extra_fields) + + class FlaskLogstashFormatter(LogstashFormatter): + class MessageSchema(LogstashFormatter.MessageSchema): + FLASK_VERSION = 'flask_version' + RESP_STATUS_CODE = 'status_code' + REQ_USER_AGENT = 'req_useragent' + REQ_REMOTE_ADDRESS = 'req_remote_address' + REQ_HOST = 'req_host' + REQ_URI = 'req_uri' + REQ_USER = 'req_user' + REQ_METHOD = 'req_method' + REQ_REFERER = 'req_referer' + REQ_ID = 'request_id' + REQ_FORWARDED_PROTO = 'req_forwarded_proto' + REQ_FORWARDED_FOR = 'req_forwarded_for' # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._django_version = None self._fetch_flask_version() # ---------------------------------------------------------------------- @@ -317,34 +436,61 @@ def _get_extra_fields(self, record): from flask import request # pylint: disable=import-error,import-outside-toplevel extra_fields = super()._get_extra_fields(record) + Schema = self.MessageSchema - extra_fields['flask_version'] = self._flask_version + extra_fields[Schema.FLASK_VERSION] = self._flask_version if request: # request might be unbound in other threads - extra_fields['req_useragent'] = str(request.user_agent) if request.user_agent else '' - extra_fields['req_remote_address'] = request.remote_addr - extra_fields['req_host'] = request.host.split(':', 1)[0] - extra_fields['req_uri'] = request.url - extra_fields['req_method'] = request.method - extra_fields['req_referer'] = request.referrer + extra_fields[Schema.REQ_USER_AGENT] = str(request.user_agent) if request.user_agent else '' + extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.remote_addr + extra_fields[Schema.REQ_HOST] = request.host.split(':', 1)[0] + extra_fields[Schema.REQ_URI] = request.url + extra_fields[Schema.REQ_METHOD] = request.method + extra_fields[Schema.REQ_REFERER] = request.referrer if 'X-Request-ID' in request.headers: - extra_fields['request_id'] = request.headers.get('X-Request-ID') + extra_fields[Schema.REQ_ID] = request.headers.get('X-Request-ID') if request.remote_user: - extra_fields['req_user'] = request.remote_user + extra_fields[Schema.REQ_USER] = request.remote_user forwarded_proto = request.headers.get('X-Forwarded-Proto', None) if forwarded_proto is not None: - extra_fields['req_forwarded_proto'] = forwarded_proto + extra_fields[Schema.REQ_FORWARDED_PROTO] = forwarded_proto forwarded_for = request.headers.get('X-Forwarded-For', None) if forwarded_for is not None: # make it a list forwarded_for_list = forwarded_for.replace(' ', '').split(',') - extra_fields['req_forwarded_for'] = forwarded_for_list + extra_fields[Schema.REQ_FORWARDED_FOR] = forwarded_for_list # check if we have a status code somewhere if hasattr(record, 'status_code'): - extra_fields['status_code'] = record.status_code + extra_fields[Schema.RESP_STATUS_CODE] = record.status_code if hasattr(record, 'response'): - extra_fields['status_code'] = record.response.status_code + extra_fields[Schema.RESP_STATUS_CODE] = record.response.status_code return extra_fields + + +class FlaskLogstashEcsFormatter(FlaskLogstashFormatter, LogstashEcsFormatter): + __schema_dict = { + 'RESP_STATUS_CODE': 'http.response.status_code', + 'REQ_USER_AGENT': 'user_agent.original', + 'REQ_REMOTE_ADDRESS': 'client.ip', + 'REQ_HOST': 'client.domain', + 'REQ_URI': 'url.original', + 'REQ_USER': 'user.name', + 'REQ_METHOD': 'http.request.method', + 'REQ_REFERER': 'http.request.referrer', + 'REQ_ID': 'http.request.id', + } + + formatter_logstash_message_field_list = (LogstashEcsFormatter.formatter_logstash_message_field_list + + list(__schema_dict.values())) + MessageSchema = type( + 'MessageSchema', + (FlaskLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), + __schema_dict, + ) + + def _remove_excluded_fields(self, message, extra_fields): + message.pop('status_code', None) + super()._remove_excluded_fields(message, extra_fields) diff --git a/setup.py b/setup.py index 0e016ed..def2622 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,9 @@ }, keywords='logging logstash asynchronous', install_requires=['limits', 'pylogbeat', 'requests'], + extras_require={ + 'dev': ['django', 'flask'], + }, python_requires='>3.5', include_package_data=True, classifiers=[ diff --git a/tests/formatter_test.py b/tests/formatter_test.py index 0cba802..bafb0db 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -3,16 +3,40 @@ # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. +import socket +from contextlib import suppress from logging import FileHandler, makeLogRecord import os import sys import unittest +from types import SimpleNamespace +from unittest.mock import patch -from logstash_async.formatter import LogstashFormatter - +import logstash_async +from logstash_async.formatter import LogstashFormatter, DjangoLogstashFormatter, FlaskLogstashFormatter, \ + LogstashEcsFormatter, DjangoLogstashEcsFormatter, FlaskLogstashEcsFormatter # pylint: disable=protected-access +_interpreter_version = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}' + + +def create_log_record(**kwargs): + return makeLogRecord({ + 'msg': 'test', + 'created': 1635082335.024747, + 'levelname': 'INFO', + 'process': 1, + 'funcName': 'f', + 'lineno': 2, + 'name': 'foo', + 'pathname': 'a/b/c', + 'processName': 'bar', + 'threadName': 'baz', + 'exc_info': (ValueError, None, None), + **kwargs, + }) + class ExceptionCatchingFileHandler(FileHandler): def __init__(self, *args, **kwargs): @@ -60,6 +84,343 @@ def test_format_timestamp_microsecond_2(self): result = formatter._format_timestamp(test_time_microsecond2) self.assertEqual(result, '2021-10-24T13:32:15.024Z') + @patch.object(LogstashFormatter, '_serialize', lambda s, m: m) + @patch.object(LogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = LogstashFormatter(tags=['t1', 't2']) + result = formatter.format(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'taskName': None, + 'stack_trace': (ValueError, None, None), + 'error_type': 'ValueError', + } + }) + + +class LogstashEcsFormatterTest(unittest.TestCase): + @patch.object(LogstashEcsFormatter, '_serialize', lambda s, m: m) + @patch.object(LogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = LogstashEcsFormatter(tags=['t1', 't2']) + result = formatter.format(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs.version': '8.11.0', + 'event.module': 'python-logstash', + 'host.hostname': socket.gethostname(), + 'log.level': 'INFO', + 'log.syslog.hostname': socket.gethostname(), + 'log.origin.file.line': 2, + 'log.origin.file.name': 'a/b/c', + 'log.origin.function': 'f', + 'log.logger': 'foo', + 'message': 'test', + 'process.thread.name': 'baz', + 'process.name': 'bar', + 'process.pid': 1, + 'process.executable': sys.argv[0], + 'error.stack_trace': (ValueError, None, None), + 'error.type': 'ValueError', + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + 'taskName': None, + } + }) + + +class DjangoTestMixin: + @classmethod + def setUpClass(cls): + super().setUpClass() + + import django + from django.conf import settings + from django.http import HttpRequest + + with suppress(RuntimeError): + settings.configure() + cls.HttpRequest = HttpRequest + cls.django_version = django.get_version() + + def _create_request(self): + request = self.HttpRequest() + request.user = 'usr' + request.META.update({ + 'HTTP_USER_AGENT': 'dj-agent', + 'REMOTE_ADDR': 'dj-addr', + 'HTTP_HOST': 'dj-host', + 'HTTP_REFERER': 'dj-ref', + 'REQUEST_METHOD': 'GET', + 'HTTP_X_FORWARDED_PROTO': 'dj-f-proto', + 'HTTP_X_FORWARDED_FOR': 'dj-f1, dj-f2', + }) + return request + + +class DjangoLogstashFormatterTest(DjangoTestMixin, unittest.TestCase): + @patch.object(DjangoLogstashFormatter, '_serialize', lambda s, m: m) + @patch.object(DjangoLogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = DjangoLogstashFormatter(tags=['t1', 't2']) + exc_info = (ValueError, SimpleNamespace(template_debug={ + 'name': 'tpl', + 'line': 3, + 'message': 'tmsg', + 'during': 'd', + }), None) + result = formatter.format(create_log_record( + status_code=500, + request=self._create_request(), + exc_info=exc_info, + )) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'taskName': None, + 'stack_trace': exc_info, + 'error_type': 'ValueError', + 'status_code': 500, + 'django_version': self.django_version, + 'req_useragent': 'dj-agent', + 'req_remote_address': 'dj-addr', + 'req_host': 'dj-host', + 'req_uri': None, + 'req_user': 'usr', + 'req_method': 'GET', + 'req_referer': 'dj-ref', + 'req_forwarded_proto': 'dj-f-proto', + 'req_forwarded_for': ['dj-f1', 'dj-f2'], + 'tmpl_name': 'tpl', + 'tmpl_line': 3, + 'tmpl_message': 'tmsg', + 'tmpl_during': 'd', + 'request': '', + } + }) + + +class DjangoLogstashEcsFormatterTest(DjangoTestMixin, unittest.TestCase): + @patch.object(DjangoLogstashEcsFormatter, '_serialize', lambda s, m: m) + @patch.object(DjangoLogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = DjangoLogstashEcsFormatter(tags=['t1', 't2']) + exc_info = (ValueError, SimpleNamespace(template_debug={ + 'name': 'tpl', + 'line': 3, + 'message': 'tmsg', + 'during': 'd', + }), None) + result = formatter.format(create_log_record( + status_code=500, + request=self._create_request(), + exc_info=exc_info, + )) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs.version': '8.11.0', + 'event.module': 'python-logstash', + 'host.hostname': socket.gethostname(), + 'client.domain': 'dj-host', + 'client.ip': 'dj-addr', + 'http.request.method': 'GET', + 'http.request.referrer': 'dj-ref', + 'http.response.status_code': 500, + 'url.original': None, + 'user.name': 'usr', + 'user_agent.original': 'dj-agent', + 'log.level': 'INFO', + 'log.syslog.hostname': socket.gethostname(), + 'log.origin.file.line': 2, + 'log.origin.file.name': 'a/b/c', + 'log.origin.function': 'f', + 'log.logger': 'foo', + 'message': 'test', + 'process.thread.name': 'baz', + 'process.name': 'bar', + 'process.pid': 1, + 'process.executable': sys.argv[0], + 'error.stack_trace': exc_info, + 'error.type': 'ValueError', + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + 'taskName': None, + 'req_forwarded_proto': 'dj-f-proto', + 'req_forwarded_for': ['dj-f1', 'dj-f2'], + 'tmpl_name': 'tpl', + 'tmpl_line': 3, + 'tmpl_message': 'tmsg', + 'tmpl_during': 'd', + 'request': '', + 'django_version': self.django_version, + } + }) + + +class FlaskTestMixin: + @classmethod + def setUpClass(cls): + super().setUpClass() + + from flask import __version__ + cls.flask_version = __version__ + + def _create_request(self): + return SimpleNamespace( + user_agent='f-agent', + remote_addr='f-addr', + host='f-host:80', + url='f-url', + method='GET', + referrer='f-ref', + remote_user='usr', + headers={ + 'X-Request-ID': 'x-id', + 'X-Forwarded-Proto': 'f-proto', + 'X-Forwarded-For': 'f1, f2', + }, + ) + + +class FlaskLogstashFormatterTest(FlaskTestMixin, unittest.TestCase): + @patch.object(FlaskLogstashFormatter, '_serialize', lambda s, m: m) + @patch.object(FlaskLogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + self.enterContext(patch('flask.request', self._create_request())) + formatter = FlaskLogstashFormatter(tags=['t1', 't2']) + result = formatter.format(create_log_record(status_code=500)) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'taskName': None, + 'error_type': 'ValueError', + 'stack_trace': (ValueError, None, None), + 'status_code': 500, + 'flask_version': self.flask_version, + 'req_useragent': 'f-agent', + 'req_remote_address': 'f-addr', + 'req_host': 'f-host', + 'req_uri': 'f-url', + 'req_user': 'usr', + 'req_method': 'GET', + 'req_referer': 'f-ref', + 'req_forwarded_proto': 'f-proto', + 'req_forwarded_for': ['f1', 'f2'], + 'request_id': 'x-id', + } + }) + + +class FlaskLogstashEcsFormatterTest(FlaskTestMixin, unittest.TestCase): + @patch.object(FlaskLogstashEcsFormatter, '_serialize', lambda s, m: m) + @patch.object(FlaskLogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + self.enterContext(patch('flask.request', self._create_request())) + formatter = FlaskLogstashEcsFormatter(tags=['t1', 't2']) + result = formatter.format(create_log_record(status_code=500)) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs.version': '8.11.0', + 'event.module': 'python-logstash', + 'host.hostname': socket.gethostname(), + 'client.domain': 'f-host', + 'client.ip': 'f-addr', + 'http.request.id': 'x-id', + 'http.request.method': 'GET', + 'http.request.referrer': 'f-ref', + 'http.response.status_code': 500, + 'url.original': 'f-url', + 'user.name': 'usr', + 'user_agent.original': 'f-agent', + 'log.level': 'INFO', + 'log.syslog.hostname': socket.gethostname(), + 'log.origin.file.line': 2, + 'log.origin.file.name': 'a/b/c', + 'log.origin.function': 'f', + 'log.logger': 'foo', + 'message': 'test', + 'process.thread.name': 'baz', + 'process.name': 'bar', + 'process.pid': 1, + 'process.executable': sys.argv[0], + 'error.stack_trace': (ValueError, None, None), + 'error.type': 'ValueError', + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + 'taskName': None, + 'req_forwarded_proto': 'f-proto', + 'req_forwarded_for': ['f1', 'f2'], + 'flask_version': self.flask_version, + } + }) + if __name__ == '__main__': unittest.main() From fbe01d11aa86670b60700efae1f3dd3db92ec8c2 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Thu, 4 Jan 2024 01:55:16 +0200 Subject: [PATCH 02/12] Init _flask_version --- logstash_async/formatter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index f06ef26..6c1cd47 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -424,6 +424,7 @@ class MessageSchema(LogstashFormatter.MessageSchema): # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self._flask_version = None self._fetch_flask_version() # ---------------------------------------------------------------------- From 30372b2e4f6c2bdaa3d29d064632aae21eda6c39 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Thu, 4 Jan 2024 01:58:45 +0200 Subject: [PATCH 03/12] Split formatting logic --- logstash_async/formatter.py | 7 ++++++- tests/formatter_test.py | 18 ++++++------------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 6c1cd47..dac123f 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -120,6 +120,11 @@ def _prefetch_program_name(self): # ---------------------------------------------------------------------- def format(self, record): + message = self._format_to_dict(record) + return self._serialize(message) + + # ---------------------------------------------------------------------- + def _format_to_dict(self, record): Schema = self.MessageSchema message = { Schema.TIMESTAMP: self._format_timestamp(record.created), @@ -150,7 +155,7 @@ def format(self, record): self._move_extra_record_fields_to_prefix(message) self._post_process_message(message) - return self._serialize(message) + return message # ---------------------------------------------------------------------- def _format_timestamp(self, time_): diff --git a/tests/formatter_test.py b/tests/formatter_test.py index bafb0db..b3a2292 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -84,11 +84,10 @@ def test_format_timestamp_microsecond_2(self): result = formatter._format_timestamp(test_time_microsecond2) self.assertEqual(result, '2021-10-24T13:32:15.024Z') - @patch.object(LogstashFormatter, '_serialize', lambda s, m: m) @patch.object(LogstashFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): formatter = LogstashFormatter(tags=['t1', 't2']) - result = formatter.format(create_log_record()) + result = formatter._format_to_dict(create_log_record()) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', @@ -118,11 +117,10 @@ def test_default_schema(self): class LogstashEcsFormatterTest(unittest.TestCase): - @patch.object(LogstashEcsFormatter, '_serialize', lambda s, m: m) @patch.object(LogstashEcsFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): formatter = LogstashEcsFormatter(tags=['t1', 't2']) - result = formatter.format(create_log_record()) + result = formatter._format_to_dict(create_log_record()) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', @@ -182,7 +180,6 @@ def _create_request(self): class DjangoLogstashFormatterTest(DjangoTestMixin, unittest.TestCase): - @patch.object(DjangoLogstashFormatter, '_serialize', lambda s, m: m) @patch.object(DjangoLogstashFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): formatter = DjangoLogstashFormatter(tags=['t1', 't2']) @@ -192,7 +189,7 @@ def test_default_schema(self): 'message': 'tmsg', 'during': 'd', }), None) - result = formatter.format(create_log_record( + result = formatter._format_to_dict(create_log_record( status_code=500, request=self._create_request(), exc_info=exc_info, @@ -242,7 +239,6 @@ def test_default_schema(self): class DjangoLogstashEcsFormatterTest(DjangoTestMixin, unittest.TestCase): - @patch.object(DjangoLogstashEcsFormatter, '_serialize', lambda s, m: m) @patch.object(DjangoLogstashEcsFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): formatter = DjangoLogstashEcsFormatter(tags=['t1', 't2']) @@ -252,7 +248,7 @@ def test_default_schema(self): 'message': 'tmsg', 'during': 'd', }), None) - result = formatter.format(create_log_record( + result = formatter._format_to_dict(create_log_record( status_code=500, request=self._create_request(), exc_info=exc_info, @@ -328,12 +324,11 @@ def _create_request(self): class FlaskLogstashFormatterTest(FlaskTestMixin, unittest.TestCase): - @patch.object(FlaskLogstashFormatter, '_serialize', lambda s, m: m) @patch.object(FlaskLogstashFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): self.enterContext(patch('flask.request', self._create_request())) formatter = FlaskLogstashFormatter(tags=['t1', 't2']) - result = formatter.format(create_log_record(status_code=500)) + result = formatter._format_to_dict(create_log_record(status_code=500)) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', @@ -375,12 +370,11 @@ def test_default_schema(self): class FlaskLogstashEcsFormatterTest(FlaskTestMixin, unittest.TestCase): - @patch.object(FlaskLogstashEcsFormatter, '_serialize', lambda s, m: m) @patch.object(FlaskLogstashEcsFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): self.enterContext(patch('flask.request', self._create_request())) formatter = FlaskLogstashEcsFormatter(tags=['t1', 't2']) - result = formatter.format(create_log_record(status_code=500)) + result = formatter._format_to_dict(create_log_record(status_code=500)) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', From fd45badbf30e1ba80be497e596588e8f125ff416 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Thu, 4 Jan 2024 02:04:18 +0200 Subject: [PATCH 04/12] Move _LogstashMessageSchema --- logstash_async/formatter.py | 49 +++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index dac123f..bfbf1aa 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -21,31 +21,6 @@ import simplejson as json -class _LogstashMessageSchema: - TIMESTAMP = '@timestamp' - VERSION = '@version' - METADATA = '@metadata' - HOST = 'host' - LOG_LEVEL = 'level' - LOG_SOURCE = 'logsource' - LOGGER_NAME = 'logger_name' - LINE = 'line' - MESSAGE = 'message' - MESSAGE_TYPE = 'type' - FUNC_NAME = 'func_name' - THREAD_NAME = 'thread_name' - PROCESS_NAME = 'process_name' - INTERPRETER = 'interpreter' - INTERPRETER_VERSION = 'interpreter_version' - PATH = 'path' - PID = 'pid' - PROGRAM = 'program' - STACK_TRACE = 'stack_trace' - ERROR_TYPE = 'error_type' - TAGS = 'tags' - LOGSTASH_ASYNC_VERSION = 'logstash_async_version' - - class LogstashFormatter(logging.Formatter): _basic_data_types = (type(None), bool, str, int, float) @@ -53,7 +28,29 @@ class LogstashFormatter(logging.Formatter): formatter_record_field_skip_list = constants.FORMATTER_RECORD_FIELD_SKIP_LIST formatter_logstash_message_field_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST - MessageSchema = _LogstashMessageSchema + class MessageSchema: + TIMESTAMP = '@timestamp' + VERSION = '@version' + METADATA = '@metadata' + HOST = 'host' + LOG_LEVEL = 'level' + LOG_SOURCE = 'logsource' + LOGGER_NAME = 'logger_name' + LINE = 'line' + MESSAGE = 'message' + MESSAGE_TYPE = 'type' + FUNC_NAME = 'func_name' + THREAD_NAME = 'thread_name' + PROCESS_NAME = 'process_name' + INTERPRETER = 'interpreter' + INTERPRETER_VERSION = 'interpreter_version' + PATH = 'path' + PID = 'pid' + PROGRAM = 'program' + STACK_TRACE = 'stack_trace' + ERROR_TYPE = 'error_type' + TAGS = 'tags' + LOGSTASH_ASYNC_VERSION = 'logstash_async_version' # ---------------------------------------------------------------------- # pylint: disable=too-many-arguments From a1951fd6e8918f81b7b14940e4c187a2556354f3 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Thu, 4 Jan 2024 02:14:27 +0200 Subject: [PATCH 05/12] Use field sets instead of lists --- logstash_async/formatter.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index bfbf1aa..1dba351 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -25,8 +25,8 @@ class LogstashFormatter(logging.Formatter): _basic_data_types = (type(None), bool, str, int, float) - formatter_record_field_skip_list = constants.FORMATTER_RECORD_FIELD_SKIP_LIST - formatter_logstash_message_field_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST + formatter_record_field_skip_set = set(constants.FORMATTER_RECORD_FIELD_SKIP_LIST) + formatter_logstash_message_field_set = set(constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST) class MessageSchema: TIMESTAMP = '@timestamp' @@ -217,7 +217,7 @@ def _format_exception(self, exc_info): def _remove_excluded_fields(self, message, extra_fields): for fields in (message, extra_fields): for field_name in list(fields): - if field_name in self.formatter_record_field_skip_list: + if field_name in self.formatter_record_field_skip_set: del fields[field_name] # ---------------------------------------------------------------------- @@ -232,9 +232,9 @@ def _move_extra_record_fields_to_prefix(self, message): return # early out if no prefix is configured message.setdefault(self._extra_prefix, {}) - field_skip_list = self.formatter_logstash_message_field_list + [self._extra_prefix] + field_skip_set = self.formatter_logstash_message_field_set | {self._extra_prefix} for key in list(message): - if key not in field_skip_list: + if key not in field_skip_set: message[self._extra_prefix][key] = message.pop(key) # ---------------------------------------------------------------------- @@ -266,8 +266,8 @@ class LogstashEcsFormatter(LogstashFormatter): 'THREAD_NAME': 'process.thread.name', } - formatter_logstash_message_field_list = (LogstashFormatter.formatter_logstash_message_field_list - + list(__schema_dict.values())) + formatter_logstash_message_field_set = (LogstashFormatter.formatter_logstash_message_field_set + | set(__schema_dict.values())) MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) def _post_process_message(self, message): @@ -395,8 +395,8 @@ class DjangoLogstashEcsFormatter(DjangoLogstashFormatter, LogstashEcsFormatter): 'REQ_REFERER': 'http.request.referrer', } - formatter_logstash_message_field_list = (LogstashEcsFormatter.formatter_logstash_message_field_list - + list(__schema_dict.values())) + formatter_logstash_message_field_set = (LogstashEcsFormatter.formatter_logstash_message_field_set + | set(__schema_dict.values())) MessageSchema = type( 'MessageSchema', (DjangoLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), @@ -486,8 +486,8 @@ class FlaskLogstashEcsFormatter(FlaskLogstashFormatter, LogstashEcsFormatter): 'REQ_ID': 'http.request.id', } - formatter_logstash_message_field_list = (LogstashEcsFormatter.formatter_logstash_message_field_list - + list(__schema_dict.values())) + formatter_logstash_message_field_set = (LogstashEcsFormatter.formatter_logstash_message_field_set + | set(__schema_dict.values())) MessageSchema = type( 'MessageSchema', (FlaskLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), From 8f20512b3db7ca7b4289499eaf0c53b78eb88813 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Mon, 8 Jan 2024 02:11:15 +0200 Subject: [PATCH 06/12] Normalize ECS fields --- logstash_async/constants.py | 2 + logstash_async/formatter.py | 9 ++- logstash_async/utils.py | 58 +++++++++++++++ tests/formatter_test.py | 141 ++++++++++++++++++++++++------------ tests/utils_test.py | 49 +++++++++++++ 5 files changed, 210 insertions(+), 49 deletions(-) create mode 100644 tests/utils_test.py diff --git a/logstash_async/constants.py b/logstash_async/constants.py index 301da43..f32c0f5 100644 --- a/logstash_async/constants.py +++ b/logstash_async/constants.py @@ -44,6 +44,8 @@ class Constants: FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [ '@timestamp', '@version', 'host', 'level', 'logsource', 'message', 'pid', 'program', 'type', 'tags', '@metadata'] + # convert dotted ecs fields into nested objects + FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE = True # enable rate limiting for error messages (e.g. network errors) emitted by the logger # used in LogProcessingWorker, i.e. when transmitting log messages to the Logstash server. # Use a string like '5 per minute' or None to disable (default), for details see diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 1dba351..36e17f9 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -13,7 +13,7 @@ from logstash_async.constants import constants import logstash_async - +from logstash_async.utils import normalize_ecs_dict try: import json @@ -266,6 +266,7 @@ class LogstashEcsFormatter(LogstashFormatter): 'THREAD_NAME': 'process.thread.name', } + normalize_ecs_message = constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE formatter_logstash_message_field_set = (LogstashFormatter.formatter_logstash_message_field_set | set(__schema_dict.values())) MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) @@ -275,6 +276,12 @@ def _post_process_message(self, message): Schema = self.MessageSchema message[Schema.ECS_VERSION] = self.ecs_version + def _format_to_dict(self, record): + message = super()._format_to_dict(record) + if self.normalize_ecs_message: + message = normalize_ecs_dict(message) + return message + class DjangoLogstashFormatter(LogstashFormatter): class MessageSchema(LogstashFormatter.MessageSchema): diff --git a/logstash_async/utils.py b/logstash_async/utils.py index f2fd676..49a2c5b 100644 --- a/logstash_async/utils.py +++ b/logstash_async/utils.py @@ -5,6 +5,7 @@ from __future__ import print_function +from copy import deepcopy from datetime import datetime from importlib import import_module from itertools import chain, islice @@ -60,3 +61,60 @@ def import_string(dotted_path): except AttributeError as exc: raise ImportError( f'Module "{module_path}" does not define a "{class_name}" attribute/class') from exc + + +# ---------------------------------------------------------------------- +class normalize_ecs_dict: + """ + Convert dotted ecs fields into nested objects. + """ + + def __new__(cls, ecs_dict): + new_dict = deepcopy(ecs_dict) + cls.normalize_dict(new_dict) + return new_dict + + @classmethod + def normalize_dict(cls, ecs_dict): + for key in list(ecs_dict): + if '.' in key: + cls.merge_dicts(ecs_dict, cls.de_dot_record(key, ecs_dict.pop(key))) + for key, val in ecs_dict.items(): + cls.normalize_value(val) + + @classmethod + def normalize_sequence(cls, ecs_sequence): + for val in ecs_sequence: + cls.normalize_value(val) + + @classmethod + def normalize_value(cls, ecs_value): + if isinstance(ecs_value, dict): + cls.normalize_dict(ecs_value) + if isinstance(ecs_value, (list, tuple, set)): + cls.normalize_sequence(ecs_value) + + @classmethod + def merge_dicts(cls, target, src): + """ + Merge dicts recursively. + Mutates `target`. + Uses references from `src` which may lead to `src` mutation. + """ + for key, src_value in src.items(): + if key in target: + target_value = target[key] + if isinstance(target_value, dict) and isinstance(src_value, dict): + cls.merge_dicts(target_value, src_value) + else: + target[key] = src_value + else: + target[key] = src_value + + @classmethod + def de_dot_record(cls, key, value): + keys = key.split('.') + res = {keys.pop(): value} + for k in reversed(keys): + res = {k: res} + return res diff --git a/tests/formatter_test.py b/tests/formatter_test.py index b3a2292..c7c7398 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -116,11 +116,49 @@ def test_default_schema(self): }) +@patch.object(LogstashEcsFormatter, '_format_exception', lambda s, e: e) class LogstashEcsFormatterTest(unittest.TestCase): - @patch.object(LogstashEcsFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): formatter = LogstashEcsFormatter(tags=['t1', 't2']) result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': (ValueError, None, None), 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + 'taskName': None, + } + }) + + def test_dotted_schema(self): + class _LogstashEcsFormatter(LogstashEcsFormatter): + normalize_ecs_message = False + + formatter = _LogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', @@ -256,30 +294,34 @@ def test_default_schema(self): self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', - 'ecs.version': '8.11.0', - 'event.module': 'python-logstash', - 'host.hostname': socket.gethostname(), - 'client.domain': 'dj-host', - 'client.ip': 'dj-addr', - 'http.request.method': 'GET', - 'http.request.referrer': 'dj-ref', - 'http.response.status_code': 500, - 'url.original': None, - 'user.name': 'usr', - 'user_agent.original': 'dj-agent', - 'log.level': 'INFO', - 'log.syslog.hostname': socket.gethostname(), - 'log.origin.file.line': 2, - 'log.origin.file.name': 'a/b/c', - 'log.origin.function': 'f', - 'log.logger': 'foo', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'client': {'domain': 'dj-host', 'ip': 'dj-addr'}, + 'http': { + 'request': {'method': 'GET', 'referrer': 'dj-ref'}, + 'response': {'status_code': 500}, + }, + 'url': {'original': None}, + 'user': {'name': 'usr'}, + 'user_agent': {'original': 'dj-agent'}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, 'message': 'test', - 'process.thread.name': 'baz', - 'process.name': 'bar', - 'process.pid': 1, - 'process.executable': sys.argv[0], - 'error.stack_trace': exc_info, - 'error.type': 'ValueError', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': exc_info, 'type': 'ValueError'}, 'tags': ['t1', 't2'], 'extra': { 'interpreter': sys.executable, @@ -378,31 +420,34 @@ def test_default_schema(self): self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', - 'ecs.version': '8.11.0', - 'event.module': 'python-logstash', - 'host.hostname': socket.gethostname(), - 'client.domain': 'f-host', - 'client.ip': 'f-addr', - 'http.request.id': 'x-id', - 'http.request.method': 'GET', - 'http.request.referrer': 'f-ref', - 'http.response.status_code': 500, - 'url.original': 'f-url', - 'user.name': 'usr', - 'user_agent.original': 'f-agent', - 'log.level': 'INFO', - 'log.syslog.hostname': socket.gethostname(), - 'log.origin.file.line': 2, - 'log.origin.file.name': 'a/b/c', - 'log.origin.function': 'f', - 'log.logger': 'foo', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'client': {'domain': 'f-host', 'ip': 'f-addr'}, + 'http': { + 'request': {'id': 'x-id', 'method': 'GET', 'referrer': 'f-ref'}, + 'response': {'status_code': 500}, + }, + 'url': {'original': 'f-url'}, + 'user': {'name': 'usr'}, + 'user_agent': {'original': 'f-agent'}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, 'message': 'test', - 'process.thread.name': 'baz', - 'process.name': 'bar', - 'process.pid': 1, - 'process.executable': sys.argv[0], - 'error.stack_trace': (ValueError, None, None), - 'error.type': 'ValueError', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': (ValueError, None, None), 'type': 'ValueError'}, 'tags': ['t1', 't2'], 'extra': { 'interpreter': sys.executable, diff --git a/tests/utils_test.py b/tests/utils_test.py new file mode 100644 index 0000000..11d24d4 --- /dev/null +++ b/tests/utils_test.py @@ -0,0 +1,49 @@ +import unittest +from copy import deepcopy + +from logstash_async.utils import normalize_ecs_dict + + +class NormalizeEcsDictTest(unittest.TestCase): + def test_de_dot(self): + with self.subTest('no dots'): + result = normalize_ecs_dict.de_dot_record('a', {'x': [1]}) + self.assertDictEqual(result, {'a': {'x': [1]}}) + with self.subTest('dots'): + result = normalize_ecs_dict.de_dot_record('a.b.c', {'x': [1]}) + self.assertDictEqual(result, {'a': {'b': {'c': {'x': [1]}}}}) + + def test_normalization(self): + d = { + 'a': 1, + 'b': 11, + 'b.c': { + 'd.e': [2, ({'f.g': 3}, 4), 5], + 'h': None, + }, + 'b.c.x': {'y': 6}, + 'c': {'d': [1], 'e': 2}, + 'c.d': [2], + 'c.f': 3, + } + d_copy = deepcopy(d) + expected = { + 'a': 1, + 'b': { + 'c': { + 'd': { + 'e': [2, ({'f': {'g': 3}}, 4), 5], + }, + 'h': None, + 'x': {'y': 6}, + }, + }, + 'c': {'d': [2], 'e': 2, 'f': 3}, + } + result = normalize_ecs_dict(d) + self.assertDictEqual(result, expected) + + with self.subTest('source dict not mutated'): + self.assertDictEqual(d, d_copy) + result['c']['d'].append(22) + self.assertDictEqual(d, d_copy) From 97660bfb06ca17f3e4c743f3d63802bb00c01bf2 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Tue, 9 Jan 2024 00:49:41 +0200 Subject: [PATCH 07/12] Minor refactoring --- logstash_async/formatter.py | 86 ++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 36e17f9..41045f2 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -12,8 +12,8 @@ import uuid from logstash_async.constants import constants -import logstash_async from logstash_async.utils import normalize_ecs_dict +import logstash_async try: import json @@ -25,8 +25,8 @@ class LogstashFormatter(logging.Formatter): _basic_data_types = (type(None), bool, str, int, float) - formatter_record_field_skip_set = set(constants.FORMATTER_RECORD_FIELD_SKIP_LIST) - formatter_logstash_message_field_set = set(constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST) + field_skip_set = set(constants.FORMATTER_RECORD_FIELD_SKIP_LIST) + top_level_field_set = set(constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST) class MessageSchema: TIMESTAMP = '@timestamp' @@ -122,35 +122,18 @@ def format(self, record): # ---------------------------------------------------------------------- def _format_to_dict(self, record): - Schema = self.MessageSchema - message = { - Schema.TIMESTAMP: self._format_timestamp(record.created), - Schema.VERSION: '1', - Schema.HOST: self._host, - Schema.LOG_LEVEL: record.levelname, - Schema.LOG_SOURCE: self._logsource, - Schema.MESSAGE: record.getMessage(), - Schema.PID: record.process, - Schema.PROGRAM: self._program_name, - Schema.MESSAGE_TYPE: self._message_type, - } - if self._metadata: - message[Schema.METADATA] = self._metadata - if self._tags: - message[Schema.TAGS] = self._tags - + message = self._get_primary_fields(record) # record fields record_fields = self._get_record_fields(record) message.update(record_fields) # prepare dynamic extra fields extra_fields = self._get_extra_fields(record) - # remove all fields to be excluded - self._remove_excluded_fields(message, extra_fields) message.update(extra_fields) + # remove all fields to be excluded + self._remove_excluded_fields(message) # move existing extra record fields into the configured prefix self._move_extra_record_fields_to_prefix(message) - self._post_process_message(message) return message @@ -180,6 +163,26 @@ def _value_repr(self, value): else: return repr(value) + # ---------------------------------------------------------------------- + def _get_primary_fields(self, record): + Schema = self.MessageSchema + primary_fields = { + Schema.TIMESTAMP: self._format_timestamp(record.created), + Schema.VERSION: '1', + Schema.HOST: self._host, + Schema.LOG_LEVEL: record.levelname, + Schema.LOG_SOURCE: self._logsource, + Schema.MESSAGE: record.getMessage(), + Schema.PID: record.process, + Schema.PROGRAM: self._program_name, + Schema.MESSAGE_TYPE: self._message_type, + } + if self._metadata: + primary_fields[Schema.METADATA] = self._metadata + if self._tags: + primary_fields[Schema.TAGS] = self._tags + return primary_fields + # ---------------------------------------------------------------------- def _get_extra_fields(self, record): Schema = self.MessageSchema @@ -214,11 +217,10 @@ def _format_exception(self, exc_info): return stack_trace # ---------------------------------------------------------------------- - def _remove_excluded_fields(self, message, extra_fields): - for fields in (message, extra_fields): - for field_name in list(fields): - if field_name in self.formatter_record_field_skip_set: - del fields[field_name] + def _remove_excluded_fields(self, message): + for field_name in list(message): + if field_name in self.field_skip_set: + del message[field_name] # ---------------------------------------------------------------------- def _move_extra_record_fields_to_prefix(self, message): @@ -232,15 +234,11 @@ def _move_extra_record_fields_to_prefix(self, message): return # early out if no prefix is configured message.setdefault(self._extra_prefix, {}) - field_skip_set = self.formatter_logstash_message_field_set | {self._extra_prefix} + field_skip_set = self.top_level_field_set | {self._extra_prefix} for key in list(message): if key not in field_skip_set: message[self._extra_prefix][key] = message.pop(key) - # ---------------------------------------------------------------------- - def _post_process_message(self, message): - """Override when needed""" - # ---------------------------------------------------------------------- def _serialize(self, message): return json.dumps(message, ensure_ascii=self._ensure_ascii) @@ -267,14 +265,14 @@ class LogstashEcsFormatter(LogstashFormatter): } normalize_ecs_message = constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE - formatter_logstash_message_field_set = (LogstashFormatter.formatter_logstash_message_field_set - | set(__schema_dict.values())) + top_level_field_set = LogstashFormatter.top_level_field_set | set(__schema_dict.values()) MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) - def _post_process_message(self, message): - super()._post_process_message(message) + def _get_primary_fields(self, record): + message = super()._get_primary_fields(record) Schema = self.MessageSchema message[Schema.ECS_VERSION] = self.ecs_version + return message def _format_to_dict(self, record): message = super()._format_to_dict(record) @@ -402,17 +400,16 @@ class DjangoLogstashEcsFormatter(DjangoLogstashFormatter, LogstashEcsFormatter): 'REQ_REFERER': 'http.request.referrer', } - formatter_logstash_message_field_set = (LogstashEcsFormatter.formatter_logstash_message_field_set - | set(__schema_dict.values())) + top_level_field_set = LogstashEcsFormatter.top_level_field_set | set(__schema_dict.values()) MessageSchema = type( 'MessageSchema', (DjangoLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), __schema_dict, ) - def _remove_excluded_fields(self, message, extra_fields): + def _remove_excluded_fields(self, message): message.pop('status_code', None) - super()._remove_excluded_fields(message, extra_fields) + super()._remove_excluded_fields(message) class FlaskLogstashFormatter(LogstashFormatter): @@ -493,14 +490,13 @@ class FlaskLogstashEcsFormatter(FlaskLogstashFormatter, LogstashEcsFormatter): 'REQ_ID': 'http.request.id', } - formatter_logstash_message_field_set = (LogstashEcsFormatter.formatter_logstash_message_field_set - | set(__schema_dict.values())) + top_level_field_set = LogstashEcsFormatter.top_level_field_set | set(__schema_dict.values()) MessageSchema = type( 'MessageSchema', (FlaskLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), __schema_dict, ) - def _remove_excluded_fields(self, message, extra_fields): + def _remove_excluded_fields(self, message): message.pop('status_code', None) - super()._remove_excluded_fields(message, extra_fields) + super()._remove_excluded_fields(message) From dd79950c811050be7ed8ea4d370f1141b6259667 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Tue, 9 Jan 2024 15:35:31 +0200 Subject: [PATCH 08/12] Add FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST --- logstash_async/constants.py | 3 ++- logstash_async/formatter.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/logstash_async/constants.py b/logstash_async/constants.py index f32c0f5..b12edfd 100644 --- a/logstash_async/constants.py +++ b/logstash_async/constants.py @@ -44,7 +44,8 @@ class Constants: FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [ '@timestamp', '@version', 'host', 'level', 'logsource', 'message', 'pid', 'program', 'type', 'tags', '@metadata'] - # convert dotted ecs fields into nested objects + FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST = ['@timestamp', '@version', '@metadata', 'message', 'labels', 'tags'] + # convert dotted ECS fields into nested objects FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE = True # enable rate limiting for error messages (e.g. network errors) emitted by the logger # used in LogProcessingWorker, i.e. when transmitting log messages to the Logstash server. diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 41045f2..15ac7c0 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -265,7 +265,7 @@ class LogstashEcsFormatter(LogstashFormatter): } normalize_ecs_message = constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE - top_level_field_set = LogstashFormatter.top_level_field_set | set(__schema_dict.values()) + top_level_field_set = {*constants.FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST, *__schema_dict.values()} MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) def _get_primary_fields(self, record): From 82af01762b1f74ed64ab6c5ca7d3a8767348d02e Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Mon, 22 Jan 2024 23:40:51 +0200 Subject: [PATCH 09/12] Lint --- logstash_async/constants.py | 3 ++- logstash_async/formatter.py | 14 ++++++++++---- logstash_async/utils.py | 1 + tests/formatter_test.py | 27 +++++++++++++++++++-------- tests/utils_test.py | 3 ++- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/logstash_async/constants.py b/logstash_async/constants.py index b12edfd..0ae01ef 100644 --- a/logstash_async/constants.py +++ b/logstash_async/constants.py @@ -44,7 +44,8 @@ class Constants: FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [ '@timestamp', '@version', 'host', 'level', 'logsource', 'message', 'pid', 'program', 'type', 'tags', '@metadata'] - FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST = ['@timestamp', '@version', '@metadata', 'message', 'labels', 'tags'] + FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST = [ + '@timestamp', '@version', '@metadata', 'message', 'labels', 'tags'] # convert dotted ECS fields into nested objects FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE = True # enable rate limiting for error messages (e.g. network errors) emitted by the logger diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 15ac7c0..edf15aa 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -15,6 +15,7 @@ from logstash_async.utils import normalize_ecs_dict import logstash_async + try: import json except ImportError: @@ -265,7 +266,8 @@ class LogstashEcsFormatter(LogstashFormatter): } normalize_ecs_message = constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE - top_level_field_set = {*constants.FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST, *__schema_dict.values()} + top_level_field_set = {*constants.FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST, + *__schema_dict.values()} MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) def _get_primary_fields(self, record): @@ -277,6 +279,7 @@ def _get_primary_fields(self, record): def _format_to_dict(self, record): message = super()._format_to_dict(record) if self.normalize_ecs_message: + # pylint: disable-next=redefined-variable-type message = normalize_ecs_dict(message) return message @@ -435,19 +438,22 @@ def __init__(self, *args, **kwargs): # ---------------------------------------------------------------------- def _fetch_flask_version(self): - from flask import __version__ # pylint: disable=import-error,import-outside-toplevel + # pylint: disable-next=import-error,import-outside-toplevel,no-name-in-module + from flask import __version__ self._flask_version = __version__ # ---------------------------------------------------------------------- def _get_extra_fields(self, record): - from flask import request # pylint: disable=import-error,import-outside-toplevel + # pylint: disable-next=import-error,import-outside-toplevel + from flask import request extra_fields = super()._get_extra_fields(record) Schema = self.MessageSchema extra_fields[Schema.FLASK_VERSION] = self._flask_version if request: # request might be unbound in other threads - extra_fields[Schema.REQ_USER_AGENT] = str(request.user_agent) if request.user_agent else '' + extra_fields[Schema.REQ_USER_AGENT] = (str(request.user_agent) + if request.user_agent else '') extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.remote_addr extra_fields[Schema.REQ_HOST] = request.host.split(':', 1)[0] extra_fields[Schema.REQ_URI] = request.url diff --git a/logstash_async/utils.py b/logstash_async/utils.py index 49a2c5b..9274538 100644 --- a/logstash_async/utils.py +++ b/logstash_async/utils.py @@ -64,6 +64,7 @@ def import_string(dotted_path): # ---------------------------------------------------------------------- +# pylint: disable-next=invalid-name class normalize_ecs_dict: """ Convert dotted ecs fields into nested objects. diff --git a/tests/formatter_test.py b/tests/formatter_test.py index c7c7398..0838427 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -3,18 +3,25 @@ # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. -import socket from contextlib import suppress from logging import FileHandler, makeLogRecord +from types import SimpleNamespace +from unittest.mock import patch import os +import socket import sys import unittest -from types import SimpleNamespace -from unittest.mock import patch +from logstash_async.formatter import ( + DjangoLogstashEcsFormatter, + DjangoLogstashFormatter, + FlaskLogstashEcsFormatter, + FlaskLogstashFormatter, + LogstashEcsFormatter, + LogstashFormatter, +) import logstash_async -from logstash_async.formatter import LogstashFormatter, DjangoLogstashFormatter, FlaskLogstashFormatter, \ - LogstashEcsFormatter, DjangoLogstashEcsFormatter, FlaskLogstashEcsFormatter + # pylint: disable=protected-access @@ -190,12 +197,15 @@ class _LogstashEcsFormatter(LogstashEcsFormatter): class DjangoTestMixin: @classmethod - def setUpClass(cls): + def setUpClass(cls): # pylint: disable=invalid-name super().setUpClass() - import django + # pylint: disable=import-outside-toplevel from django.conf import settings from django.http import HttpRequest + import django + + # pylint: enable=import-outside-toplevel with suppress(RuntimeError): settings.configure() @@ -342,9 +352,10 @@ def test_default_schema(self): class FlaskTestMixin: @classmethod - def setUpClass(cls): + def setUpClass(cls): # pylint: disable=invalid-name super().setUpClass() + # pylint: disable-next=import-outside-toplevel,no-name-in-module from flask import __version__ cls.flask_version = __version__ diff --git a/tests/utils_test.py b/tests/utils_test.py index 11d24d4..1b1c749 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -1,5 +1,5 @@ -import unittest from copy import deepcopy +import unittest from logstash_async.utils import normalize_ecs_dict @@ -45,5 +45,6 @@ def test_normalization(self): with self.subTest('source dict not mutated'): self.assertDictEqual(d, d_copy) + # pylint: disable-next=unsubscriptable-object result['c']['d'].append(22) self.assertDictEqual(d, d_copy) From 3da892d9ccc5d51527b09a746180ba8d6ac404b1 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Thu, 25 Jan 2024 02:43:18 +0200 Subject: [PATCH 10/12] Add tox deps --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 422fc05..5ed3032 100644 --- a/tox.ini +++ b/tox.ini @@ -15,6 +15,8 @@ deps = flake8 isort pylint + Django + Flask commands = # linting and code analysis {envbindir}/flake8 {[tox]logstash_async_modules} From fb0bce9ef0a616e1894b9a7d0b223a716042f5c7 Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Mon, 29 Jan 2024 17:04:39 +0200 Subject: [PATCH 11/12] Fix Flask version warning --- logstash_async/formatter.py | 5 ++--- tests/formatter_test.py | 6 ++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index edf15aa..86f5fd8 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -4,6 +4,7 @@ # of the MIT license. See the LICENSE file for details. from datetime import date, datetime +import importlib.metadata import logging import socket import sys @@ -438,9 +439,7 @@ def __init__(self, *args, **kwargs): # ---------------------------------------------------------------------- def _fetch_flask_version(self): - # pylint: disable-next=import-error,import-outside-toplevel,no-name-in-module - from flask import __version__ - self._flask_version = __version__ + self._flask_version = importlib.metadata.version('flask') # ---------------------------------------------------------------------- def _get_extra_fields(self, record): diff --git a/tests/formatter_test.py b/tests/formatter_test.py index 0838427..4163f7d 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -7,6 +7,7 @@ from logging import FileHandler, makeLogRecord from types import SimpleNamespace from unittest.mock import patch +import importlib.metadata import os import socket import sys @@ -354,10 +355,7 @@ class FlaskTestMixin: @classmethod def setUpClass(cls): # pylint: disable=invalid-name super().setUpClass() - - # pylint: disable-next=import-outside-toplevel,no-name-in-module - from flask import __version__ - cls.flask_version = __version__ + cls.flask_version = importlib.metadata.version('flask') def _create_request(self): return SimpleNamespace( From 1f9e18cf29b32edc01c95aba6255daa0a36f77ac Mon Sep 17 00:00:00 2001 From: Andrii Lahuta <13280256+andriilahuta@users.noreply.github.com> Date: Mon, 29 Jan 2024 18:15:49 +0200 Subject: [PATCH 12/12] Support LogRecord.taskName. Fix tests for python <3.12. --- .github/workflows/tests.yml | 2 +- logstash_async/constants.py | 2 +- logstash_async/formatter.py | 3 +++ tests/formatter_test.py | 19 ++++++------------- tox.ini | 2 +- 5 files changed, 12 insertions(+), 16 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2490cb9..33c63ec 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: ['3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] fail-fast: false steps: diff --git a/logstash_async/constants.py b/logstash_async/constants.py index 0ae01ef..86fe1e6 100644 --- a/logstash_async/constants.py +++ b/logstash_async/constants.py @@ -38,7 +38,7 @@ class Constants: 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module', 'msecs', 'msg', 'name', 'pathname', 'process', - 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName'] + 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName', 'taskName'] # fields to be set on the top-level of a Logstash event/message, do not modify this # unless you know what you are doing FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [ diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 86f5fd8..cc9cf09 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -42,6 +42,7 @@ class MessageSchema: MESSAGE = 'message' MESSAGE_TYPE = 'type' FUNC_NAME = 'func_name' + TASK_NAME = 'task_name' THREAD_NAME = 'thread_name' PROCESS_NAME = 'process_name' INTERPRETER = 'interpreter' @@ -202,6 +203,8 @@ def _get_extra_fields(self, record): # static extra fields if self._extra: extra_fields.update(self._extra) + if getattr(record, 'taskName', None): + extra_fields[Schema.TASK_NAME] = record.taskName # exceptions if record.exc_info: extra_fields[Schema.ERROR_TYPE] = record.exc_info[0].__name__ diff --git a/tests/formatter_test.py b/tests/formatter_test.py index 4163f7d..c1d916e 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -117,7 +117,6 @@ def test_default_schema(self): 'path': 'a/b/c', 'process_name': 'bar', 'thread_name': 'baz', - 'taskName': None, 'stack_trace': (ValueError, None, None), 'error_type': 'ValueError', } @@ -157,7 +156,6 @@ def test_default_schema(self): 'interpreter': sys.executable, 'interpreter_version': _interpreter_version, 'logstash_async_version': logstash_async.__version__, - 'taskName': None, } }) @@ -191,7 +189,6 @@ class _LogstashEcsFormatter(LogstashEcsFormatter): 'interpreter': sys.executable, 'interpreter_version': _interpreter_version, 'logstash_async_version': logstash_async.__version__, - 'taskName': None, } }) @@ -264,7 +261,6 @@ def test_default_schema(self): 'path': 'a/b/c', 'process_name': 'bar', 'thread_name': 'baz', - 'taskName': None, 'stack_trace': exc_info, 'error_type': 'ValueError', 'status_code': 500, @@ -338,7 +334,6 @@ def test_default_schema(self): 'interpreter': sys.executable, 'interpreter_version': _interpreter_version, 'logstash_async_version': logstash_async.__version__, - 'taskName': None, 'req_forwarded_proto': 'dj-f-proto', 'req_forwarded_for': ['dj-f1', 'dj-f2'], 'tmpl_name': 'tpl', @@ -377,9 +372,9 @@ def _create_request(self): class FlaskLogstashFormatterTest(FlaskTestMixin, unittest.TestCase): @patch.object(FlaskLogstashFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): - self.enterContext(patch('flask.request', self._create_request())) - formatter = FlaskLogstashFormatter(tags=['t1', 't2']) - result = formatter._format_to_dict(create_log_record(status_code=500)) + with patch('flask.request', self._create_request()): + formatter = FlaskLogstashFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record(status_code=500)) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', @@ -401,7 +396,6 @@ def test_default_schema(self): 'path': 'a/b/c', 'process_name': 'bar', 'thread_name': 'baz', - 'taskName': None, 'error_type': 'ValueError', 'stack_trace': (ValueError, None, None), 'status_code': 500, @@ -423,9 +417,9 @@ def test_default_schema(self): class FlaskLogstashEcsFormatterTest(FlaskTestMixin, unittest.TestCase): @patch.object(FlaskLogstashEcsFormatter, '_format_exception', lambda s, e: e) def test_default_schema(self): - self.enterContext(patch('flask.request', self._create_request())) - formatter = FlaskLogstashEcsFormatter(tags=['t1', 't2']) - result = formatter._format_to_dict(create_log_record(status_code=500)) + with patch('flask.request', self._create_request()): + formatter = FlaskLogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record(status_code=500)) self.assertDictEqual(result, { '@timestamp': '2021-10-24T13:32:15.024Z', '@version': '1', @@ -462,7 +456,6 @@ def test_default_schema(self): 'interpreter': sys.executable, 'interpreter_version': _interpreter_version, 'logstash_async_version': logstash_async.__version__, - 'taskName': None, 'req_forwarded_proto': 'f-proto', 'req_forwarded_for': ['f1', 'f2'], 'flask_version': self.flask_version, diff --git a/tox.ini b/tox.ini index 5ed3032..1ff2b55 100644 --- a/tox.ini +++ b/tox.ini @@ -6,7 +6,7 @@ [tox] skip_missing_interpreters = true envlist = - docs,py38,py39,py310,py311,pypy3 + docs,py38,py39,py310,py311,py312,pypy3 logstash_async_modules = logstash_async tests