Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove default owner #27541

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2794,9 +2794,10 @@ components:
readOnly: true
nullable: true
owner:
description: Name of the user who triggered these events a.
description: Name of the user who triggered these events.
type: string
readOnly: true
nullable: true
extra:
description: |
Other information that was not included in the other fields, e.g. the complete CLI command.
Expand Down Expand Up @@ -3384,6 +3385,7 @@ components:
owner:
type: string
readOnly: true
nullable: true
start_date:
type: string
format: 'date-time'
Expand Down
10 changes: 6 additions & 4 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ class Meta:
@staticmethod
def get_owners(obj: DagModel):
"""Convert owners attribute to DAG representation"""
if not getattr(obj, 'owners', None):
if obj.owners is None:
return []
return obj.owners.split(",")
else:
return obj.owners.split(",")

@staticmethod
def get_token(obj: DagModel):
Expand Down Expand Up @@ -126,9 +127,10 @@ def get_tags(obj: DAG):
@staticmethod
def get_owners(obj: DAG):
"""Convert owners attribute to DAG representation"""
if not getattr(obj, 'owner', None):
if obj.owner is None:
return []
return obj.owner.split(",")
else:
return obj.owner.split(",")

@staticmethod
def get_is_paused(obj: DAG):
Expand Down
8 changes: 0 additions & 8 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -996,14 +996,6 @@
- name: operators
description: ~
options:
- name: default_owner
description: |
The default owner assigned to each new operator, unless
provided explicitly or passed via ``default_args``
version_added: ~
type: string
example: ~
default: "airflow"
- name: default_cpus
description: ~
version_added: ~
Expand Down
3 changes: 0 additions & 3 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,6 @@ username =
password =

[operators]
# The default owner assigned to each new operator, unless
# provided explicitly or passed via ``default_args``
default_owner = airflow
Copy link
Contributor

@eladkal eladkal Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen to users who customized their airflow.cfg with
default_owner = something

This will no longer work for this... so isn't this a breaking change that we can do only in Airflow 3?

Copy link
Contributor Author

@BasPH BasPH Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct. Not sure how to best deal with that, should I make a 2nd PR that deprecates the config which can be included with Airflow 2.5?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we deprecating this at all? I actually like the proposal of not using "airflow" as the default, but I don't see why that means we have to also deprecate the ability for folks to set a default owner at the config level at all? Perhaps they have a default that makes sense for their workflow or organizational structure.

IMHO, this config option should just no longer be a "mandatory" value, but still available to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some research within Astronomer and there's not a single customer setting this option. I figured we might as well remove it for the sake of simplifying Airflow configuration. WDYT?

Either way (remove/change to None), this should be treated as a breaking change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following the discussion in #27067 (comment) - this seems like a good candidate to remove IF we agree that we can treat "breaking" in a less strict and more "how likely it's going to break other's workflow" way.

Very good example of a case where we could take a risk and classify it as "non-breaking" (even if it is technically a removal).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got a bit lost in that very long message. Are you proposing to discuss the definition of "breaking change"?

In this PR it's clear --> user facing feature, so treat as breaking change, so change in Airflow 3.0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Jarek that we could maybe convince ourselves that this is not a hard breaking change, but I actually think this feature is somewhat useful and don't see the motivation for getting rid of it.

We can still keep the ability to set that a globally default user, but just make the config no longer mandatory (since it is currently mandatory). That way you can remove it from the newly published configs, but those users who still have this setting in their configs will not have any regressions and that feature will continue to work.

default_cpus = 1
default_ram = 512
default_disk = 512
Expand Down
3 changes: 1 addition & 2 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
from airflow.models.operator import Operator
from airflow.models.taskinstance import TaskInstance

DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner")
DEFAULT_POOL_SLOTS: int = 1
DEFAULT_PRIORITY_WEIGHT: int = 1
DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
Expand Down Expand Up @@ -90,7 +89,7 @@ class AbstractOperator(LoggingMixin, DAGNode):
# Defines which files extensions to look for in the templated fields.
template_ext: Sequence[str]

owner: str
owner: str | None
task_id: str

outlets: list
Expand Down
5 changes: 2 additions & 3 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.abstractoperator import (
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
DEFAULT_PRIORITY_WEIGHT,
DEFAULT_QUEUE,
Expand Down Expand Up @@ -190,7 +189,7 @@ def partial(
task_group: TaskGroup | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
owner: str = DEFAULT_OWNER,
owner: str | None = None,
email: None | str | Iterable[str] = None,
params: dict | None = None,
resources: dict[str, Any] | None = None,
Expand Down Expand Up @@ -700,7 +699,7 @@ class derived from this one results in the creation of a task object,
def __init__(
self,
task_id: str,
owner: str = DEFAULT_OWNER,
owner: str | None = None,
email: str | Iterable[str] | None = None,
email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True),
email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True),
Expand Down
8 changes: 6 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,13 +1225,17 @@ def folder(self) -> str:
return os.path.dirname(self.fileloc)

@property
def owner(self) -> str:
def owner(self) -> str | None:
"""
Return list of all owners found in DAG tasks.

:return: Comma separated list of owners in DAG tasks
"""
return ", ".join({t.owner for t in self.tasks})
owners = list(filter(None, {t.owner for t in self.tasks}))
if owners:
return ", ".join(owners)
else:
return None

@property
def allow_future_exec_dates(self) -> bool:
Expand Down
5 changes: 2 additions & 3 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from airflow.exceptions import AirflowException, UnmappableOperator
from airflow.models.abstractoperator import (
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
DEFAULT_PRIORITY_WEIGHT,
DEFAULT_QUEUE,
Expand Down Expand Up @@ -372,8 +371,8 @@ def leaves(self) -> Sequence[AbstractOperator]:
return [self]

@property
def owner(self) -> str: # type: ignore[override]
return self.partial_kwargs.get("owner", DEFAULT_OWNER)
def owner(self) -> str | None: # type: ignore[override]
return self.partial_kwargs.get("owner", None)

@property
def email(self) -> None | str | Iterable[str]:
Expand Down
6 changes: 3 additions & 3 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1042,8 +1042,8 @@ export interface components {
* @description When the event was dispatched for an object having execution_date, the value of this field.
*/
execution_date?: string | null;
/** @description Name of the user who triggered these events a. */
owner?: string;
/** @description Name of the user who triggered these events. */
owner?: string | null;
/** @description Other information that was not included in the other fields, e.g. the complete CLI command. */
extra?: string | null;
};
Expand Down Expand Up @@ -1371,7 +1371,7 @@ export interface components {
Task: {
class_ref?: components["schemas"]["ClassReference"];
task_id?: string;
owner?: string;
owner?: string | null;
/** Format: date-time */
start_date?: string;
/** Format: date-time */
Expand Down
24 changes: 13 additions & 11 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,19 @@ <h2>{{ page_title }}</h2>
</div>
</td>
<td>
{% for owner in dag.owners.split(",") %}
<a class="label label-default"
{% if owner_links and owner.strip() in owner_links.get(dag.dag_id, {}) %}
href="{{ owner_links[dag.dag_id].get(owner.strip()) }}" target="_blank"
{% else %}
href="?search={{ owner | trim }}"
{% endif %}
style="margin: 3px 6px 3px 0;">
{{ owner | trim }}
</a>
{% endfor %}
{% if dag.owners is not none %}
{% for owner in dag.owners.split(",") %}
<a class="label label-default"
{% if owner_links and owner.strip() in owner_links.get(dag.dag_id, {}) %}
href="{{ owner_links[dag.dag_id].get(owner.strip()) }}" target="_blank"
{% else %}
href="?search={{ owner | trim }}"
{% endif %}
style="margin: 3px 6px 3px 0;">
{{ owner | trim }}
</a>
{% endfor %}
{% endif %}
</td>
<td style="padding:0; width:130px;">
{{ loading_dots(classes='js-loading-dag-run-stats text-muted') }}
Expand Down
10 changes: 5 additions & 5 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def test_should_respond_200(self, current_file_token):
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ["airflow"],
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down Expand Up @@ -319,7 +319,7 @@ def test_should_response_200_with_doc_md_none(self, current_file_token):
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ["airflow"],
"owners": [],
"params": {},
"schedule_interval": {
"__type": "TimeDelta",
Expand Down Expand Up @@ -360,7 +360,7 @@ def test_should_response_200_for_null_start_date(self, current_file_token):
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ["airflow"],
"owners": [],
"params": {},
"schedule_interval": {
"__type": "TimeDelta",
Expand Down Expand Up @@ -404,7 +404,7 @@ def test_should_respond_200_serialized(self, current_file_token):
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ["airflow"],
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down Expand Up @@ -458,7 +458,7 @@ def test_should_respond_200_serialized(self, current_file_token):
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ["airflow"],
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down
8 changes: 4 additions & 4 deletions tests/api_connexion/endpoints/test_event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def test_should_respond_200(self, log_model):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time.isoformat(),
"extra": None,
}
Expand Down Expand Up @@ -153,7 +153,7 @@ def test_should_respond_200(self, session, create_log_model):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time.isoformat(),
"extra": None,
},
Expand All @@ -163,7 +163,7 @@ def test_should_respond_200(self, session, create_log_model):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time_2.isoformat(),
"extra": None,
},
Expand Down Expand Up @@ -220,7 +220,7 @@ def test_order_eventlogs_by_owner(self, create_log_model, session):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time.isoformat(),
"extra": None,
},
Expand Down
14 changes: 7 additions & 7 deletions tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def test_should_respond_200(self):
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down Expand Up @@ -151,7 +151,7 @@ def test_mapped_task(self):
"extra_links": [],
"is_mapped": True,
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
Expand Down Expand Up @@ -196,7 +196,7 @@ def test_should_respond_200_serialized(self):
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_should_respond_200(self):
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_should_respond_200(self):
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
Expand Down Expand Up @@ -340,7 +340,7 @@ def test_get_tasks_mapped(self):
"extra_links": [],
"is_mapped": True,
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
Expand Down Expand Up @@ -369,7 +369,7 @@ def test_get_tasks_mapped(self):
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {},
"pool": "default_pool",
"pool_slots": 1.0,
Expand Down
6 changes: 3 additions & 3 deletions tests/api_connexion/schemas/test_event_log_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_serialize(self, task_instance):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time.isoformat(),
"extra": None,
}
Expand All @@ -78,7 +78,7 @@ def test_serialize(self, task_instance):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time.isoformat(),
"extra": None,
},
Expand All @@ -88,7 +88,7 @@ def test_serialize(self, task_instance):
"dag_id": "TEST_DAG_ID",
"task_id": "TEST_TASK_ID",
"execution_date": self.default_time.isoformat(),
"owner": "airflow",
"owner": None,
"when": self.default_time2.isoformat(),
"extra": None,
},
Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/schemas/test_task_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_serialize(self):
"end_date": "2020-06-26T00:00:00+00:00",
"execution_timeout": None,
"extra_links": [],
"owner": "airflow",
"owner": None,
"operator_name": "EmptyOperator",
"params": {},
"pool": "default_pool",
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_serialize(self):
"execution_timeout": None,
"extra_links": [],
"operator_name": "EmptyOperator",
"owner": "airflow",
"owner": None,
"params": {
"foo": {
"__class": "airflow.models.param.Param",
Expand Down
Loading