Skip to content

Commit

Permalink
Merge pull request #69 from freedomofpress/path-check
Browse files Browse the repository at this point in the history
validate paths for all tarfile types
  • Loading branch information
emkll authored Mar 26, 2021
2 parents 3491a30 + e6fdccd commit fa2188c
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 18 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.6
* Further validate target paths

## 0.2.5
* Sets restrictive permissions, validates target paths

Expand Down
2 changes: 1 addition & 1 deletion securedrop_export/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.5
0.2.6
2 changes: 1 addition & 1 deletion securedrop_export/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.5'
__version__ = '0.2.6'
2 changes: 1 addition & 1 deletion securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def extract_tarball(self):
logger.info(
"Extracting tarball {} into {}".format(self.archive, self.tmpdir)
)
safe_extractall(self.archive, self.tmpdir, self.tmpdir)
safe_extractall(self.archive, self.tmpdir)
except Exception as ex:
logger.error("Unable to extract tarball: {}".format(ex))
self.exit_gracefully(ExportStatus.ERROR_EXTRACTION.value)
Expand Down
22 changes: 13 additions & 9 deletions securedrop_export/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@ def safe_mkdir(
check_all_permissions(relative_path, base_path)


def safe_extractall(archive_file_path: str, dest_path: str, base_path: str) -> None:
def safe_extractall(archive_file_path: str, dest_path: str) -> None:
"""
Safely extract a file specified by archive_file_path to dest_path.
"""
safe_mkdir(base_path, dest_path)

with tarfile.open(archive_file_path) as tar:
# Tarfile types include:
#
Expand All @@ -65,13 +63,19 @@ def safe_extractall(archive_file_path: str, dest_path: str, base_path: str) -> N
# Block device
# Character device
for file_info in tar.getmembers():
file_info.mode = 0o600
if file_info.isdir():
file_info.mode = 0o700
elif file_info.islnk() or file_info.issym():
file_info.mode = 0o700 if file_info.isdir() else 0o600

check_path_traversal(file_info.name)

# If the path is relative then we don't need to check that it resolves to dest_path
if Path(file_info.name).is_absolute():
relative_filepath(file_info.name, dest_path)

if file_info.islnk() or file_info.issym():
check_path_traversal(file_info.linkname)
else:
check_path_traversal(file_info.name)
# If the path is relative then we don't need to check that it resolves to dest_path
if Path(file_info.linkname).is_absolute():
relative_filepath(file_info.linkname, dest_path)

tar.extractall(dest_path)

Expand Down
234 changes: 228 additions & 6 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@


def test_extract_tarball():
"""
Check that we can successfully extract a valid tarball.
"""
with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
Expand Down Expand Up @@ -54,6 +57,9 @@ def test_extract_tarball():


def test_extract_tarball_with_symlink():
"""
Check that we can successfully extract a valid tarball that contains a valid symlink.
"""
with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
Expand All @@ -79,6 +85,12 @@ def test_extract_tarball_with_symlink():


def test_extract_tarball_raises_if_doing_path_traversal():
"""
Check that we do not allow tarfile member file to do path traversal via TarInfo.name.
"""
if os.path.exists("/tmp/traversed"):
os.remove("/tmp/traversed")

with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
Expand All @@ -100,15 +112,225 @@ def test_extract_tarball_raises_if_doing_path_traversal():
submission.extract_tarball()

assert not os.path.exists('/tmp/traversed')
assert not os.path.exists(os.path.join(submission.tmpdir, "tmp", "traversed"))


def test_extract_tarball_raises_if_doing_path_traversal_with_dir():
"""
Check that we do not allow tarfile member directory to do path traversal via TarInfo.name.
"""
if os.path.exists("/tmp/traversed/"):
os.rmdir("/tmp/traversed/")

if os.path.exists("/tmp/traversed"):
os.remove("/tmp/traversed")

with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
metadata = {"device": "disk", "encryption_method": "luks", "encryption_key": "test"}
metadata_str = json.dumps(metadata)
metadata_bytes = BytesIO(metadata_str.encode("utf-8"))
metadata_file_info = tarfile.TarInfo("metadata.json")
metadata_file_info.size = len(metadata_str)
archive.addfile(metadata_file_info, metadata_bytes)
dir_info = tarfile.TarInfo("../../../../../../../../../tmp/traversed")
dir_info.type = tarfile.DIRTYPE
dir_info.mode = 0o777
archive.addfile(dir_info)
archive.close()

submission = export.SDExport(archive_path, TEST_CONFIG)

with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists('/tmp/traversed')


def test_extract_tarball_raises_if_doing_path_traversal_with_symlink():
"""
This is a contrived path-traversal check because /tmp/traversed2 would have to be created as
another tafile member, so it would be extracted to the extraction path and not to /tmp.
However, it allows us to test that we raise if there is a path traversal attempt via a symlink.
Check that we do not allow tarfile member symlink to do path traversal via TarInfo.name.
"""
if os.path.exists("/tmp/traversed/"):
os.rmdir("/tmp/traversed/")

if os.path.exists("/tmp/traversed"):
os.remove("/tmp/traversed")

with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
metadata = {"device": "disk", "encryption_method": "luks", "encryption_key": "test"}
metadata_str = json.dumps(metadata)
metadata_bytes = BytesIO(metadata_str.encode("utf-8"))
metadata_file_info = tarfile.TarInfo("metadata.json")
metadata_file_info.size = len(metadata_str)
archive.addfile(metadata_file_info, metadata_bytes)
content = b"test"
symlink_info = tarfile.TarInfo("symlink")
symlink_info.size = len(content)
symlink_info.type = tarfile.SYMTYPE
symlink_info.name = "../../../../../../../../../tmp/traversed"
archive.addfile(symlink_info, BytesIO(content))
archive.close()

submission = export.SDExport(archive_path, TEST_CONFIG)

with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists('/tmp/traversed')


def test_extract_tarball_raises_if_doing_path_traversal_with_symlink_linkname():
"""
Check that we do not allow tarfile member symlink to do path traversal via TarInfo.linkname.
"""
if os.path.exists("/tmp/traversed/"):
os.rmdir("/tmp/traversed/")

if os.path.exists("/tmp/traversed"):
os.remove("/tmp/traversed")

with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
metadata = {"device": "disk", "encryption_method": "luks", "encryption_key": "test"}
metadata_str = json.dumps(metadata)
metadata_bytes = BytesIO(metadata_str.encode("utf-8"))
metadata_file_info = tarfile.TarInfo("metadata.json")
metadata_file_info.size = len(metadata_str)
archive.addfile(metadata_file_info, metadata_bytes)
content = b"test"
symlink_info = tarfile.TarInfo("symlink")
symlink_info.size = len(content)
symlink_info.type = tarfile.SYMTYPE
symlink_info.linkname = "../../../../../../../../../tmp/traversed"
archive.addfile(symlink_info, BytesIO(content))
archive.close()

submission = export.SDExport(archive_path, TEST_CONFIG)

with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists('/tmp/traversed')


def test_extract_tarball_raises_if_name_has_unsafe_absolute_path():
"""
Check that we do not allow tarfile member file to specify an unsafe absolute path via
TarInfo.name.
"""
if os.path.exists("/tmp/unsafe"):
os.remove("/tmp/unsafe")

with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
metadata = {"device": "disk", "encryption_method": "luks", "encryption_key": "test"}
metadata_str = json.dumps(metadata)
metadata_bytes = BytesIO(metadata_str.encode("utf-8"))
metadata_file_info = tarfile.TarInfo("metadata.json")
metadata_file_info.size = len(metadata_str)
archive.addfile(metadata_file_info, metadata_bytes)
content = b"test"
file_info = tarfile.TarInfo("/tmp/unsafe")
file_info.size = len(content)
file_info.mode = 0o777
archive.addfile(file_info, BytesIO(content))
archive.close()

submission = export.SDExport(archive_path, TEST_CONFIG)

with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists('/tmp/unsafe')


def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink():
"""
Check that we do not allow tarfile member symlink to specify an unsafe absolute path via
TarInfo.name.
"""
if os.path.exists("/tmp/unsafe"):
os.remove("/tmp/unsafe")

tmp = tempfile.gettempdir()
with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
symlink_path = os.path.join(temp_dir, "symlink")

os.system(f"ln -s {tmp}/unsafe {symlink_path}") # create symlink to "/tmp/unsafe"

with tarfile.open(archive_path, "w:gz") as archive:
metadata = {"device": "disk", "encryption_method": "luks", "encryption_key": "test"}
metadata_str = json.dumps(metadata)
metadata_bytes = BytesIO(metadata_str.encode("utf-8"))
metadata_file_info = tarfile.TarInfo("metadata.json")
metadata_file_info.size = len(metadata_str)
archive.addfile(metadata_file_info, metadata_bytes)
archive.add(symlink_path, "symlink")
archive.close()

submission = export.SDExport(archive_path, TEST_CONFIG)

with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists('/tmp/unsafe')


def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink_to_dir():
"""
Check that we do not allow tarfile member symlink to specify an unsafe absolute path via
TarInfo.name.
Note: Same test as `test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink`
but checks that symlinks to absolute directories are also caught.
"""
if os.path.exists("/tmp/unsafe"):
os.remove("/tmp/unsafe")

tmp = tempfile.gettempdir()
with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
symlink_path = os.path.join(temp_dir, "symlink")
file_path = os.path.join(temp_dir, "unsafe")

with open(file_path, "w") as file:
file.write("some-content")

os.system(f"ln -s {tmp} {symlink_path}") # create symlink to "/tmp"

with tarfile.open(archive_path, "w:gz") as archive:
metadata = {"device": "disk", "encryption_method": "luks", "encryption_key": "test"}
metadata_str = json.dumps(metadata)
metadata_bytes = BytesIO(metadata_str.encode("utf-8"))
metadata_file_info = tarfile.TarInfo("metadata.json")
metadata_file_info.size = len(metadata_str)
archive.addfile(metadata_file_info, metadata_bytes)
archive.add(symlink_path, "symlink")
archive.add(file_path, "symlink/unsafe")
archive.close()

submission = export.SDExport(archive_path, TEST_CONFIG)

with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists('/tmp/unsafe')


def test_extract_tarball_raises_if_linkname_has_unsafe_absolute_path():
"""
Check that we do not allow tarfile member file to specify an unsafe absolute path via
TarInfo.linkname.
"""
if os.path.exists("/tmp/unsafe"):
os.remove("/tmp/unsafe")

with tempfile.TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, "archive.sd-export")
with tarfile.open(archive_path, "w:gz") as archive:
Expand All @@ -122,7 +344,7 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink():
symlink_info = tarfile.TarInfo("symlink")
symlink_info.size = len(content)
symlink_info.type = tarfile.SYMTYPE
symlink_info.linkname = "../../../../../../../../../tmp/traversed2"
symlink_info.linkname = "/tmp/unsafe"
archive.addfile(symlink_info, BytesIO(content))
archive.close()

Expand All @@ -131,7 +353,7 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink():
with pytest.raises(SystemExit):
submission.extract_tarball()

assert not os.path.exists(os.path.join(submission.tmpdir, "symlink"))
assert not os.path.exists('/tmp/unsafe')


def test_exit_gracefully_no_exception(capsys):
Expand Down

0 comments on commit fa2188c

Please sign in to comment.