Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

check if already unlocked and mounted #39

Merged
merged 1 commit into from
Dec 16, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,25 +277,37 @@ def unlock_luks_volume(self, encryption_key):
if 'UUID' in items[0]:
self.encrypted_device = 'luks-' + items[1]

# the luks device is not already unlocked
if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)):
logger.debug('Unlocking luks volume {}'.format(self.encrypted_device))
p = subprocess.Popen(
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.debug('Passing key')
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
logger.error('Bad phassphrase for {}'.format(self.encrypted_device))
self.exit_gracefully(ExportStatus.USB_BAD_PASSPHRASE.value)
# the luks device is already unlocked
if os.path.exists(os.path.join('/dev/mapper/', self.encrypted_device)):
logger.debug('Device already unlocked')
return
sssoleileraaa marked this conversation as resolved.
Show resolved Hide resolved

logger.debug('Unlocking luks volume {}'.format(self.encrypted_device))
p = subprocess.Popen(
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.debug('Passing key')
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
logger.error('Bad phassphrase for {}'.format(self.encrypted_device))
self.exit_gracefully(ExportStatus.USB_BAD_PASSPHRASE.value)
except subprocess.CalledProcessError:
self.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED)

def mount_volume(self):
# If the drive is already mounted then we don't need to mount it again
output = subprocess.check_output(
["lsblk", "-o", "MOUNTPOINT", "--noheadings", self.device])
mountpoint = output.decode('utf-8').strip()
if mountpoint:
logger.debug('The device is already mounted')
self.mountpoint = mountpoint
return

# mount target not created, create folder
if not os.path.exists(self.mountpoint):
self.safe_check_call(
Expand Down Expand Up @@ -329,16 +341,19 @@ def copy_submission(self):
except (subprocess.CalledProcessError, OSError):
self.exit_gracefully(ExportStatus.ERROR_USB_WRITE.value)
finally:
# Finally, we sync the filesystem, unmount the drive and lock the
# luks volume, and exit 0
logger.info('Syncing filesystems')
subprocess.check_call(["sync"])
logger.info('Unmounting drive from {}'.format(self.mountpoint))
subprocess.check_call(["sudo", "umount", self.mountpoint])
logger.info('Locking luks volume {}'.format(self.encrypted_device))
subprocess.check_call(
["sudo", "cryptsetup", "luksClose", self.encrypted_device]
)

if os.path.exists(self.mountpoint):
logger.info('Unmounting drive from {}'.format(self.mountpoint))
subprocess.check_call(["sudo", "umount", self.mountpoint])

if os.path.exists(os.path.join('/dev/mapper', self.encrypted_device)):
logger.info('Locking luks volume {}'.format(self.encrypted_device))
subprocess.check_call(
["sudo", "cryptsetup", "luksClose", self.encrypted_device]
)

logger.info('Deleting temporary directory {}'.format(self.tmpdir))
subprocess.check_call(["rm", "-rf", self.tmpdir])
sys.exit(0)
Expand Down