From 3fe948a860a6eed2ee51a6f1be658a3ba260683f Mon Sep 17 00:00:00 2001 From: john-jac <75442233+john-jac@users.noreply.github.com> Date: Wed, 8 Sep 2021 12:40:40 -0700 Subject: [PATCH] sftp_to_s3 stream file option (#17609) --- .../providers/amazon/aws/transfers/sftp_to_s3.py | 15 ++++++++++++--- .../amazon/aws/transfers/test_sftp_to_s3.py | 10 +++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/airflow/providers/amazon/aws/transfers/sftp_to_s3.py index dad2d8540852b..f54aec4ea88c7 100644 --- a/airflow/providers/amazon/aws/transfers/sftp_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/sftp_to_s3.py @@ -47,6 +47,9 @@ class SFTPToS3Operator(BaseOperator): :param s3_key: The targeted s3 key. This is the specified path for uploading the file to S3. :type s3_key: str + :param use_temp_file: If True, copies file first to local, + if False streams file from SFTP to S3. + :type use_temp_file: bool """ template_fields = ('s3_key', 'sftp_path') @@ -59,6 +62,7 @@ def __init__( sftp_path: str, sftp_conn_id: str = 'ssh_default', s3_conn_id: str = 'aws_default', + use_temp_file: bool = True, **kwargs, ) -> None: super().__init__(**kwargs) @@ -67,6 +71,7 @@ def __init__( self.s3_bucket = s3_bucket self.s3_key = s3_key self.s3_conn_id = s3_conn_id + self.use_temp_file = use_temp_file @staticmethod def get_s3_key(s3_key: str) -> str: @@ -81,7 +86,11 @@ def execute(self, context) -> None: sftp_client = ssh_hook.get_conn().open_sftp() - with NamedTemporaryFile("w") as f: - sftp_client.get(self.sftp_path, f.name) + if self.use_temp_file: + with NamedTemporaryFile("w") as f: + sftp_client.get(self.sftp_path, f.name) - s3_hook.load_file(filename=f.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=True) + s3_hook.load_file(filename=f.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=True) + else: + with sftp_client.file(self.sftp_path, mode='rb') as data: + s3_hook.get_conn().upload_fileobj(data, self.s3_bucket, self.s3_key, Callback=self.log.info) diff --git a/tests/providers/amazon/aws/transfers/test_sftp_to_s3.py b/tests/providers/amazon/aws/transfers/test_sftp_to_s3.py index 8a62bf2b5bb33..b24c89b7f8d59 100644 --- a/tests/providers/amazon/aws/transfers/test_sftp_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_sftp_to_s3.py @@ -20,6 +20,7 @@ import boto3 from moto import mock_s3 +from parameterized import parameterized from airflow.models import DAG from airflow.providers.amazon.aws.hooks.s3 import S3Hook @@ -66,9 +67,15 @@ def setUp(self): self.sftp_path = SFTP_PATH self.s3_key = S3_KEY + @parameterized.expand( + [ + (True,), + (False,), + ] + ) @mock_s3 @conf_vars({('core', 'enable_xcom_pickling'): 'True'}) - def test_sftp_to_s3_operation(self): + def test_sftp_to_s3_operation(self, use_temp_file=True): # Setting test_remote_file_content = ( "This is remote file content \n which is also multiline " @@ -98,6 +105,7 @@ def test_sftp_to_s3_operation(self): sftp_path=SFTP_PATH, sftp_conn_id=SFTP_CONN_ID, s3_conn_id=S3_CONN_ID, + use_temp_file=use_temp_file, task_id='test_sftp_to_s3', dag=self.dag, )