From 2de122614728277c3b46abcccc619acd667abbd5 Mon Sep 17 00:00:00 2001 From: Sebastian Czech Date: Thu, 11 Jul 2024 14:48:59 +0200 Subject: [PATCH] feat(module/asg): Update Lambda runtime to Python 3.11 (#59) --- modules/asg/main.tf | 2 +- modules/asg/scripts/lambda.py | 48 ++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/modules/asg/main.tf b/modules/asg/main.tf index 0cd6a8d..0ce2d7d 100644 --- a/modules/asg/main.tf +++ b/modules/asg/main.tf @@ -310,7 +310,7 @@ resource "aws_lambda_function" "this" { role = aws_iam_role.this.arn handler = "lambda.lambda_handler" source_code_hash = data.archive_file.this.output_base64sha256 - runtime = "python3.8" + runtime = "python3.11" timeout = var.lambda_timeout reserved_concurrent_executions = var.reserved_concurrent_executions tracing_config { diff --git a/modules/asg/scripts/lambda.py b/modules/asg/scripts/lambda.py index a9e3745..b8f1a7b 100644 --- a/modules/asg/scripts/lambda.py +++ b/modules/asg/scripts/lambda.py @@ -5,8 +5,8 @@ from xml.etree.ElementTree import Element from boto3 import client -from botocore.exceptions import ClientError, ParamValidationError from botocore.config import Config +from botocore.exceptions import ClientError, ParamValidationError from panos.panorama import Panorama @@ -290,11 +290,11 @@ def disable_source_dest_check(self, interface_id: str): """ self.logger.info(f"Disable source_dest_check for network interface {interface_id}") self.ec2_client.modify_network_interface_attribute( - NetworkInterfaceId=interface_id, - SourceDestCheck={ - 'Value': False, - } - ) + NetworkInterfaceId=interface_id, + SourceDestCheck={ + 'Value': False, + } + ) def modify_network_interface(self, interface_id: str, attachment_id: str, source_dest_check: bool = True): """ @@ -365,6 +365,7 @@ def ip_network_interface(self, instance_id: str, device_index: str): https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/describe_network_interfaces.html :param instance_id: EC2 Instance id + :param device_index: Network interface index :return: none """ description = self.ec2_client.describe_network_interfaces( @@ -381,7 +382,7 @@ def ip_network_interface(self, instance_id: str, device_index: str): ) try: return description['NetworkInterfaces'][0]['PrivateIpAddress'] - except IndexError as e: + except IndexError: return None def register_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: str): @@ -406,7 +407,7 @@ def register_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: str) }, ] ) - except ParamValidationError as e: + except ParamValidationError: self.logger.error(f"Unable to register target with IP {untrust_ip}") def deregister_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: str): @@ -430,7 +431,7 @@ def deregister_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: st }, ] ) - except ParamValidationError as e: + except ParamValidationError: self.logger.error(f"Unable to deregister target with IP {untrust_ip}") def panorama_cmd(self, panorama, cmd: str, xml: bool = True, cmd_xml: bool = True) -> Element: @@ -454,8 +455,8 @@ def check_ssm_param(self, ssm_param_name: str) -> dict: :ssm_param_name: Parameter name :return: dict """ - ssm_param_list = self.ssm_client.get_parameter(Name=ssm_param_name, WithDecryption=True).get("Parameter").get("Value"). \ - replace("\'", "\"") + ssm_param_list = self.ssm_client.get_parameter(Name=ssm_param_name, WithDecryption=True). \ + get("Parameter").get("Value").replace("\'", "\"") return loads(ssm_param_list) def delicense_fw(self, instance_id) -> bool: @@ -489,13 +490,19 @@ def delicense_fw(self, instance_id) -> bool: # Check if first Panorama is active - if not, the use second Panorama for de-licensing if self.check_is_active_in_ha(panorama_hostname, panorama_username, panorama_password): # De-license using active, first Panorama instance from Active-Passive HA cluster - delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname, panorama_username, panorama_password, panorama_lm_name) + delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname, + panorama_username, panorama_password, + panorama_lm_name) else: # De-license using active, second Panorama instance from Active-Passive HA cluster - delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname2, panorama_username, panorama_password, panorama_lm_name) + delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname2, + panorama_username, panorama_password, + panorama_lm_name) else: # De-license using the only 1 Panorama instance - delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname, panorama_username, panorama_password, panorama_lm_name) + delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname, + panorama_username, panorama_password, + panorama_lm_name) return delicensed else: @@ -537,7 +544,8 @@ def check_is_active_in_ha(self, panorama_hostname, panorama_username, panorama_p self.logger.info(f"Error while checking high-availability state for Panorama {panorama_hostname}") return False - def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, panorama_username, panorama_password, panorama_lm_name) -> bool: + def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, panorama_username, + panorama_password, panorama_lm_name) -> bool: """ Function used to de-license VM-Series using plugin sw_fw_license running on Panorama server @@ -545,6 +553,7 @@ def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, :param panorama_hostname: Hostname of the Panorama server :param panorama_username: Account's name :param panorama_password: Account's password + :param panorama_lm_name: License manager name (in plugin sw_fw_license) :return: True if VM-Series was de-licensed correctly, False in other case """ try: @@ -552,7 +561,9 @@ def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, delicensed = False # Connect to selected Panorama instance - self.logger.info(f"Connecting to '{panorama_hostname}' using user '{panorama_username}' to license manager '{panorama_lm_name}'") + self.logger.info( + f"Connecting to '{panorama_hostname}' using user '{panorama_username}' " + f"to license manager '{panorama_lm_name}'") panorama = Panorama(hostname=panorama_hostname, api_username=panorama_username, api_password=panorama_password) @@ -575,10 +586,11 @@ def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, serial_obj = fw.find("serial") if serial_obj is not None: serial = serial_obj.text - # If IP address is the same as destroyed VM and serial is not none, then delicense firewall + # If IP address is the same as destroyed VM and serial is not none, then delicense FW if serial_obj.text is not None: self.logger.info(f"De-licensing firewall: {serial} ...") - cmd = f"request plugins sw_fw_license deactivate license-manager \"{panorama_lm_name}\" devices member \"{serial}\"" + cmd = (f"request plugins sw_fw_license deactivate license-manager " + f"\"{panorama_lm_name}\" devices member \"{serial}\"") resp_parsed = self.panorama_cmd(panorama, cmd) if resp_parsed.attrib["status"] == "success": self.logger.info(f"De-licensing firewall: {serial} succeeded")