Skip to content

Commit

Permalink
✨ Add Sftp source and Prefect tasks (#1039)
Browse files Browse the repository at this point in the history
* ✨ added sftp source file.

* ✨ added sftp task file.

* ✨ added salesforce flow file.

* ✨ added integration test file.

* ✨ added unit test file.

* 📌 added paramiko package in pyproject.

* 📝 updated docstrings.

* ✅ updated unit test file.

* 📝 updated commented code.

* 🐛 Fixed `sftp_list` bug related to search patterns and resolved the packed-columns issue

* 🐛 fixed a raise error bug.

* ⚡️ removed unused function.

* ✅ updated integration test folder.

* 📝 updated comments.

* 📝 added feedback.

* 📝 updated comments.

* 🚧 Modified `rsa_key` description

* 🔥 Removed `credentials` param

* ♻️ Changed function name from `_get_file_object_file` to `_get_file_object`

* ♻️ Changed function name from `_list_directory` to `_ls`

* ♻️ Changed the way of handling file listing and recursive option

* ✅ Cleaned up tests for SFTP

* 🐛 Changed imported class name from `SftpCredentials` to `Sftp`

* ♻️ Adjusted `_ls` function

* ✅ Modified unit tests

* ⚡️ Removed `time.sleep()` from the `sftp` task

* 🐛 Added missing comma

* 🚧 Added requirements

* ✅ Updated tests and _ls function

* 🔥 Removed integration tests for SFTP

* 🎨 Formatted the code

* 🎨 Added `allowlist-secret`

* Update tests/unit/test_sftp.py

Co-authored-by: Michał Zawadzki <[email protected]>

* 🎨 Moved `pytest-mock` to dev-dependencies

* removed code

* 🐛 Updated `dummy_rsa_key` value

* ✅ Added tests to SFTP source

* 🎨 Removed extra lines

* 🚧Added `noqa`

* 🚧 Added `pragma: allowlist secret`

* 🔥 Removed `noqa: RUF100, S608`

---------

Co-authored-by: fdelgadodyvenia <[email protected]>
Co-authored-by: rziemianek <[email protected]>
Co-authored-by: Rafał Ziemianek <[email protected]>
Co-authored-by: Michał Zawadzki <[email protected]>
  • Loading branch information
5 people authored Sep 27, 2024
1 parent eb29241 commit 3de17df
Show file tree
Hide file tree
Showing 10 changed files with 680 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ dependencies = [
"pyarrow>=10.0, <10.1.0",
# numpy>=2.0 is not compatible with the old pyarrow v10.x.
"numpy>=1.23.4, <2.0",
"paramiko==2.11.0",
"defusedxml>=0.7.1",
"aiohttp>=3.10.5",
"pytest-mock>=3.14.0",
]
]
requires-python = ">=3.10"
readme = "README.md"

Expand Down Expand Up @@ -84,6 +84,7 @@ dev-dependencies = [
"ruff>=0.6.6",
"pytest-asyncio>=0.23.8",
"moto>=5.0.13",
"pytest-mock>=3.14.0",
]

[tool.pytest.ini_options]
Expand Down
9 changes: 9 additions & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ attrs==24.2.0
# via visions
babel==2.16.0
# via mkdocs-material
bcrypt==4.2.0
# via paramiko
beautifulsoup4==4.12.3
# via mkdocs-mermaid2-plugin
# via nbconvert
Expand Down Expand Up @@ -73,6 +75,7 @@ cffi==1.17.0
# via cairocffi
# via cryptography
# via pygit2
# via pynacl
charset-normalizer==3.3.2
# via requests
click==8.1.7
Expand All @@ -97,6 +100,7 @@ croniter==2.0.7
# via prefect
cryptography==43.0.0
# via moto
# via paramiko
# via prefect
cssselect2==0.7.0
# via cairosvg
Expand Down Expand Up @@ -362,6 +366,8 @@ pandas==2.2.2
# via visions
pandocfilters==1.5.1
# via nbconvert
paramiko==2.11.0
# via viadot2
parso==0.8.4
# via jedi
pathspec==0.12.1
Expand Down Expand Up @@ -426,6 +432,8 @@ pymdown-extensions==10.9
# via mkdocs-material
# via mkdocs-mermaid2-plugin
# via mkdocstrings
pynacl==1.5.0
# via paramiko
pyodbc==5.1.0
# via viadot2
pyparsing==3.1.2
Expand Down Expand Up @@ -553,6 +561,7 @@ six==1.16.0
# via bleach
# via jsbeautifier
# via kubernetes
# via paramiko
# via python-dateutil
# via rfc3339-validator
smmap==5.0.1
Expand Down
9 changes: 9 additions & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ attrs==24.2.0
# via jsonschema
# via referencing
# via visions
bcrypt==4.2.0
# via paramiko
beautifulsoup4==4.12.3
# via o365
cachetools==5.5.0
Expand All @@ -52,6 +54,7 @@ certifi==2024.7.4
cffi==1.17.0
# via cryptography
# via pygit2
# via pynacl
charset-normalizer==3.3.2
# via requests
click==8.1.7
Expand All @@ -68,6 +71,7 @@ coolname==2.2.0
croniter==2.0.7
# via prefect
cryptography==43.0.0
# via paramiko
# via prefect
dateparser==1.2.0
# via prefect
Expand Down Expand Up @@ -192,6 +196,8 @@ packaging==24.1
pandas==2.2.2
# via viadot2
# via visions
paramiko==2.11.0
# via viadot2
pathspec==0.12.1
# via prefect
pendulum==2.1.2
Expand Down Expand Up @@ -229,6 +235,8 @@ pygit2==1.14.1
# via viadot2
pygments==2.18.0
# via rich
pynacl==1.5.0
# via paramiko
pyodbc==5.1.0
# via viadot2
pytest==8.3.3
Expand Down Expand Up @@ -315,6 +323,7 @@ shellingham==1.5.4
# via typer
six==1.16.0
# via kubernetes
# via paramiko
# via python-dateutil
# via rfc3339-validator
sniffio==1.3.1
Expand Down
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .outlook_to_adls import outlook_to_adls
from .sap_to_parquet import sap_to_parquet
from .sap_to_redshift_spectrum import sap_to_redshift_spectrum
from .sftp_to_adls import sftp_to_adls
from .sharepoint_to_adls import sharepoint_to_adls
from .sharepoint_to_databricks import sharepoint_to_databricks
from .sharepoint_to_redshift_spectrum import sharepoint_to_redshift_spectrum
Expand Down Expand Up @@ -44,6 +45,7 @@
"outlook_to_adls",
"sap_to_parquet",
"sap_to_redshift_spectrum",
"sftp_to_adls",
"sharepoint_to_adls",
"sharepoint_to_databricks",
"sharepoint_to_redshift_spectrum",
Expand Down
64 changes: 64 additions & 0 deletions src/viadot/orchestration/prefect/flows/sftp_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Download data from a SFTP server to Azure Data Lake Storage."""

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import df_to_adls, sftp_to_df


@flow(
    name="SFTP extraction to ADLS",
    description=(
        "Extract data from a SFTP server and "
        "load it into Azure Data Lake Storage."
    ),
    retries=1,
    retry_delay_seconds=60,
    task_runner=ConcurrentTaskRunner,
)
def sftp_to_adls(
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    file_name: str | None = None,
    sep: str = "\t",
    columns: list[str] | None = None,
    adls_config_key: str | None = None,
    adls_azure_key_vault_secret: str | None = None,
    adls_path: str | None = None,
    adls_path_overwrite: bool = False,
) -> None:
    r"""Flow to download data from a SFTP server to Azure Data Lake.

    Args:
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
            where credentials are stored. Defaults to None.
        file_name (str, optional): Path to the file in SFTP server. Defaults to None.
        sep (str, optional): The separator to use to read the CSV file.
            Defaults to "\t".
        columns (list[str], optional): Columns to read from the file. Defaults to None.
        adls_config_key (str, optional): The key in the viadot config holding
            relevant credentials. Defaults to None.
        adls_azure_key_vault_secret (str, optional): The name of the Azure Key
            Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal
            credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
            Defaults to None.
        adls_path (str, optional): Azure Data Lake destination file path
            (with file name). Defaults to None.
        adls_path_overwrite (bool, optional): Whether to overwrite the file in ADLS.
            Defaults to False.
    """
    # Extract: read the file from the SFTP server into a DataFrame.
    data_frame = sftp_to_df(
        config_key=config_key,
        azure_key_vault_secret=azure_key_vault_secret,
        file_name=file_name,
        sep=sep,
        columns=columns,
    )

    # Load: upload the DataFrame to Azure Data Lake Storage.
    return df_to_adls(
        df=data_frame,
        path=adls_path,
        credentials_secret=adls_azure_key_vault_secret,
        config_key=adls_config_key,
        overwrite=adls_path_overwrite,
    )
3 changes: 3 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .redshift_spectrum import df_to_redshift_spectrum
from .s3 import s3_upload_file
from .sap_rfc import sap_rfc_to_df
from .sftp import sftp_list, sftp_to_df
from .sharepoint import sharepoint_download_file, sharepoint_to_df
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df
Expand Down Expand Up @@ -47,6 +48,8 @@
"outlook_to_df",
"s3_upload_file",
"sap_rfc_to_df",
"sftp_list",
"sftp_to_df",
"sharepoint_download_file",
"sharepoint_to_df",
"sql_server_query",
Expand Down
88 changes: 88 additions & 0 deletions src/viadot/orchestration/prefect/tasks/sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Tasks from SFTP API."""

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Sftp


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sftp_to_df(
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    file_name: str | None = None,
    sep: str = "\t",
    columns: list[str] | None = None,
) -> pd.DataFrame:
    r"""Querying SFTP server and saving data as the data frame.

    Args:
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
            where credentials are stored. Defaults to None.
        file_name (str, optional): Path to the file in SFTP server. Defaults to None.
        sep (str, optional): The separator to use to read the CSV file.
            Defaults to "\t".
        columns (list[str], optional): Columns to read from the file. Defaults to None.

    Returns:
        pd.DataFrame: The response data as a pandas DataFrame.

    Raises:
        MissingSourceCredentialsError: If neither `azure_key_vault_secret` nor
            `config_key` is provided.
    """
    if not (azure_key_vault_secret or config_key):
        raise MissingSourceCredentialsError

    # Fetch secrets from Key Vault only when no viadot config key is given;
    # otherwise pass None and let the source resolve credentials via config_key.
    # (Previously `credentials` was left unassigned in the config_key branch,
    # causing an UnboundLocalError below.)
    credentials = get_credentials(azure_key_vault_secret) if not config_key else None

    sftp = Sftp(
        credentials=credentials,
        config_key=config_key,
    )
    sftp.get_connection()

    return sftp.to_df(file_name=file_name, sep=sep, columns=columns)


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def sftp_list(
    config_key: str | None = None,
    azure_key_vault_secret: str | None = None,
    path: str | None = None,
    recursive: bool = False,
    matching_path: str | None = None,
) -> list[str]:
    """Listing files in the SFTP server.

    Args:
        config_key (str, optional): The key in the viadot config holding relevant
            credentials. Defaults to None.
        azure_key_vault_secret (str, optional): The name of the Azure Key Vault secret
            where credentials are stored. Defaults to None.
        path (str, optional): Destination path from where to get the structure.
            Defaults to None.
        recursive (bool, optional): Get the structure in deeper folders.
            Defaults to False.
        matching_path (str, optional): Filtering folders to return by a regex pattern.
            Defaults to None.

    Returns:
        list[str]: List of files in the SFTP server.

    Raises:
        MissingSourceCredentialsError: If neither `azure_key_vault_secret` nor
            `config_key` is provided.
    """
    if not (azure_key_vault_secret or config_key):
        raise MissingSourceCredentialsError

    # Fetch secrets from Key Vault only when no viadot config key is given;
    # otherwise pass None and let the source resolve credentials via config_key.
    # (Previously `credentials` was left unassigned in the config_key branch,
    # causing an UnboundLocalError below.)
    credentials = get_credentials(azure_key_vault_secret) if not config_key else None

    sftp = Sftp(
        credentials=credentials,
        config_key=config_key,
    )
    sftp.get_connection()

    return sftp.get_files_list(
        path=path, recursive=recursive, matching_path=matching_path
    )
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .hubspot import Hubspot
from .mindful import Mindful
from .outlook import Outlook
from .sftp import Sftp
from .sharepoint import Sharepoint
from .sql_server import SQLServer
from .supermetrics import Supermetrics, SupermetricsCredentials
Expand All @@ -26,6 +27,7 @@
"Genesys",
"Hubspot",
"Mindful",
"Sftp",
"Outlook",
"SQLServer",
"Sharepoint",
Expand Down
Loading

0 comments on commit 3de17df

Please sign in to comment.