diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py index c2d9d022fbcbc..f7c6ab710c43d 100644 --- a/airflow/providers/amazon/aws/operators/aws_lambda.py +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -15,88 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.lambda_function`.""" -import json -from typing import TYPE_CHECKING, Optional, Sequence +import warnings -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook +from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator # noqa -if TYPE_CHECKING: - from airflow.utils.context import Context - - -class AwsLambdaInvokeFunctionOperator(BaseOperator): - """ - Invokes an AWS Lambda function. - You can invoke a function synchronously (and wait for the response), - or asynchronously. - To invoke a function asynchronously, - set `invocation_type` to `Event`. For more details, - review the boto3 Lambda invoke docs. - - :param function_name: The name of the AWS Lambda function, version, or alias. - :param payload: The JSON string that you want to provide to your Lambda function as input. - :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". - :param qualifier: Specify a version or alias to invoke a published version of the function. - :param aws_conn_id: The AWS connection ID to use - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:AwsLambdaInvokeFunctionOperator` - - """ - - template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type') - ui_color = '#ff7300' - - def __init__( - self, - *, - function_name: str, - log_type: Optional[str] = None, - qualifier: Optional[str] = None, - invocation_type: Optional[str] = None, - client_context: Optional[str] = None, - payload: Optional[str] = None, - aws_conn_id: str = 'aws_default', - **kwargs, - ): - super().__init__(**kwargs) - self.function_name = function_name - self.payload = payload - self.log_type = log_type - self.qualifier = qualifier - self.invocation_type = invocation_type - self.client_context = client_context - self.aws_conn_id = aws_conn_id - - def execute(self, context: 'Context'): - """ - Invokes the target AWS Lambda function from Airflow. - - :return: The response payload from the function, or an error object. - """ - hook = LambdaHook(aws_conn_id=self.aws_conn_id) - success_status_codes = [200, 202, 204] - self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload) - response = hook.invoke_lambda( - function_name=self.function_name, - invocation_type=self.invocation_type, - log_type=self.log_type, - client_context=self.client_context, - payload=self.payload, - qualifier=self.qualifier, - ) - self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) - if response.get("StatusCode") not in success_status_codes: - raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) - payload_stream = response.get("Payload") - payload = payload_stream.read().decode() - if "FunctionError" in response: - raise ValueError( - 'Lambda function execution resulted in error', - {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload}, - ) - self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata")) - return payload +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.lambda_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py new file mode 100644 index 0000000000000..c2d9d022fbcbc --- /dev/null +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from typing import TYPE_CHECKING, Optional, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AwsLambdaInvokeFunctionOperator(BaseOperator): + """ + Invokes an AWS Lambda function. + You can invoke a function synchronously (and wait for the response), + or asynchronously. + To invoke a function asynchronously, + set `invocation_type` to `Event`. For more details, + review the boto3 Lambda invoke docs. + + :param function_name: The name of the AWS Lambda function, version, or alias. + :param payload: The JSON string that you want to provide to your Lambda function as input. + :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". + :param qualifier: Specify a version or alias to invoke a published version of the function. + :param aws_conn_id: The AWS connection ID to use + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AwsLambdaInvokeFunctionOperator` + + """ + + template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type') + ui_color = '#ff7300' + + def __init__( + self, + *, + function_name: str, + log_type: Optional[str] = None, + qualifier: Optional[str] = None, + invocation_type: Optional[str] = None, + client_context: Optional[str] = None, + payload: Optional[str] = None, + aws_conn_id: str = 'aws_default', + **kwargs, + ): + super().__init__(**kwargs) + self.function_name = function_name + self.payload = payload + self.log_type = log_type + self.qualifier = qualifier + self.invocation_type = invocation_type + self.client_context = client_context + self.aws_conn_id = aws_conn_id + + def execute(self, context: 'Context'): + """ + Invokes the target AWS Lambda function from Airflow. + + :return: The response payload from the function, or an error object. + """ + hook = LambdaHook(aws_conn_id=self.aws_conn_id) + success_status_codes = [200, 202, 204] + self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload) + response = hook.invoke_lambda( + function_name=self.function_name, + invocation_type=self.invocation_type, + log_type=self.log_type, + client_context=self.client_context, + payload=self.payload, + qualifier=self.qualifier, + ) + self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) + if response.get("StatusCode") not in success_status_codes: + raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) + payload_stream = response.get("Payload") + payload = payload_stream.read().decode() + if "FunctionError" in response: + raise ValueError( + 'Lambda function execution resulted in error', + {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload}, + ) + self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata")) + return payload diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 451ca102068e9..29921cc2d2d66 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -288,6 +288,7 @@ operators: - integration-name: AWS Lambda python-modules: - airflow.providers.amazon.aws.operators.aws_lambda + - airflow.providers.amazon.aws.operators.lambda_function - integration-name: Amazon Simple Storage Service (S3) python-modules: - airflow.providers.amazon.aws.operators.s3_bucket diff --git a/scripts/in_container/verify_providers.py b/scripts/in_container/verify_providers.py index e3d0220178e5c..e0b7468b8c342 100755 --- a/scripts/in_container/verify_providers.py +++ b/scripts/in_container/verify_providers.py @@ -247,6 +247,7 @@ class ProviderPackageDetails(NamedTuple): 'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.redshift_cluster`.', "This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3`.", "This module is deprecated. Please use `airflow.providers.tableau.sensors.tableau`.", + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.lambda_function`.", } diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 522e62da72f0e..5b76af905d63e 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -1131,6 +1131,10 @@ 'airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator', 'airflow.operators.s3_file_transform_operator.S3FileTransformOperator', ), + ( + "airflow.providers.amazon.aws.operators.lambda_function.AwsLambdaInvokeFunctionOperator", + "airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator", + ), ( 'airflow.providers.amazon.aws.operators.sagemaker.SageMakerBaseOperator', 'airflow.providers.amazon.aws.operators.sagemaker_base.SageMakerBaseOperator', diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda.py index b566f0cfb7a3f..f3199eab7b2d5 100644 --- a/tests/providers/amazon/aws/operators/test_lambda.py +++ b/tests/providers/amazon/aws/operators/test_lambda.py @@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook -from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator +from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator @mock_lambda diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py index 6944e7c0e0d54..a9e5e37b6c88a 100644 --- a/tests/system/providers/amazon/aws/example_lambda.py +++ b/tests/system/providers/amazon/aws/example_lambda.py @@ -24,7 +24,7 @@ from airflow import models from airflow.decorators import task from airflow.models.baseoperator import chain -from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator +from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder