diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index 9982a5a63c49f..0b1a456c83504 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -141,10 +141,10 @@ def _read(self, ti, try_number, metadata=None): if 'last_log_timestamp' in metadata: last_log_ts = timezone.parse(metadata['last_log_timestamp']) if cur_ts.diff(last_log_ts).in_minutes() >= 5 or 'max_offset' in metadata \ - and offset >= metadata['max_offset']: + and int(offset) >= int(metadata['max_offset']): metadata['end_of_log'] = True - if offset != next_offset or 'last_log_timestamp' not in metadata: + if int(offset) != int(next_offset) or 'last_log_timestamp' not in metadata: metadata['last_log_timestamp'] = str(cur_ts) # If we hit the end of the log, remove the actual end_of_log message