Skip to content

Commit

Permalink
Rename FileToWasbOperator to LocalFilesystemToWasbOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
eladkal committed Sep 9, 2021
1 parent 6acb9e1 commit 1f9fe99
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 71 deletions.
26 changes: 20 additions & 6 deletions airflow/contrib/operators/file_to_wasb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,31 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module is deprecated.
Please use :mod:`airflow.providers.microsoft.azure.transfers.file_to_wasb`.
"""
"""This module is deprecated. Please use :mod:`airflow.providers.microsoft.azure.transfers.local_to_wasb`."""

import warnings

from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator # noqa
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

warnings.warn(
"This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.file_to_wasb`.",
"This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb`.",
DeprecationWarning,
stacklevel=2,
)


class FileToWasbOperator(LocalFilesystemToWasbOperator):
"""
This class is deprecated.
Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb.LocalFilesystemToWasbOperator`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"""This class is deprecated.
Please use
`airflow.providers.microsoft.azure.transfers.local_to_wasb.LocalFilesystemToWasbOperator`.""",
DeprecationWarning,
stacklevel=2,
)
super().__init__(*args, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

from airflow.models import DAG
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
from airflow.utils.dates import days_ago

PATH_TO_UPLOAD_FILE = os.environ.get('AZURE_PATH_TO_UPLOAD_FILE', 'example-text.txt')

with DAG("example_file_to_wasb", schedule_interval="@once", start_date=days_ago(2)) as dag:
upload = FileToWasbOperator(
with DAG("example_local_to_wasb", schedule_interval="@once", start_date=days_ago(2)) as dag:
upload = LocalFilesystemToWasbOperator(
task_id="upload_file", file_path=PATH_TO_UPLOAD_FILE, container_name="mycontainer", blob_name='myblob'
)
delete = WasbDeleteBlobOperator(task_id="delete_file", container_name="mycontainer", blob_name="myblob")
Expand Down
3 changes: 3 additions & 0 deletions airflow/providers/microsoft/azure/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ transfers:
- source-integration-name: Local
target-integration-name: Microsoft Azure Blob Storage
python-module: airflow.providers.microsoft.azure.transfers.file_to_wasb
- source-integration-name: Local
target-integration-name: Microsoft Azure Blob Storage
python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb
- source-integration-name: Microsoft Azure Blob Storage
target-integration-name: Google Cloud Storage (GCS)
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst
Expand Down
59 changes: 8 additions & 51 deletions airflow/providers/microsoft/azure/transfers/file_to_wasb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,14 @@
# specific language governing permissions and limitations
# under the License.
#
from typing import Optional
"""This module is deprecated. Please use :mod:`airflow.providers.microsoft.azure.transfers.local_to_wasb`."""

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
import warnings

from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator # noqa

class FileToWasbOperator(BaseOperator):
"""
Uploads a file to Azure Blob Storage.
:param file_path: Path to the file to load. (templated)
:type file_path: str
:param container_name: Name of the container. (templated)
:type container_name: str
:param blob_name: Name of the blob. (templated)
:type blob_name: str
:param wasb_conn_id: Reference to the wasb connection.
:type wasb_conn_id: str
:param load_options: Optional keyword arguments that
`WasbHook.load_file()` takes.
:type load_options: Optional[dict]
"""

template_fields = ('file_path', 'container_name', 'blob_name')

def __init__(
self,
*,
file_path: str,
container_name: str,
blob_name: str,
wasb_conn_id: str = 'wasb_default',
load_options: Optional[dict] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
if load_options is None:
load_options = {}
self.file_path = file_path
self.container_name = container_name
self.blob_name = blob_name
self.wasb_conn_id = wasb_conn_id
self.load_options = load_options

def execute(self, context: dict) -> None:
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
self.log.info(
'Uploading %s to wasb://%s as %s',
self.file_path,
self.container_name,
self.blob_name,
)
hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
warnings.warn(
"This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb`.",
DeprecationWarning,
stacklevel=2,
)
72 changes: 72 additions & 0 deletions airflow/providers/microsoft/azure/transfers/local_to_wasb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from typing import Optional

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


class LocalFilesystemToWasbOperator(BaseOperator):
"""
Uploads a file to Azure Blob Storage.
:param file_path: Path to the file to load. (templated)
:type file_path: str
:param container_name: Name of the container. (templated)
:type container_name: str
:param blob_name: Name of the blob. (templated)
:type blob_name: str
:param wasb_conn_id: Reference to the wasb connection.
:type wasb_conn_id: str
:param load_options: Optional keyword arguments that
`WasbHook.load_file()` takes.
:type load_options: Optional[dict]
"""

template_fields = ('file_path', 'container_name', 'blob_name')

def __init__(
self,
*,
file_path: str,
container_name: str,
blob_name: str,
wasb_conn_id: str = 'wasb_default',
load_options: Optional[dict] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
if load_options is None:
load_options = {}
self.file_path = file_path
self.container_name = container_name
self.blob_name = blob_name
self.wasb_conn_id = wasb_conn_id
self.load_options = load_options

def execute(self, context: dict) -> None:
"""Upload a file to Azure Blob Storage."""
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
self.log.info(
'Uploading %s to wasb://%s as %s',
self.file_path,
self.container_name,
self.blob_name,
)
hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)
1 change: 1 addition & 0 deletions dev/provider_packages/prepare_provider_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,7 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin
# we imported it directly during module walk by the importlib library
KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
"This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.dynamodb`.",
"This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb`.",
"This module is deprecated. Please use `airflow.providers.tableau.operators.tableau_refresh_workbook`.",
"This module is deprecated. Please use `airflow.providers.tableau.sensors.tableau_job_status`.",
"This module is deprecated. Please use `airflow.providers.tableau.hooks.tableau`.",
Expand Down
2 changes: 1 addition & 1 deletion tests/deprecated_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1717,7 +1717,7 @@
'airflow.operators.mssql_to_hive.MsSqlToHiveTransfer',
),
(
'airflow.providers.microsoft.azure.transfers.file_to_wasb.FileToWasbOperator',
'airflow.providers.microsoft.azure.transfers.local_to_wasb.LocalFilesystemToWasbOperator',
'airflow.contrib.operators.file_to_wasb.FileToWasbOperator',
),
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from unittest import mock

from airflow.models.dag import DAG
from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator


class TestFileToWasbOperator(unittest.TestCase):
class TestLocalFilesystemToWasbOperator(unittest.TestCase):

_config = {
'file_path': 'file',
Expand All @@ -40,23 +40,23 @@ def setUp(self):
self.dag = DAG('test_dag_id', default_args=args)

def test_init(self):
operator = FileToWasbOperator(task_id='wasb_operator_1', dag=self.dag, **self._config)
operator = LocalFilesystemToWasbOperator(task_id='wasb_operator_1', dag=self.dag, **self._config)
assert operator.file_path == self._config['file_path']
assert operator.container_name == self._config['container_name']
assert operator.blob_name == self._config['blob_name']
assert operator.wasb_conn_id == self._config['wasb_conn_id']
assert operator.load_options == {}
assert operator.retries == self._config['retries']

operator = FileToWasbOperator(
operator = LocalFilesystemToWasbOperator(
task_id='wasb_operator_2', dag=self.dag, load_options={'timeout': 2}, **self._config
)
assert operator.load_options == {'timeout': 2}

@mock.patch('airflow.providers.microsoft.azure.transfers.file_to_wasb.WasbHook', autospec=True)
@mock.patch('airflow.providers.microsoft.azure.transfers.local_to_wasb.WasbHook', autospec=True)
def test_execute(self, mock_hook):
mock_instance = mock_hook.return_value
operator = FileToWasbOperator(
operator = LocalFilesystemToWasbOperator(
task_id='wasb_sensor', dag=self.dag, load_options={'timeout': 2}, **self._config
)
operator.execute(None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pytest

from airflow.providers.microsoft.azure.example_dags.example_file_to_wasb import PATH_TO_UPLOAD_FILE
from airflow.providers.microsoft.azure.example_dags.example_local_to_wasb import PATH_TO_UPLOAD_FILE
from tests.test_utils.azure_system_helpers import (
AZURE_DAG_FOLDER,
AzureSystemTest,
Expand All @@ -33,7 +33,7 @@

@pytest.mark.backend('postgres', 'mysql')
@pytest.mark.credential_file(WASB_DEFAULT_KEY)
class FileToWasbSystem(AzureSystemTest):
class LocalToWasbSystem(AzureSystemTest):
def setUp(self):
super().setUp()
with open(PATH_TO_UPLOAD_FILE, 'w+') as file:
Expand All @@ -44,5 +44,5 @@ def tearDown(self):
super().tearDown()

@provide_wasb_default_connection(CREDENTIALS_PATH)
def test_run_example_file_to_wasb(self):
self.run_dag('example_file_to_wasb', AZURE_DAG_FOLDER)
def test_run_example_local_to_wasb(self):
self.run_dag('example_local_to_wasb', AZURE_DAG_FOLDER)

0 comments on commit 1f9fe99

Please sign in to comment.