Skip to content

Commit

Permalink
feat(module/asg): Update Lambda runtime to Python 3.11 (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianczech authored Jul 11, 2024
1 parent da73784 commit 2de1226
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 19 deletions.
2 changes: 1 addition & 1 deletion modules/asg/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ resource "aws_lambda_function" "this" {
role = aws_iam_role.this.arn
handler = "lambda.lambda_handler"
source_code_hash = data.archive_file.this.output_base64sha256
runtime = "python3.8"
runtime = "python3.11"
timeout = var.lambda_timeout
reserved_concurrent_executions = var.reserved_concurrent_executions
tracing_config {
Expand Down
48 changes: 30 additions & 18 deletions modules/asg/scripts/lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from xml.etree.ElementTree import Element

from boto3 import client
from botocore.exceptions import ClientError, ParamValidationError
from botocore.config import Config
from botocore.exceptions import ClientError, ParamValidationError

from panos.panorama import Panorama

Expand Down Expand Up @@ -290,11 +290,11 @@ def disable_source_dest_check(self, interface_id: str):
"""
self.logger.info(f"Disable source_dest_check for network interface {interface_id}")
self.ec2_client.modify_network_interface_attribute(
NetworkInterfaceId=interface_id,
SourceDestCheck={
'Value': False,
}
)
NetworkInterfaceId=interface_id,
SourceDestCheck={
'Value': False,
}
)

def modify_network_interface(self, interface_id: str, attachment_id: str, source_dest_check: bool = True):
"""
Expand Down Expand Up @@ -365,6 +365,7 @@ def ip_network_interface(self, instance_id: str, device_index: str):
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/describe_network_interfaces.html
:param instance_id: EC2 Instance id
:param device_index: Network interface index
:return: none
"""
description = self.ec2_client.describe_network_interfaces(
Expand All @@ -381,7 +382,7 @@ def ip_network_interface(self, instance_id: str, device_index: str):
)
try:
return description['NetworkInterfaces'][0]['PrivateIpAddress']
except IndexError as e:
except IndexError:
return None

def register_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: str):
Expand All @@ -406,7 +407,7 @@ def register_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: str)
},
]
)
except ParamValidationError as e:
except ParamValidationError:
self.logger.error(f"Unable to register target with IP {untrust_ip}")

def deregister_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: str):
Expand All @@ -430,7 +431,7 @@ def deregister_untrust_ip_as_target(self, ip_target_groups: list, untrust_ip: st
},
]
)
except ParamValidationError as e:
except ParamValidationError:
self.logger.error(f"Unable to deregister target with IP {untrust_ip}")

def panorama_cmd(self, panorama, cmd: str, xml: bool = True, cmd_xml: bool = True) -> Element:
Expand All @@ -454,8 +455,8 @@ def check_ssm_param(self, ssm_param_name: str) -> dict:
:ssm_param_name: Parameter name
:return: dict
"""
ssm_param_list = self.ssm_client.get_parameter(Name=ssm_param_name, WithDecryption=True).get("Parameter").get("Value"). \
replace("\'", "\"")
ssm_param_list = self.ssm_client.get_parameter(Name=ssm_param_name, WithDecryption=True). \
get("Parameter").get("Value").replace("\'", "\"")
return loads(ssm_param_list)

def delicense_fw(self, instance_id) -> bool:
Expand Down Expand Up @@ -489,13 +490,19 @@ def delicense_fw(self, instance_id) -> bool:
# Check if first Panorama is active - if not, the use second Panorama for de-licensing
if self.check_is_active_in_ha(panorama_hostname, panorama_username, panorama_password):
# De-license using active, first Panorama instance from Active-Passive HA cluster
delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname, panorama_username, panorama_password, panorama_lm_name)
delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname,
panorama_username, panorama_password,
panorama_lm_name)
else:
# De-license using active, second Panorama instance from Active-Passive HA cluster
delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname2, panorama_username, panorama_password, panorama_lm_name)
delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname2,
panorama_username, panorama_password,
panorama_lm_name)
else:
# De-license using the only 1 Panorama instance
delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname, panorama_username, panorama_password, panorama_lm_name)
delicensed = self.request_panorama_delicense_fw(vmseries_ip_address, panorama_hostname,
panorama_username, panorama_password,
panorama_lm_name)

return delicensed
else:
Expand Down Expand Up @@ -537,22 +544,26 @@ def check_is_active_in_ha(self, panorama_hostname, panorama_username, panorama_p
self.logger.info(f"Error while checking high-availability state for Panorama {panorama_hostname}")
return False

def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, panorama_username, panorama_password, panorama_lm_name) -> bool:
def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname, panorama_username,
panorama_password, panorama_lm_name) -> bool:
"""
Function used to de-license VM-Series using plugin sw_fw_license running on Panorama server
:param vmseries_ip_address: IP address of the MGMT interface for VM-Series
:param panorama_hostname: Hostname of the Panorama server
:param panorama_username: Account's name
:param panorama_password: Account's password
:param panorama_lm_name: License manager name (in plugin sw_fw_license)
:return: True if VM-Series was de-licensed correctly, False in other case
"""
try:
# Set status of delicensing
delicensed = False

# Connect to selected Panorama instance
self.logger.info(f"Connecting to '{panorama_hostname}' using user '{panorama_username}' to license manager '{panorama_lm_name}'")
self.logger.info(
f"Connecting to '{panorama_hostname}' using user '{panorama_username}' "
f"to license manager '{panorama_lm_name}'")
panorama = Panorama(hostname=panorama_hostname,
api_username=panorama_username,
api_password=panorama_password)
Expand All @@ -575,10 +586,11 @@ def request_panorama_delicense_fw(self, vmseries_ip_address, panorama_hostname,
serial_obj = fw.find("serial")
if serial_obj is not None:
serial = serial_obj.text
# If IP address is the same as destroyed VM and serial is not none, then delicense firewall
# If IP address is the same as destroyed VM and serial is not none, then delicense FW
if serial_obj.text is not None:
self.logger.info(f"De-licensing firewall: {serial} ...")
cmd = f"request plugins sw_fw_license deactivate license-manager \"{panorama_lm_name}\" devices member \"{serial}\""
cmd = (f"request plugins sw_fw_license deactivate license-manager "
f"\"{panorama_lm_name}\" devices member \"{serial}\"")
resp_parsed = self.panorama_cmd(panorama, cmd)
if resp_parsed.attrib["status"] == "success":
self.logger.info(f"De-licensing firewall: {serial} succeeded")
Expand Down

0 comments on commit 2de1226

Please sign in to comment.