Skip to content

Commit

Permalink
bpo-38453: Ensure ntpath.realpath correctly resolves relative paths (p…
Browse files Browse the repository at this point in the history
…ythonGH-16967)

Ensure isabs() is always True for \\?\ prefixed paths
Avoid unnecessary usage of readlink() to avoid resolving broken links incorrectly
Ensure shutil tests run in test directory
  • Loading branch information
zooba authored and Jake Taylor committed Dec 5, 2019
1 parent e519e91 commit 3810001
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 45 deletions.
49 changes: 41 additions & 8 deletions Lib/ntpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ def normcase(s):
def isabs(s):
"""Test whether a path is absolute"""
s = os.fspath(s)
# Paths beginning with \\?\ are always absolute, but do not
# necessarily contain a drive.
if isinstance(s, bytes):
if s.replace(b'/', b'\\').startswith(b'\\\\?\\'):
return True
else:
if s.replace('/', '\\').startswith('\\\\?\\'):
return True
s = splitdrive(s)[1]
return len(s) > 0 and s[0] in _get_bothseps(s)

Expand Down Expand Up @@ -526,10 +534,7 @@ def abspath(path):
# realpath is a no-op on systems without _getfinalpathname support.
realpath = abspath
else:
def _readlink_deep(path, seen=None):
if seen is None:
seen = set()

def _readlink_deep(path):
# These error codes indicate that we should stop reading links and
# return the path we currently have.
# 1: ERROR_INVALID_FUNCTION
Expand All @@ -546,10 +551,22 @@ def _readlink_deep(path, seen=None):
# 4393: ERROR_REPARSE_TAG_INVALID
allowed_winerror = 1, 2, 3, 5, 21, 32, 50, 67, 87, 4390, 4392, 4393

seen = set()
while normcase(path) not in seen:
seen.add(normcase(path))
try:
old_path = path
path = _nt_readlink(path)
# Links may be relative, so resolve them against their
# own location
if not isabs(path):
# If it's something other than a symlink, we don't know
# what it's actually going to be resolved against, so
# just return the old path.
if not islink(old_path):
path = old_path
break
path = normpath(join(dirname(old_path), path))
except OSError as ex:
if ex.winerror in allowed_winerror:
break
Expand Down Expand Up @@ -579,23 +596,31 @@ def _getfinalpathname_nonstrict(path):
# Non-strict algorithm is to find as much of the target directory
# as we can and join the rest.
tail = ''
seen = set()
while path:
try:
path = _readlink_deep(path, seen)
path = _getfinalpathname(path)
return join(path, tail) if tail else path
except OSError as ex:
if ex.winerror not in allowed_winerror:
raise
try:
# The OS could not resolve this path fully, so we attempt
# to follow the link ourselves. If we succeed, join the tail
# and return.
new_path = _readlink_deep(path)
if new_path != path:
return join(new_path, tail) if tail else new_path
except OSError:
# If we fail to readlink(), let's keep traversing
pass
path, name = split(path)
# TODO (bpo-38186): Request the real file name from the directory
# entry using FindFirstFileW. For now, we will return the path
# as best we have it
if path and not name:
return abspath(path + tail)
return path + tail
tail = join(name, tail) if tail else name
return abspath(tail)
return tail

def realpath(path):
path = normpath(path)
Expand All @@ -604,12 +629,20 @@ def realpath(path):
unc_prefix = b'\\\\?\\UNC\\'
new_unc_prefix = b'\\\\'
cwd = os.getcwdb()
# bpo-38081: Special case for realpath(b'nul')
if normcase(path) == normcase(os.fsencode(devnull)):
return b'\\\\.\\NUL'
else:
prefix = '\\\\?\\'
unc_prefix = '\\\\?\\UNC\\'
new_unc_prefix = '\\\\'
cwd = os.getcwd()
# bpo-38081: Special case for realpath('nul')
if normcase(path) == normcase(devnull):
return '\\\\.\\NUL'
had_prefix = path.startswith(prefix)
if not had_prefix and not isabs(path):
path = join(cwd, path)
try:
path = _getfinalpathname(path)
initial_winerror = 0
Expand Down
52 changes: 40 additions & 12 deletions Lib/test/test_ntpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,16 @@ def test_realpath_broken_symlinks(self):
ABSTFN + r"\missing")
self.assertPathEqual(ntpath.realpath(r"broken\foo"),
ABSTFN + r"\missing\foo")
# bpo-38453: We no longer recursively resolve segments of relative
# symlinks that the OS cannot resolve.
self.assertPathEqual(ntpath.realpath(r"broken1"),
ABSTFN + r"\missing\bar")
ABSTFN + r"\broken\bar")
self.assertPathEqual(ntpath.realpath(r"broken1\baz"),
ABSTFN + r"\missing\bar\baz")
ABSTFN + r"\broken\bar\baz")
self.assertPathEqual(ntpath.realpath("broken2"),
ABSTFN + r"\missing")
ABSTFN + r"\self\self\missing")
self.assertPathEqual(ntpath.realpath("broken3"),
ABSTFN + r"\missing")
ABSTFN + r"\subdir\parent\subdir\parent\missing")
self.assertPathEqual(ntpath.realpath("broken4"),
ABSTFN + r"\missing")
self.assertPathEqual(ntpath.realpath("broken5"),
Expand All @@ -304,13 +306,13 @@ def test_realpath_broken_symlinks(self):
self.assertPathEqual(ntpath.realpath(rb"broken\foo"),
os.fsencode(ABSTFN + r"\missing\foo"))
self.assertPathEqual(ntpath.realpath(rb"broken1"),
os.fsencode(ABSTFN + r"\missing\bar"))
os.fsencode(ABSTFN + r"\broken\bar"))
self.assertPathEqual(ntpath.realpath(rb"broken1\baz"),
os.fsencode(ABSTFN + r"\missing\bar\baz"))
os.fsencode(ABSTFN + r"\broken\bar\baz"))
self.assertPathEqual(ntpath.realpath(b"broken2"),
os.fsencode(ABSTFN + r"\missing"))
os.fsencode(ABSTFN + r"\self\self\missing"))
self.assertPathEqual(ntpath.realpath(rb"broken3"),
os.fsencode(ABSTFN + r"\missing"))
os.fsencode(ABSTFN + r"\subdir\parent\subdir\parent\missing"))
self.assertPathEqual(ntpath.realpath(b"broken4"),
os.fsencode(ABSTFN + r"\missing"))
self.assertPathEqual(ntpath.realpath(b"broken5"),
Expand All @@ -319,8 +321,8 @@ def test_realpath_broken_symlinks(self):
@support.skip_unless_symlink
@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_symlink_loops(self):
# Bug #930024, return the path unchanged if we get into an infinite
# symlink loop.
# Symlink loops are non-deterministic as to which path is returned, but
# it will always be the fully resolved path of one member of the cycle
ABSTFN = ntpath.abspath(support.TESTFN)
self.addCleanup(support.unlink, ABSTFN)
self.addCleanup(support.unlink, ABSTFN + "1")
Expand All @@ -332,8 +334,6 @@ def test_realpath_symlink_loops(self):
os.symlink(ABSTFN, ABSTFN)
self.assertPathEqual(ntpath.realpath(ABSTFN), ABSTFN)

# cycles are non-deterministic as to which path is returned, but
# it will always be the fully resolved path of one member of the cycle
os.symlink(ABSTFN + "1", ABSTFN + "2")
os.symlink(ABSTFN + "2", ABSTFN + "1")
expected = (ABSTFN + "1", ABSTFN + "2")
Expand Down Expand Up @@ -402,6 +402,34 @@ def test_realpath_symlink_prefix(self):
def test_realpath_nul(self):
tester("ntpath.realpath('NUL')", r'\\.\NUL')

@unittest.skipUnless(HAVE_GETFINALPATHNAME, 'need _getfinalpathname')
def test_realpath_cwd(self):
ABSTFN = ntpath.abspath(support.TESTFN)

support.unlink(ABSTFN)
support.rmtree(ABSTFN)
os.mkdir(ABSTFN)
self.addCleanup(support.rmtree, ABSTFN)

test_dir_long = ntpath.join(ABSTFN, "MyVeryLongDirectoryName")
test_dir_short = ntpath.join(ABSTFN, "MYVERY~1")
test_file_long = ntpath.join(test_dir_long, "file.txt")
test_file_short = ntpath.join(test_dir_short, "file.txt")

os.mkdir(test_dir_long)

with open(test_file_long, "wb") as f:
f.write(b"content")

self.assertPathEqual(test_file_long, ntpath.realpath(test_file_short))

with support.change_cwd(test_dir_long):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
with support.change_cwd(test_dir_long.lower()):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))
with support.change_cwd(test_dir_short):
self.assertPathEqual(test_file_long, ntpath.realpath("file.txt"))

def test_expandvars(self):
with support.EnvironmentVarGuard() as env:
env.clear()
Expand Down
49 changes: 24 additions & 25 deletions Lib/test/test_shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ def supports_file2file_sendfile():
srcname = None
dstname = None
try:
with tempfile.NamedTemporaryFile("wb", delete=False) as f:
with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
srcname = f.name
f.write(b"0123456789")

with open(srcname, "rb") as src:
with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
dstname = dst.name
infd = src.fileno()
outfd = dst.fileno()
Expand Down Expand Up @@ -162,12 +162,12 @@ def _maxdataOK():

class BaseTest:

def mkdtemp(self):
def mkdtemp(self, prefix=None):
"""Create a temporary directory that will be cleaned up.
Returns the path of the directory.
"""
d = tempfile.mkdtemp()
d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
self.addCleanup(support.rmtree, d)
return d

Expand Down Expand Up @@ -231,6 +231,7 @@ def test_rmtree_fails_on_junctions(self):
os.mkdir(dir_)
link = os.path.join(tmp, 'link')
_winapi.CreateJunction(dir_, link)
self.addCleanup(support.unlink, link)
self.assertRaises(OSError, shutil.rmtree, link)
self.assertTrue(os.path.exists(dir_))
self.assertTrue(os.path.lexists(link))
Expand Down Expand Up @@ -267,7 +268,7 @@ def test_rmtree_works_on_junctions(self):

def test_rmtree_errors(self):
# filename is guaranteed not to exist
filename = tempfile.mktemp()
filename = tempfile.mktemp(dir=self.mkdtemp())
self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
# test that ignore_errors option is honored
shutil.rmtree(filename, ignore_errors=True)
Expand Down Expand Up @@ -401,7 +402,7 @@ def _raiser(*args, **kwargs):

def test_rmtree_dont_delete_file(self):
# When called on a file instead of a directory, don't delete it.
handle, path = tempfile.mkstemp()
handle, path = tempfile.mkstemp(dir=self.mkdtemp())
os.close(handle)
self.assertRaises(NotADirectoryError, shutil.rmtree, path)
os.remove(path)
Expand Down Expand Up @@ -438,8 +439,8 @@ def test_rmtree_on_junction(self):
class TestCopyTree(BaseTest, unittest.TestCase):

def test_copytree_simple(self):
src_dir = tempfile.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
src_dir = self.mkdtemp()
dst_dir = os.path.join(self.mkdtemp(), 'destination')
self.addCleanup(shutil.rmtree, src_dir)
self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
write_file((src_dir, 'test.txt'), '123')
Expand All @@ -457,8 +458,8 @@ def test_copytree_simple(self):
self.assertEqual(actual, '456')

def test_copytree_dirs_exist_ok(self):
src_dir = tempfile.mkdtemp()
dst_dir = tempfile.mkdtemp()
src_dir = self.mkdtemp()
dst_dir = self.mkdtemp()
self.addCleanup(shutil.rmtree, src_dir)
self.addCleanup(shutil.rmtree, dst_dir)

Expand Down Expand Up @@ -517,9 +518,9 @@ def test_copytree_with_exclude(self):
# creating data
join = os.path.join
exists = os.path.exists
src_dir = tempfile.mkdtemp()
src_dir = self.mkdtemp()
try:
dst_dir = join(tempfile.mkdtemp(), 'destination')
dst_dir = join(self.mkdtemp(), 'destination')
write_file((src_dir, 'test.txt'), '123')
write_file((src_dir, 'test.tmp'), '123')
os.mkdir(join(src_dir, 'test_dir'))
Expand Down Expand Up @@ -579,7 +580,7 @@ def _filter(src, names):
shutil.rmtree(os.path.dirname(dst_dir))

def test_copytree_retains_permissions(self):
tmp_dir = tempfile.mkdtemp()
tmp_dir = self.mkdtemp()
src_dir = os.path.join(tmp_dir, 'source')
os.mkdir(src_dir)
dst_dir = os.path.join(tmp_dir, 'destination')
Expand All @@ -591,6 +592,7 @@ def test_copytree_retains_permissions(self):
write_file((src_dir, 'restrictive.txt'), '456')
os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
self.addCleanup(support.rmtree, restrictive_subdir)
os.chmod(restrictive_subdir, 0o600)

shutil.copytree(src_dir, dst_dir)
Expand All @@ -609,8 +611,8 @@ def test_copytree_winerror(self, mock_patch):
# When copying to VFAT, copystat() raises OSError. On Windows, the
# exception object has a meaningful 'winerror' attribute, but not
# on other operating systems. Do not assume 'winerror' is set.
src_dir = tempfile.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
src_dir = self.mkdtemp()
dst_dir = os.path.join(self.mkdtemp(), 'destination')
self.addCleanup(shutil.rmtree, src_dir)
self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))

Expand All @@ -628,10 +630,8 @@ def custom_cpfun(a, b):
self.assertEqual(b, os.path.join(dst, 'foo'))

flag = []
src = tempfile.mkdtemp()
self.addCleanup(support.rmtree, src)
dst = tempfile.mktemp()
self.addCleanup(support.rmtree, dst)
src = self.mkdtemp()
dst = tempfile.mktemp(dir=self.mkdtemp())
with open(os.path.join(src, 'foo'), 'w') as f:
f.close()
shutil.copytree(src, dst, copy_function=custom_cpfun)
Expand Down Expand Up @@ -1624,8 +1624,7 @@ def check_chown(path, uid=None, gid=None):
class TestWhich(BaseTest, unittest.TestCase):

def setUp(self):
self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
self.addCleanup(support.rmtree, self.temp_dir)
self.temp_dir = self.mkdtemp(prefix="Tmp")
# Give the temp_file an ".exe" suffix for all.
# It's needed on Windows and not harmful on other platforms.
self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Expand Down Expand Up @@ -1857,7 +1856,7 @@ def test_move_file_to_dir_other_fs(self):

def test_move_dir(self):
# Move a dir to another location on the same filesystem.
dst_dir = tempfile.mktemp()
dst_dir = tempfile.mktemp(dir=self.mkdtemp())
try:
self._check_move_dir(self.src_dir, dst_dir, dst_dir)
finally:
Expand Down Expand Up @@ -2156,7 +2155,7 @@ def test_win_impl(self):

# If file size < 1 MiB memoryview() length must be equal to
# the actual file size.
with tempfile.NamedTemporaryFile(delete=False) as f:
with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
f.write(b'foo')
fname = f.name
self.addCleanup(support.unlink, fname)
Expand All @@ -2165,7 +2164,7 @@ def test_win_impl(self):
self.assertEqual(m.call_args[0][2], 3)

# Empty files should not rely on readinto() variant.
with tempfile.NamedTemporaryFile(delete=False) as f:
with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
pass
fname = f.name
self.addCleanup(support.unlink, fname)
Expand Down Expand Up @@ -2231,7 +2230,7 @@ def test_same_file(self):
self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

def test_non_existent_src(self):
name = tempfile.mktemp()
name = tempfile.mktemp(dir=os.getcwd())
with self.assertRaises(FileNotFoundError) as cm:
shutil.copyfile(name, "new")
self.assertEqual(cm.exception.filename, name)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure ntpath.realpath() correctly resolves relative paths.

0 comments on commit 3810001

Please sign in to comment.