Skip to content

Commit

Permalink
Added unit tests for functions in manageVolume and ec2_dev_2_volid, s…
Browse files Browse the repository at this point in the history
…eparated ec2_dev_2_volid main function into multiple functions, fixed typo in manageVolume (#2445)

Signed-off-by: Judy Ng <[email protected]>
  • Loading branch information
judysng authored Sep 13, 2023
1 parent 71c252b commit 05ebede
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,7 @@ def validate_device_name(device_name):
return True


def main():
syslog.syslog("Starting ec2_dev_2_volid.py script")
# Get dev
try:
dev = str(sys.argv[1])
validate_device_name(dev)
syslog.syslog(f"Input block device is {dev}")
except IndexError:
syslog.syslog(syslog.LOG_ERR, "Provide block device i.e. xvdf")

# Convert dev to mapping format
def adapt_device_name(dev):
if "nvme" in dev:
# For newer instances which expose EBS volumes as NVMe devices, translate the
# device name so boto can discover it.
Expand All @@ -70,45 +60,21 @@ def main():
else:
dev = dev.replace("xvd", "sd")
dev = "/dev/" + dev
return dev

# Get IMDSv2 token
token = get_imdsv2_token()

# Get instance ID
instance_id = requests.get(
"http://169.254.169.254/latest/meta-data/instance-id",
headers=token,
timeout=METADATA_REQUEST_TIMEOUT,
).text

# Get region
region = requests.get(
"http://169.254.169.254/latest/meta-data/placement/availability-zone",
headers=token,
timeout=METADATA_REQUEST_TIMEOUT,
).text
region = region[:-1]

# Parse configuration file to read proxy settings
def parse_proxy_config():
config = configparser.RawConfigParser()
config.read("/etc/boto.cfg")
proxy_config = Config()
if config.has_option("Boto", "proxy") and config.has_option("Boto", "proxy_port"):
proxy = config.get("Boto", "proxy")
proxy_port = config.get("Boto", "proxy_port")
proxy_config = Config(proxies={"https": f"{proxy}:{proxy_port}"})
return proxy_config

# Configure the AWS CA bundle.
# In US isolated regions the dedicated CA bundle will be used.
# In any other region, the default bundle will be used (None stands for the default settings).
# Note: We want to apply a more general solution that applies to every region,
# but for the time being this is enough to support US isolated regions without
# impacting the other ones.
ca_bundle = f"/etc/pki/{region}/certs/ca-bundle.pem" if region.startswith("us-iso") else None

# Connect to AWS using boto
ec2 = boto3.client("ec2", region_name=region, config=proxy_config, verify=ca_bundle)

def get_device_volume_id(ec2, dev, instance_id):
# Poll for blockdevicemapping
devices = ec2.describe_instance_attribute(InstanceId=instance_id, Attribute="blockDeviceMapping").get(
"BlockDeviceMappings"
Expand All @@ -127,8 +93,48 @@ def main():
dev_map = dict((d.get("DeviceName"), d) for d in devices)
loop_count += 1

# Return volume ID
volume_id = dev_map.get(dev).get("Ebs").get("VolumeId")
return dev_map.get(dev).get("Ebs").get("VolumeId")


def get_metadata_value(token, metadata_path):
return requests.get(
metadata_path,
headers=token,
timeout=METADATA_REQUEST_TIMEOUT,
).text


def main():
syslog.syslog("Starting ec2_dev_2_volid.py script")
try:
dev = str(sys.argv[1])
validate_device_name(dev)
syslog.syslog(f"Input block device is {dev}")
except IndexError:
syslog.syslog(syslog.LOG_ERR, "Provide block device i.e. xvdf")

dev = adapt_device_name(dev)

token = get_imdsv2_token()

instance_id = get_metadata_value(token, "http://169.254.169.254/latest/meta-data/instance-id")

region = get_metadata_value(token, "http://169.254.169.254/latest/meta-data/placement/availability-zone")
region = region[:-1]

proxy_config = parse_proxy_config()

# Configure the AWS CA bundle.
# In US isolated regions the dedicated CA bundle will be used.
# In any other region, the default bundle will be used (None stands for the default settings).
# Note: We want to apply a more general solution that applies to every region,
# but for the time being this is enough to support US isolated regions without
# impacting the other ones.
ca_bundle = f"/etc/pki/{region}/certs/ca-bundle.pem" if region.startswith("us-iso") else None

ec2 = boto3.client("ec2", region_name=region, config=proxy_config, verify=ca_bundle)

volume_id = get_device_volume_id(ec2, dev, instance_id)
print(volume_id)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def validate_device_name(device_name):
return True


def convert_dev(dev):
def adapt_device_name(dev):
# Translate the device name as provided by the OS to the one used by EC2
# FIXME This approach could be broken in some OS variants, see # pylint: disable=fixme
# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html#identify-nvme-ebs-device
Expand Down Expand Up @@ -91,7 +91,7 @@ def get_imdsv2_token():

def attach_volume(volume_id, instance_id, ec2):
# Generate a list of system paths minus the root path
paths = [convert_dev(device) for device in get_all_devices()]
paths = [adapt_device_name(device) for device in get_all_devices()]

# List of possible block devices
block_devices = [
Expand Down Expand Up @@ -175,7 +175,6 @@ def detach_volume(volume_id, ec2):


def parse_proxy_config():
"""Parse configuration file to read proxy settings."""
config = configparser.RawConfigParser()
config.read("/etc/boto.cfg")
proxy_config = Config()
Expand All @@ -186,38 +185,33 @@ def parse_proxy_config():
return proxy_config


def handle_volume(volume_id, attach, detach):
# Get IMDSv2 token
token = get_imdsv2_token()

# Get instance ID
instance_id = requests.get(
"http://169.254.169.254/latest/meta-data/instance-id",
def get_metadata_value(token, metadata_path):
return requests.get(
metadata_path,
headers=token,
timeout=METADATA_REQUEST_TIMEOUT,
).text

# Get region
region = requests.get(
"http://169.254.169.254/latest/meta-data/placement/availability-zone",
headers=token,
timeout=METADATA_REQUEST_TIMEOUT,
).text

def handle_volume(volume_id, attach, detach):
token = get_imdsv2_token()

instance_id = get_metadata_value(token, "http://169.254.169.254/latest/meta-data/instance-id")

region = get_metadata_value(token, "http://169.254.169.254/latest/meta-data/placement/availability-zone")
region = region[:-1]

# Parse configuration file to read proxy settings
proxy_config = parse_proxy_config()

# Connect to AWS using boto
ec2 = boto3.client("ec2", region_name=region, config=proxy_config)

if attach and is_volume_avaialble(ec2, volume_id):
if attach and is_volume_available(ec2, volume_id):
attach_volume(volume_id, instance_id, ec2)
elif detach and is_volume_attached(ec2, volume_id):
detach_volume(volume_id, ec2)


def is_volume_avaialble(ec2, volume_id):
def is_volume_available(ec2, volume_id):
try:
state = ec2.describe_volumes(VolumeIds=[volume_id]).get("Volumes")[0].get("State")
if state == "available":
Expand Down
115 changes: 114 additions & 1 deletion test/unit/ec2_udev_rules/test_ec2_dev_2_volid.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and limitations under the License.
import pytest
from assertpy import assert_that
from ec2_dev_2_volid import validate_device_name
from ec2_dev_2_volid import adapt_device_name, get_device_volume_id, get_imdsv2_token, validate_device_name


@pytest.mark.parametrize(
Expand All @@ -31,3 +31,116 @@ def test_validate_device_name(device_name, raises):
assert_that(validate_device_name).raises(ValueError).when_called_with(device_name).contains("invalid pattern")
else:
assert_that(validate_device_name(device_name)).is_true()


@pytest.mark.parametrize(
("status_code", "content", "expected_value"),
[(200, {"key": "value"}, {"X-aws-ec2-metadata-token": {"key": "value"}}), (400, {"key": "value"}, {})],
)
def test_get_imdsv2_token(mocker, status_code, content, expected_value):
mock = mocker.Mock()
mocker.patch("requests.put", mock)
mock.return_value.status_code = status_code
mock.return_value.content = content
assert_that(get_imdsv2_token()).is_equal_to(expected_value)


@pytest.mark.parametrize(
("dev", "expected_name", "raises"),
[
("nvme0n1", "sdf", True),
("nvme0n1p1", "sdf", True),
("nvme0n1p128", "sdf", True),
("xvd0n1", "/dev/sd0n1", False),
("xvd0n1p1", "/dev/sd0n1p1", False),
],
)
def test_adapt_device_name(mocker, dev, expected_name, raises, capsys):
mocker.patch("os.popen", mocker.mock_open(read_data=":sdf"))
if raises:
with pytest.raises(SystemExit) as e:
adapt_device_name(dev)
captured = capsys.readouterr()
assert_that(expected_name).is_equal_to(captured.out)
assert_that(e.value.code).is_equal_to(0)
else:
assert_that(adapt_device_name(dev)).matches(expected_name)


@pytest.fixture(name="ec2_mock")
def fixture_ec2_mock(mocker):
mock = mocker.MagicMock()
mocker.patch("boto3.client", mock)
return mock


@pytest.mark.parametrize(
("dev", "block", "output_value"),
[
(
"/dev/sda1",
{
"InstanceId": "i-1234567890abcdef0",
"BlockDeviceMappings": [
{
"DeviceName": "/dev/sda1",
"Ebs": {
"Status": "attached",
"DeleteOnTermination": True,
"VolumeId": "vol-049df61146c4d7901",
"AttachTime": "2013-05-17T22:42:34.000Z",
},
},
{
"DeviceName": "/dev/sdf",
"Ebs": {
"Status": "attached",
"DeleteOnTermination": False,
"VolumeId": "vol-049df61146c4d7901",
"AttachTime": "2013-09-10T23:07:00.000Z",
},
},
],
},
"vol-049df61146c4d7901",
),
(
"/dev/sda2",
{
"InstanceId": "i-1234567890abcdef0",
"BlockDeviceMappings": [
{
"DeviceName": "/dev/sda1",
"Ebs": {
"Status": "attached",
"DeleteOnTermination": True,
"VolumeId": "vol-049df61146c4d7901",
"AttachTime": "2013-05-17T22:42:34.000Z",
},
},
{
"DeviceName": "/dev/sdf",
"Ebs": {
"Status": "attached",
"DeleteOnTermination": False,
"VolumeId": "vol-049df61146c4d7901",
"AttachTime": "2013-09-10T23:07:00.000Z",
},
},
],
},
SystemExit,
),
],
)
def test_get_device_volume_id(mocker, ec2_mock, dev, block, output_value):
mocker.patch("time.sleep", return_value=None)
ec2_mock.describe_instance_attribute.return_value = block
if output_value == SystemExit:
with pytest.raises(SystemExit) as e:
get_device_volume_id(ec2_mock, dev, 1)
assert_that(e.type).is_equal_to(SystemExit)
assert_that(e.value.code).is_equal_to(1)
else:
volume_id = get_device_volume_id(ec2_mock, dev, 1)
assert_that(volume_id).is_equal_to(output_value)
Loading

0 comments on commit 05ebede

Please sign in to comment.