[DSIP-13][python] New mechanism file plugins to Python API #11360

Merged
merged 76 commits into from
Sep 7, 2022
99afedc
tem store
xdu-chenrj Jul 29, 2022
1469f40
tem store
xdu-chenrj Jul 29, 2022
0eddb15
tem store
xdu-chenrj Jul 29, 2022
0c3f063
temporary store
xdu-chenrj Jul 29, 2022
1d7b747
zancun
xdu-chenrj Aug 4, 2022
6d68f9c
fix test error
zhongjiajie Aug 4, 2022
3b8697a
local res
xdu-chenrj Aug 8, 2022
d5150e6
local_res
xdu-chenrj Aug 8, 2022
d1633cc
Merge branch 'soc' of https://github.com/xdu-chenrj/dolphinscheduler …
xdu-chenrj Aug 8, 2022
90ae557
local_res_plugin
xdu-chenrj Aug 8, 2022
1546570
local_resource_plugin
xdu-chenrj Aug 8, 2022
dc34667
local_resource_plugin
xdu-chenrj Aug 8, 2022
bd7b506
Update dolphinscheduler-python/pydolphinscheduler/src/pydolphinschedu…
xdu-chenrj Aug 9, 2022
579163b
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Aug 10, 2022
91416db
fix conversation 1
xdu-chenrj Aug 10, 2022
da119fa
fix conversation 3
xdu-chenrj Aug 10, 2022
99a17c2
Update dolphinscheduler-python/pydolphinscheduler/src/pydolphinschedu…
xdu-chenrj Aug 10, 2022
3c7e2cb
Merge branch 'soc' of https://github.com/xdu-chenrj/dolphinscheduler …
xdu-chenrj Aug 10, 2022
a590f55
fix conversation 5
xdu-chenrj Aug 10, 2022
ccd0dea
fix conversation 6
xdu-chenrj Aug 10, 2022
b6d00c4
Update dolphinscheduler-python/pydolphinscheduler/docs/source/resourc…
xdu-chenrj Aug 10, 2022
242ebe8
fix conversation 7
xdu-chenrj Aug 10, 2022
7e080f1
fix conversation 8
xdu-chenrj Aug 10, 2022
05fcedc
Update dolphinscheduler-python/pydolphinscheduler/src/pydolphinschedu…
xdu-chenrj Aug 10, 2022
bbaa7c2
fix conversation 9
xdu-chenrj Aug 10, 2022
5f002d3
fix conversation 10
xdu-chenrj Aug 11, 2022
cb7b9b8
fix conversation 9 and 14
xdu-chenrj Aug 11, 2022
ed8a0f1
Update dolphinscheduler-python/pydolphinscheduler/src/pydolphinschedu…
xdu-chenrj Aug 11, 2022
68dc230
fix conversation 15
xdu-chenrj Aug 11, 2022
7229187
fix conversation 14 and 17
xdu-chenrj Aug 11, 2022
9f00ee7
fix conversation 14 and 17
xdu-chenrj Aug 11, 2022
5b85ff9
fix remote conflicts
xdu-chenrj Aug 11, 2022
2333ff1
Update dolphinscheduler-python/pydolphinscheduler/docs/source/resourc…
xdu-chenrj Aug 11, 2022
e9c311b
fix conversation 19
xdu-chenrj Aug 11, 2022
93f3742
Merge branch 'soc' of https://github.com/xdu-chenrj/dolphinscheduler …
xdu-chenrj Aug 11, 2022
ddc6e77
revert tutorial.py
xdu-chenrj Aug 11, 2022
911632a
fix local.rst
xdu-chenrj Aug 11, 2022
35d4b58
fix develop.rst
xdu-chenrj Aug 11, 2022
2bbac8b
fix task.py
xdu-chenrj Aug 11, 2022
a0ac326
fix develop.rst,how-to-use.rst,index.rst,resource-plugin.rst and Reso…
xdu-chenrj Aug 11, 2022
ade9eb7
fix ResourcePlugin and its test
xdu-chenrj Aug 11, 2022
6aec0eb
fix ResourcePlugin and its test
xdu-chenrj Aug 11, 2022
e9aa2d4
fix error exception
xdu-chenrj Aug 11, 2022
faf7860
fix import order
xdu-chenrj Aug 11, 2022
93c51c5
format code style
xdu-chenrj Aug 11, 2022
20acc0a
reformat files
xdu-chenrj Aug 11, 2022
4ba1c84
fix tests/resource_plugin missing a valid license header
xdu-chenrj Aug 11, 2022
f66d877
merge
xdu-chenrj Aug 11, 2022
41f559b
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Aug 12, 2022
21c43f0
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Aug 13, 2022
af167a4
fix black code style
xdu-chenrj Aug 13, 2022
182df27
fix flake8 code style
xdu-chenrj Aug 13, 2022
45cad6d
fix some test
xdu-chenrj Aug 13, 2022
44c7b22
fix tox e doc build
xdu-chenrj Aug 13, 2022
2205336
fix test_shell
xdu-chenrj Aug 14, 2022
61176b5
fix ci error
zhongjiajie Aug 14, 2022
4167a0a
fix ci error
zhongjiajie Aug 14, 2022
2726a2b
fix windows tmp file cannot be written
xdu-chenrj Aug 14, 2022
3c92777
fix windows tmp file cannot be written
xdu-chenrj Aug 14, 2022
663cbb2
fix tutorial_resource_plugin
xdu-chenrj Aug 14, 2022
fd37037
try to fix error
zhongjiajie Aug 15, 2022
9c77bf4
remove unnessnary change
xdu-chenrj Aug 15, 2022
5494243
fix tutorial_resource_plugin describe
xdu-chenrj Aug 15, 2022
73c12eb
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Aug 15, 2022
8f48bfb
remove unnessnary change
xdu-chenrj Aug 16, 2022
e8f0a2a
remove unnessnary change
xdu-chenrj Aug 16, 2022
8f6b999
Merge branch 'soc' of https://github.com/xdu-chenrj/dolphinscheduler …
xdu-chenrj Aug 16, 2022
5c98bf7
remove unnessary changes
xdu-chenrj Aug 16, 2022
31529fa
temporary storary
xdu-chenrj Aug 18, 2022
e76c1a7
temporary storary
xdu-chenrj Aug 18, 2022
80f9655
Merge branch 'soc' of https://github.com/xdu-chenrj/dolphinscheduler …
xdu-chenrj Aug 20, 2022
235f085
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Aug 20, 2022
fed8ef9
restructure
xdu-chenrj Aug 20, 2022
9a938ac
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Aug 28, 2022
170e683
Merge remote-tracking branch 'upstream/dev' into soc
xdu-chenrj Sep 6, 2022
e0bdcff
Modified RST file: develop and resource-plugin
xdu-chenrj Sep 6, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ then go and see :doc:`tutorial` for more detail.
   cli
   config
   api
   resources_plugin/index

Indices and tables
==================
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

How to develop
==============

To create a new resource plugin, add a new class to the module `resources_plugin`.

The plugin class must inherit from the abstract class `ResourcePlugin` and implement its abstract method `read_file`.

The `__init__` method of `ResourcePlugin` takes one string parameter, `prefix`. You can override this method when necessary.

The `read_file` method of `ResourcePlugin` takes the file suffix as a string and returns the file content, provided the file exists and is readable.
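The steps above can be sketched roughly as follows. To keep the snippet self-contained, it re-declares a minimal stand-in for `ResourcePlugin` that mirrors the class added in this PR; real code would import it from `pydolphinscheduler.core.resource_plugin`. The `InMemory` class and its `files` argument are hypothetical names used only for illustration.

```python
from abc import ABCMeta, abstractmethod
from typing import Dict


class ResourcePlugin(metaclass=ABCMeta):
    """Stand-in mirroring the abstract class added in this PR."""

    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    @abstractmethod
    def read_file(self, suf: str):
        """Return the content of the file located at prefix + suf."""


class InMemory(ResourcePlugin):
    """Hypothetical plugin that serves file content from a dict."""

    def __init__(self, prefix: str, files: Dict[str, str]):
        super().__init__(prefix)
        self._files = files

    def read_file(self, suf: str) -> str:
        # The full address of the file is the prefix plus the suffix.
        path = self.prefix + suf
        if path not in self._files:
            raise FileNotFoundError(f"{path} does not exist in this resource")
        return self._files[path]


plugin = InMemory("/scripts/", {"/scripts/hello.sh": "echo hello"})
content = plugin.read_file("hello.sh")
```

Overriding `__init__` is optional; it is done here only because the hypothetical plugin needs an extra argument.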


Example
-------
- Method `__init__`: Initialization method that takes the parameter `prefix`.

.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
    :start-after: [start init_method]
    :end-before: [end init_method]

- Method `read_file`: Gets the content of a file from the given URI. Its parameter is the suffix of the file path.

  The file prefix is set in `__init__` of the resource plugin.

  The prefix plus the suffix is the absolute path of the file in this resource.

.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
    :start-after: [start read_file_method]
    :end-before: [end read_file_method]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Resources_plugin
================

In this section

.. toctree::
   :maxdepth: 1

   develop
   resource-plugin
   local
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Local
=====

`Local` is the local resource plugin for pydolphinscheduler.

To use it, add the `resource_plugin` parameter to the task subclass or the workflow definition,
for example `resource_plugin=Local("/tmp")`.

For details on using resource plugins, see `How to use` in :doc:`./resource-plugin`.
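As a rough illustration of how a local plugin can combine its prefix with a file suffix, here is a minimal sketch. It is not the implementation in `local.py` from this PR; the `LocalSketch` class is a hypothetical stand-in that only assumes the `prefix` and `read_file` contract described in these docs.

```python
import tempfile
from pathlib import Path


class LocalSketch:
    """Hypothetical stand-in for a local plugin; not the PR's local.py."""

    def __init__(self, prefix: str):
        self.prefix = prefix

    def read_file(self, suf: str) -> str:
        # Join the prefix directory and the suffix to get the full path.
        path = Path(self.prefix) / suf
        if not path.is_file():
            raise FileNotFoundError(f"{path} is not a readable file")
        return path.read_text()


# Create a throwaway directory so the example does not depend on /tmp contents.
with tempfile.TemporaryDirectory() as tmp_dir:
    Path(tmp_dir, "hello.sh").write_text("echo hello")
    plugin = LocalSketch(tmp_dir)
    content = plugin.read_file("hello.sh")
```

Note that `Path(prefix) / suf` discards the prefix when `suf` is itself an absolute path, so a real plugin may want to validate the suffix first.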

Dive Into
---------

.. automodule:: pydolphinscheduler.resources_plugin.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

ResourcePlugin
==============

`ResourcePlugin` is the abstract base class for the resource plugins passed as the `resource_plugin` parameter of task subclasses and workflows.
All resource plugins must inherit from it and override its abstract methods.

Code
----
.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
    :start-after: [start resource_plugin_definition]
    :end-before: [end resource_plugin_definition]

Dive Into
---------
It has the following key methods.

- Method `__init__`: Takes the string parameter `prefix`, which is the prefix of the resource.

  You can override this method if necessary.

.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
    :start-after: [start init_method]
    :end-before: [end init_method]

- Method `read_file`: Gets the content of a file from the given URI. Its parameter is the suffix of the file path.

  The file prefix is set in `__init__` of the resource plugin.

  The prefix plus the suffix is the absolute path of the file in this resource.

  It is an abstract method, so you must override it.

.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
    :start-after: [start abstractmethod read_file]
    :end-before: [end abstractmethod read_file]

.. automodule:: pydolphinscheduler.core.resource_plugin

How to use
----------
Resource plugins can be used in task subclasses and workflows. To use one, pass the `resource_plugin` parameter when the task or workflow is initialized.
For example, to use the local resource plugin, add `resource_plugin=Local("/tmp")`.

The only resource plugin currently supported is `Local`.

Here is an example.

.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_resource_plugin.py
    :start-after: [start workflow_declare]
    :end-before: [end task_declare]

When `resource_plugin` is defined in both the task subclass and the workflow, the one defined in the task subclass takes precedence.

If the task subclass does not define `resource_plugin` but the workflow does, the workflow's `resource_plugin` is used.

If neither the task subclass nor the workflow specifies `resource_plugin`, the command is executed as a script.
In other words, existing definitions remain backward compatible.
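The precedence rule can be sketched as a small standalone function. This is only an illustration of the lookup order described above; `NamedPlugin` and `pick_plugin` are hypothetical names and are not part of pydolphinscheduler.

```python
from typing import Optional


class NamedPlugin:
    """Hypothetical placeholder standing in for any resource plugin."""

    def __init__(self, name: str):
        self.name = name


def pick_plugin(
    task_plugin: Optional[NamedPlugin], workflow_plugin: Optional[NamedPlugin]
) -> Optional[NamedPlugin]:
    """Return the task-level plugin if set, else the workflow-level one."""
    if task_plugin is not None:
        return task_plugin
    # May still be None, meaning no plugin is configured at either level.
    return workflow_plugin


task_level = NamedPlugin("task")
workflow_level = NamedPlugin("workflow")

both = pick_plugin(task_level, workflow_level)
workflow_only = pick_plugin(None, workflow_level)
neither = pick_plugin(None, None)
```

In this sketch a `None` result corresponds to the case where no plugin is configured and the command is used as given.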
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pydolphinscheduler import configuration
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base, Project, Tenant, User
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
        timeout: Optional[int] = 0,
        release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
        param: Optional[Dict] = None,
        resource_plugin: Optional[ResourcePlugin] = None,
        resource_list: Optional[List[Resource]] = None,
    ):
        super().__init__(name, description)
Expand All @@ -134,6 +136,7 @@ def __init__(
        self._release_state = release_state
        self.param = param
        self.tasks: dict = {}
        self.resource_plugin = resource_plugin
        # TODO how to fix circle import
        self._task_relations: set["TaskRelation"] = set()  # noqa: F821
        self._process_definition_code = None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""DolphinScheduler ResourcePlugin object."""


from abc import ABCMeta, abstractmethod


# [start resource_plugin_definition]
class ResourcePlugin(object, metaclass=ABCMeta):
    """ResourcePlugin object, declare resource plugin for task and workflow to dolphinscheduler.

    :param prefix: A string representing the prefix of ResourcePlugin.

    """

    # [start init_method]
    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    # [end init_method]

    # [start abstractmethod read_file]
    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file.

        The address of the file is the prefix of the resource plugin plus the parameter suf.
        """

    # [end abstractmethod read_file]


# [end resource_plugin_definition]
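Because `read_file` is declared with `@abstractmethod` on a class whose metaclass is `ABCMeta`, Python refuses to instantiate the base class or any subclass that does not override it. The sketch below shows this using a stand-in copy of the class so that it runs on its own; `Incomplete` and `can_instantiate` are hypothetical names used only for illustration.

```python
from abc import ABCMeta, abstractmethod


class ResourcePlugin(metaclass=ABCMeta):
    """Stand-in copy of the class above, so the snippet runs on its own."""

    def __init__(self, prefix: str, *args, **kwargs):
        self.prefix = prefix

    @abstractmethod
    def read_file(self, suf: str):
        """Get the content of the file."""


class Incomplete(ResourcePlugin):
    """Hypothetical subclass that forgets to override read_file."""


def can_instantiate(cls) -> bool:
    """Report whether cls("/tmp") succeeds or is rejected with TypeError."""
    try:
        cls("/tmp")
    except TypeError:
        return False
    return True


base_ok = can_instantiate(ResourcePlugin)
incomplete_ok = can_instantiate(Incomplete)
```

Both calls are rejected, which is how the abstract class enforces that every plugin provides its own `read_file`.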
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
ProcessDefinitionContext,
)
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException
from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base

Expand Down Expand Up @@ -101,6 +102,9 @@ class Task(Base):

    _task_custom_attr: set = set()

    ext: set = None
    ext_attr: str = None

    DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}

    def __init__(
Expand All @@ -124,6 +128,7 @@ def __init__(
        dependence: Optional[Dict] = None,
        wait_start_timeout: Optional[Dict] = None,
        condition_result: Optional[Dict] = None,
        resource_plugin: Optional[ResourcePlugin] = None,
    ):

        super().__init__(name, description)
Expand Down Expand Up @@ -166,6 +171,8 @@ def __init__(
        self.dependence = dependence or {}
        self.wait_start_timeout = wait_start_timeout or {}
        self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
        self.resource_plugin = resource_plugin
        self.get_content()

    @property
    def process_definition(self) -> Optional[ProcessDefinition]:
Expand Down Expand Up @@ -229,6 +236,44 @@ def task_params(self) -> Optional[Dict]:
        custom_attr |= self._task_custom_attr
        return self.get_define_custom(custom_attr=custom_attr)

    def get_plugin(self):
        """Return the resource plug-in.

        according to parameter resource_plugin and parameter
        process_definition.resource_plugin.
        """
        if self.resource_plugin is None:
            if self.process_definition.resource_plugin is not None:
                return self.process_definition.resource_plugin
            else:
                raise PyResPluginException(
                    "The execution command of this task is a file, but the resource plugin is empty"
                )
        else:
            return self.resource_plugin

    def get_content(self):
        """Get the file content according to the resource plugin."""
        if self.ext_attr is None and self.ext is None:
            return

        _ext_attr = getattr(self, self.ext_attr)

        if _ext_attr is not None:
            if _ext_attr.endswith(tuple(self.ext)):
                res = self.get_plugin()
                content = res.read_file(_ext_attr)
                setattr(self, self.ext_attr.lstrip("_"), content)
            else:
                index = _ext_attr.rfind(".")
                if index != -1:
                    raise ValueError(
                        "This task does not support files with suffix {}, only supports {}".format(
                            _ext_attr[index:], ",".join(str(suf) for suf in self.ext)
                        )
                    )
                setattr(self, self.ext_attr.lstrip("_"), _ext_attr)

    def __hash__(self):
        return hash(self.code)
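The suffix check in `get_content` can be hard to follow inside the class, so here is a standalone sketch of the same decision logic. It is an illustration under assumptions, not the code from this PR: `resolve_content` is a hypothetical function, and the plugin is reduced to a plain `read_file` callable.

```python
from typing import Callable, Iterable


def resolve_content(
    value: str, ext: Iterable[str], read_file: Callable[[str], str]
) -> str:
    """Return file content for supported suffixes, else the value itself."""
    suffixes = tuple(ext)
    if value.endswith(suffixes):
        # Supported file suffix: let the plugin load the content.
        return read_file(value)
    index = value.rfind(".")
    if index != -1:
        # Any other dotted value is treated as an unsupported file suffix.
        raise ValueError(
            "unsupported suffix {}, only supports {}".format(
                value[index:], ",".join(suffixes)
            )
        )
    # No dot at all: treat the value as inline content.
    return value


fake_files = {"run.sh": "echo from file"}

inline = resolve_content("echo hello", {".sh"}, fake_files.__getitem__)
loaded = resolve_content("run.sh", {".sh"}, fake_files.__getitem__)
```

One consequence of this logic, at least in the sketch, is that an inline command containing a dot (for example `echo 1.5`) is rejected as an unsupported suffix rather than passed through.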
