Skip to content

Commit

Permalink
Fix Invalid log order in ElasticsearchTaskHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
suhanovv committed Aug 11, 2021
1 parent 719709b commit 2783dbd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
8 changes: 6 additions & 2 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list:

return logs

def emit(self, record):
if self.handler:
record.offset = int(time() * (10 ** 9))
self.handler.emit(record)

def set_context(self, ti: TaskInstance) -> None:
"""
Provide task_instance context to airflow task handler.
Expand All @@ -261,14 +266,13 @@ def set_context(self, ti: TaskInstance) -> None:
if self.json_format:
self.formatter = JSONFormatter(
fmt=self.formatter._fmt,
json_fields=self.json_fields,
json_fields=self.json_fields + [self.offset_field],
extras={
'dag_id': str(ti.dag_id),
'task_id': str(ti.task_id),
'execution_date': self._clean_execution_date(ti.execution_date),
'try_number': str(ti.try_number),
'log_id': self._render_log_id(ti, ti.try_number),
'offset': int(time() * (10 ** 9)),
},
)

Expand Down
45 changes: 44 additions & 1 deletion tests/providers/elasticsearch/log/test_es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import json
import logging
import os
import shutil
Expand All @@ -24,6 +25,7 @@
from urllib.parse import quote

import elasticsearch
import freezegun
import pendulum
from parameterized import parameterized

Expand Down Expand Up @@ -442,3 +444,44 @@ def test_get_external_log_url(self, json_format, es_frontend, expected_url):
def test_supports_external_link(self, frontend, expected):
self.es_task_handler.frontend = frontend
assert self.es_task_handler.supports_external_link == expected

@mock.patch('sys.__stdout__', new_callable=io.StringIO)
def test_dynamic_offset(self, stdout_mock):
# arrange
handler = ElasticsearchTaskHandler(
base_log_folder=self.local_log_location,
filename_template=self.filename_template,
log_id_template=self.log_id_template,
end_of_log_mark=self.end_of_log_mark,
write_stdout=True,
json_format=True,
json_fields=self.json_fields,
host_field=self.host_field,
offset_field=self.offset_field,
)
handler.formatter = logging.Formatter()

logger = logging.getLogger(__name__)
logger.handlers = [handler]
logger.propagate = False

self.ti._log = logger
handler.set_context(self.ti)

t1 = pendulum.naive(year=2017, month=1, day=1, hour=1, minute=1, second=15)
t2, t3 = t1 + pendulum.duration(seconds=5), t1 + pendulum.duration(seconds=10)

# act
with freezegun.freeze_time(t1):
self.ti.log.info("Test")
with freezegun.freeze_time(t2):
self.ti.log.info("Test2")
with freezegun.freeze_time(t3):
self.ti.log.info("Test3")

# assert
first_log, second_log, third_log = map(json.loads, stdout_mock.getvalue().strip().split("\n"))
assert first_log['offset'] < second_log['offset'] < third_log['offset']
assert first_log['asctime'] == t1.format("YYYY-MM-DD HH:mm:ss,SSS")
assert second_log['asctime'] == t2.format("YYYY-MM-DD HH:mm:ss,SSS")
assert third_log['asctime'] == t3.format("YYYY-MM-DD HH:mm:ss,SSS")

0 comments on commit 2783dbd

Please sign in to comment.