Skip to content

Commit

Permalink
Incorrect try number subtraction producing invalid span id for OTEL a…
Browse files Browse the repository at this point in the history
…irflow (issue apache#41501) (apache#41502)

* Fix for issue apache#39336

* removed unnecessary import
  • Loading branch information
howardyoo authored and romsharon98 committed Aug 20, 2024
1 parent 8cba019 commit c48892e
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 10 deletions.
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)

self.change_state(key, TaskInstanceState.SUCCESS, info)

Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def execute_async(
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(command))

local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def execute_async(
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(self.commands_to_run))

def sync(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) ->
span.set_attribute("hostname", ti.hostname)
span.set_attribute("log_url", ti.log_url)
span.set_attribute("operator", str(ti.operator))
span.set_attribute("try_number", ti.try_number - 1)
span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
Expand Down
7 changes: 1 addition & 6 deletions airflow/traces/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

from airflow.traces import NO_TRACE_ID
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from airflow.models import DagRun, TaskInstance
Expand Down Expand Up @@ -75,12 +74,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
"""Generate span id from the task instance."""
dag_run = ti.dag_run
if ti.state == TaskInstanceState.SUCCESS or ti.state == TaskInstanceState.FAILED:
try_number = ti.try_number - 1
else:
try_number = ti.try_number
return _gen_id(
[dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)],
[dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)],
as_int,
SPAN_ID,
)
Expand Down

0 comments on commit c48892e

Please sign in to comment.