Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FTPSFileTransmitOperator #28318

Merged
merged 1 commit into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion airflow/providers/ftp/operators/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.compat.functools import cached_property
from airflow.models import BaseOperator
from airflow.providers.ftp.hooks.ftp import FTPHook
from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook


class FTPOperation:
Expand Down Expand Up @@ -130,3 +130,20 @@ def execute(self, context: Any) -> str | list[str] | None:
self.log.info("Starting to transfer file %s", file_msg)
self.hook.store_file(remote_filepath, local_filepath)
return self.local_filepath


class FTPSFileTransmitOperator(FTPFileTransmitOperator):
    """
    Transfer files between the local filesystem and a remote host over FTPS.

    Everything except the ``hook`` property is inherited from
    FTPFileTransmitOperator; the hook is an FTPSHook, which provides the
    ftps transport channel used for the file transfer.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:FTPSFileTransmitOperator`
    """

    @cached_property
    def hook(self) -> FTPSHook:
        """Build the FTPSHook for this operator's ``ftp_conn_id`` and return it."""
        ftps_hook = FTPSHook(ftp_conn_id=self.ftp_conn_id)
        return ftps_hook
32 changes: 31 additions & 1 deletion docs/apache-airflow-providers-ftp/operators/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ FTPFileTransmitOperator


Use the FTPFileTransmitOperator to get or
pull files to/from an FTP server.
put files to/from an FTP server.

Using the Operator
^^^^^^^^^^^^^^^^^^
Expand All @@ -46,3 +46,33 @@ The below example shows how to use the FTPFileTransmitOperator to pull a file fr
:dedent: 4
:start-after: [START howto_operator_ftp_get]
:end-before: [END howto_operator_ftp_get]

.. _howto/operator:FTPSFileTransmitOperator:

FTPSFileTransmitOperator
=========================


Use the FTPSFileTransmitOperator to get or
put files to/from an FTPS server.

Using the Operator
^^^^^^^^^^^^^^^^^^

For parameter definition take a look at :class:`~airflow.providers.ftp.operators.ftp.FTPSFileTransmitOperator`.

The below example shows how to use the FTPSFileTransmitOperator to transfer a locally stored file to a remote FTPS Server:

.. exampleinclude:: /../../tests/system/providers/ftp/example_ftp.py
:language: python
:dedent: 4
:start-after: [START howto_operator_ftps_put]
:end-before: [END howto_operator_ftps_put]

The below example shows how to use the FTPSFileTransmitOperator to pull a file from a remote FTPS Server.

.. exampleinclude:: /../../tests/system/providers/ftp/example_ftp.py
:language: python
:dedent: 4
:start-after: [START howto_operator_ftps_get]
:end-before: [END howto_operator_ftps_get]
150 changes: 122 additions & 28 deletions tests/providers/ftp/operators/test_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
# under the License.
from __future__ import annotations

import os
from unittest import mock

import pytest

from airflow.models import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.operators.ftp import (
FTPFileTransmitOperator,
FTPOperation,
FTPSFileTransmitOperator,
)
from airflow.utils.timezone import datetime

DEFAULT_DATE = datetime(2017, 1, 1)
Expand All @@ -32,21 +35,16 @@

class TestFTPFileTransmitOperator:
def setup_method(self):
self.test_local_dir = "/tmp"
self.test_local_dir_int = "/tmp/interdir"
self.test_local_dir = "ftptmp"
self.test_remote_dir = "/ftphome"
self.test_remote_dir_int = "/ftphome/interdir"
self.test_local_filename = "test_local_file"
self.test_remote_filename = "test_remote_file"
self.test_local_filepath = f"{self.test_local_dir}/{self.test_local_filename}"
self.test_remote_filepath = f"{self.test_remote_dir}/{self.test_remote_filename}"
self.test_local_filepath_int_dir = f"{self.test_local_dir_int}/{self.test_local_filename}"
self.test_local_filepath_int_dir = f"{self.test_local_dir}/{self.test_local_filename}"
self.test_remote_filepath_int_dir = f"{self.test_remote_dir_int}/{self.test_remote_filename}"

def teardown_method(self):
if os.path.exists(self.test_local_dir_int):
os.rmdir(self.test_local_dir_int)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.create_directory")
def test_file_transfer_put(self, mock_create_dir, mock_put):
Expand All @@ -58,9 +56,8 @@ def test_file_transfer_put(self, mock_create_dir, mock_put):
operation=FTPOperation.PUT,
)
ftp_op.execute(None)
assert mock_put.call_count == 1
assert not mock_create_dir.called
mock_put.assert_called_with(self.test_remote_filepath, self.test_local_filepath)
mock_put.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.create_directory")
Expand All @@ -74,9 +71,8 @@ def test_file_transfer_with_intermediate_dir_put(self, mock_create_dir, mock_put
create_intermediate_dirs=True,
)
ftp_op.execute(None)
assert mock_put.call_count == 1
mock_create_dir.assert_called_with(self.test_remote_dir_int)
mock_put.assert_called_with(self.test_remote_filepath_int_dir, self.test_local_filepath)
mock_put.assert_called_once_with(self.test_remote_filepath_int_dir, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_file_transfer_get(self, mock_get):
Expand All @@ -88,23 +84,23 @@ def test_file_transfer_get(self, mock_get):
operation=FTPOperation.GET,
)
ftp_op.execute(None)
assert mock_get.call_count == 1
mock_get.assert_called_with(self.test_remote_filepath, self.test_local_filepath)
mock_get.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_file_transfer_with_intermediate_dir_get(self, mock_get):
def test_file_transfer_with_intermediate_dir_get(self, mock_get, tmp_path):
ftp_op = FTPFileTransmitOperator(
task_id="test_ftp_get_imm_dirs",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=self.test_local_filepath_int_dir,
local_filepath=str(tmp_path / self.test_local_filepath_int_dir),
remote_filepath=self.test_remote_filepath,
operation=FTPOperation.GET,
create_intermediate_dirs=True,
)
ftp_op.execute(None)
assert mock_get.call_count == 1
assert os.path.exists(self.test_local_dir_int)
mock_get.assert_called_with(self.test_remote_filepath, self.test_local_filepath_int_dir)
assert len(list(tmp_path.iterdir())) == 1
mock_get.assert_called_once_with(
self.test_remote_filepath, str(tmp_path / self.test_local_filepath_int_dir)
)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_multiple_paths_get(self, mock_get):
Expand All @@ -119,10 +115,8 @@ def test_multiple_paths_get(self, mock_get):
)
ftp_op.execute(None)
assert mock_get.call_count == 2
args0, _ = mock_get.call_args_list[0]
args1, _ = mock_get.call_args_list[1]
assert args0 == (remote_filepath[0], local_filepath[0])
assert args1 == (remote_filepath[1], local_filepath[1])
for count, (args, _) in enumerate(mock_get.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
def test_multiple_paths_put(self, mock_put):
Expand All @@ -137,10 +131,8 @@ def test_multiple_paths_put(self, mock_put):
)
ftp_op.execute(None)
assert mock_put.call_count == 2
args0, _ = mock_put.call_args_list[0]
args1, _ = mock_put.call_args_list[1]
assert args0 == (remote_filepath[0], local_filepath[0])
assert args1 == (remote_filepath[1], local_filepath[1])
for count, (args, _) in enumerate(mock_put.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
def test_arg_checking(self, mock_put):
Expand Down Expand Up @@ -184,3 +176,105 @@ def test_unequal_local_remote_file_paths(self):
local_filepath=["/tmp/test1", "/tmp/test2"],
remote_filepath="/tmp/test1",
)


class TestFTPSFileTransmitOperator:
    """Unit tests for FTPSFileTransmitOperator with the FTPSHook transfer methods mocked out."""

    def setup_method(self):
        """Prepare the local/remote path fixtures shared by the tests below."""
        self.test_local_dir = "ftpstmp"
        self.test_remote_dir = "/ftpshome"
        self.test_remote_dir_int = "/ftpshome/interdir"
        self.test_local_filename = "test_local_file"
        self.test_remote_filename = "test_remote_file"
        self.test_local_filepath = f"{self.test_local_dir}/{self.test_local_filename}"
        self.test_remote_filepath = f"{self.test_remote_dir}/{self.test_remote_filename}"
        # Relative path; the intermediate-dir GET test joins it onto pytest's tmp_path.
        self.test_local_filepath_int_dir = f"{self.test_local_dir}/{self.test_local_filename}"
        self.test_remote_filepath_int_dir = f"{self.test_remote_dir_int}/{self.test_remote_filename}"

    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.store_file")
    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.create_directory")
    def test_file_transfer_put(self, mock_create_dir, mock_put):
        """A plain PUT stores the file once and creates no remote directory."""
        ftps_op = FTPSFileTransmitOperator(
            task_id="test_ftps_put",
            ftp_conn_id=DEFAULT_CONN_ID,
            local_filepath=self.test_local_filepath,
            remote_filepath=self.test_remote_filepath,
            operation=FTPOperation.PUT,
        )
        ftps_op.execute(None)
        assert not mock_create_dir.called
        mock_put.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.store_file")
    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.create_directory")
    def test_file_transfer_with_intermediate_dir_put(self, mock_create_dir, mock_put):
        """A PUT with create_intermediate_dirs=True creates the remote directory, then stores the file."""
        ftps_op = FTPSFileTransmitOperator(
            task_id="test_ftps_put_imm_dirs",
            ftp_conn_id=DEFAULT_CONN_ID,
            local_filepath=self.test_local_filepath,
            remote_filepath=self.test_remote_filepath_int_dir,
            operation=FTPOperation.PUT,
            create_intermediate_dirs=True,
        )
        ftps_op.execute(None)
        mock_create_dir.assert_called_with(self.test_remote_dir_int)
        mock_put.assert_called_once_with(self.test_remote_filepath_int_dir, self.test_local_filepath)

    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.retrieve_file")
    def test_file_transfer_get(self, mock_get):
        """A plain GET retrieves the remote file exactly once."""
        ftps_op = FTPSFileTransmitOperator(
            task_id="test_ftps_get",
            ftp_conn_id=DEFAULT_CONN_ID,
            local_filepath=self.test_local_filepath,
            remote_filepath=self.test_remote_filepath,
            operation=FTPOperation.GET,
        )
        ftps_op.execute(None)
        mock_get.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

    # Patch FTPSHook and instantiate the FTPS operator: this test previously targeted
    # FTPHook / FTPFileTransmitOperator, so the FTPS code path was never exercised.
    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.retrieve_file")
    def test_file_transfer_with_intermediate_dir_get(self, mock_get, tmp_path):
        """A GET with create_intermediate_dirs=True leaves one new entry under tmp_path and retrieves once."""
        local_filepath = str(tmp_path / self.test_local_filepath_int_dir)
        ftps_op = FTPSFileTransmitOperator(
            task_id="test_ftps_get_imm_dirs",
            ftp_conn_id=DEFAULT_CONN_ID,
            local_filepath=local_filepath,
            remote_filepath=self.test_remote_filepath,
            operation=FTPOperation.GET,
            create_intermediate_dirs=True,
        )
        ftps_op.execute(None)
        # retrieve_file is mocked, so the only entry under tmp_path is whatever the
        # operator itself created for the intermediate local directory.
        assert len(list(tmp_path.iterdir())) == 1
        mock_get.assert_called_once_with(self.test_remote_filepath, local_filepath)

    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.retrieve_file")
    def test_multiple_paths_get(self, mock_get):
        """A GET with path lists retrieves each remote/local pair, in order."""
        local_filepath = ["/tmp/ltest1", "/tmp/ltest2"]
        remote_filepath = ["/tmp/rtest1", "/tmp/rtest2"]
        ftps_op = FTPSFileTransmitOperator(
            task_id="test_multiple_paths_get",
            ftp_conn_id=DEFAULT_CONN_ID,
            local_filepath=local_filepath,
            remote_filepath=remote_filepath,
            operation=FTPOperation.GET,
        )
        ftps_op.execute(None)
        assert mock_get.call_count == 2
        for count, (args, _) in enumerate(mock_get.call_args_list):
            assert args == (remote_filepath[count], local_filepath[count])

    @mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.store_file")
    def test_multiple_paths_put(self, mock_put):
        """A PUT with path lists stores each remote/local pair, in order."""
        local_filepath = ["/tmp/ltest1", "/tmp/ltest2"]
        remote_filepath = ["/tmp/rtest1", "/tmp/rtest2"]
        ftps_op = FTPSFileTransmitOperator(
            task_id="test_multiple_paths_put",
            ftp_conn_id=DEFAULT_CONN_ID,
            local_filepath=local_filepath,
            remote_filepath=remote_filepath,
            operation=FTPOperation.PUT,
        )
        ftps_op.execute(None)
        assert mock_put.call_count == 2
        for count, (args, _) in enumerate(mock_put.call_args_list):
            assert args == (remote_filepath[count], local_filepath[count])
35 changes: 31 additions & 4 deletions tests/system/providers/ftp/example_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the FTPFileTransmitOperator.
This is an example dag for using the FTPFileTransmitOperator and FTPSFileTransmitOperator.
"""
from __future__ import annotations

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.operators.ftp import (
FTPFileTransmitOperator,
FTPOperation,
FTPSFileTransmitOperator,
)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_ftp_put_get"
DAG_ID = "example_ftp_ftps_put_get"

with DAG(
DAG_ID,
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "Ftp", "FtpFileTransmit"],
tags=["example", "Ftp", "FtpFileTransmit", "Ftps", "FtpsFileTransmit"],
) as dag:
# [START howto_operator_ftp_put]
ftp_put = FTPFileTransmitOperator(
Expand All @@ -57,7 +61,30 @@
)
# [END howto_operator_ftp_get]

# [START howto_operator_ftps_put]
ftps_put = FTPSFileTransmitOperator(
task_id="test_ftps_put",
ftp_conn_id="ftps_default",
local_filepath="/tmp/filepath",
remote_filepath="/remote_tmp/filepath",
operation=FTPOperation.PUT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to double check this is intentional and not supposed to be FTPSOperation.PUT

Copy link
Contributor Author

@RachitSharma2001 RachitSharma2001 Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is intentional, as FTPS still uses the same operations (PUT and GET) as FTPFileTransmitOperator. On second thought, I was wondering if that name is misleading. Do you think I should change the name to something like Operation or something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be alright.

create_intermediate_dirs=True,
)
# [END howto_operator_ftps_put]

# [START howto_operator_ftps_get]
ftps_get = FTPSFileTransmitOperator(
task_id="test_ftps_get",
ftp_conn_id="ftps_default",
local_filepath="/tmp/filepath",
remote_filepath="/remote_tmp/filepath",
operation=FTPOperation.GET,
create_intermediate_dirs=True,
)
# [END howto_operator_ftps_get]

ftp_put >> ftp_get
ftps_put >> ftps_get

from tests.system.utils.watcher import watcher

Expand Down