Skip to content

Commit

Permalink
Added methods to access real files
Browse files Browse the repository at this point in the history
- allow to add really existing files and directory trees to the fake
file system, with the contents read on demand
- see pytest-dev#170
  • Loading branch information
mrbean-bremen committed Apr 1, 2017
1 parent b7fc0b4 commit a52ac94
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ The release versions are PyPi releases.
## Version 3.2 (as yet unreleased)

#### New Features
* Added possibility to add lazily read real files to fake filesystem
* Added the CHANGES.md release notes to the release manifest

#### Fixes
Expand Down
123 changes: 122 additions & 1 deletion fake_filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4186,6 +4186,7 @@ def testDiskUsageOnFileCreation(self):

total_size = 100
self.filesystem.AddMountPoint('mount', total_size)

def create_too_large_file():
with fake_open('!mount!file', 'w') as dest:
dest.write('a' * (total_size + 1))
Expand All @@ -4199,7 +4200,6 @@ def create_too_large_file():

self.assertEqual(total_size, self.filesystem.GetDiskUsage('!mount').used)


def testFileSystemSizeAfterLargeFileCreation(self):
filesystem = fake_filesystem.FakeFilesystem(path_separator='!',
total_size=1024 * 1024 * 1024 * 100)
Expand Down Expand Up @@ -4444,5 +4444,126 @@ def testThatUncPathsAreAutoMounted(self):
self.assertEqual(5, self.filesystem.GetObject('!!foo!bar!bip!bop').st_dev)


class RealFileSystemAccessTest(TestCase):
def setUp(self):
# use the real path separator to work with the real file system
self.filesystem = fake_filesystem.FakeFilesystem()
self.fake_open = fake_filesystem.FakeFileOpen(self.filesystem)

def testAddNonExistingRealFileRaises(self):
nonexisting_path = os.path.join('nonexisting', 'test.txt')
self.assertRaises(OSError, self.filesystem.AddRealFile, nonexisting_path)
self.assertFalse(self.filesystem.Exists(nonexisting_path))

def testAddNonExistingRealDirectoryRaises(self):
nonexisting_path = '/nonexisting'
self.assertRaisesIOError(errno.ENOENT, self.filesystem.AddRealDirectory, nonexisting_path)
self.assertFalse(self.filesystem.Exists(nonexisting_path))

def checkFakeFileStat(self, fake_file, real_file_path):
self.assertTrue(self.filesystem.Exists(real_file_path))
real_stat = os.stat(real_file_path)
self.assertIsNone(fake_file._byte_contents)
self.assertEqual(fake_file.st_size, real_stat.st_size)
self.assertEqual(fake_file.st_ctime, real_stat.st_ctime)
self.assertEqual(fake_file.st_atime, real_stat.st_atime)
self.assertEqual(fake_file.st_mtime, real_stat.st_mtime)
self.assertEqual(fake_file.st_uid, real_stat.st_uid)
self.assertEqual(fake_file.st_gid, real_stat.st_gid)

def checkReadOnlyFile(self, fake_file, real_file_path):
with open(real_file_path, 'rb') as f:
real_contents = f.read()
self.assertEqual(fake_file.byte_contents, real_contents)
self.assertRaisesIOError(errno.EACCES, self.fake_open, real_file_path, 'w')

def checkWritableFile(self, fake_file, real_file_path):
with open(real_file_path, 'rb') as f:
real_contents = f.read()
self.assertEqual(fake_file.byte_contents, real_contents)
with self.fake_open(real_file_path, 'wb') as f:
f.write(b'test')
with open(real_file_path, 'rb') as f:
real_contents1 = f.read()
self.assertEqual(real_contents1, real_contents)
with self.fake_open(real_file_path, 'rb') as f:
fake_contents = f.read()
self.assertEqual(fake_contents, b'test')

def testAddExistingRealFileReadOnly(self):
real_file_path = __file__
fake_file = self.filesystem.AddRealFile(real_file_path)
self.checkFakeFileStat(fake_file, real_file_path)
self.assertEqual(fake_file.st_mode & 0o333, 0)
self.checkReadOnlyFile(fake_file, real_file_path)

def testAddExistingRealFileReadWrite(self):
real_file_path = os.path.realpath(__file__)
fake_file = self.filesystem.AddRealFile(real_file_path, read_only=False)

self.checkFakeFileStat(fake_file, real_file_path)
self.assertEqual(fake_file.st_mode, os.stat(real_file_path).st_mode)
self.checkWritableFile(fake_file, real_file_path)

def testAddExistingRealDirectoryReadOnly(self):
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
fake_dir = self.filesystem.AddRealDirectory(real_dir_path)
self.assertTrue(self.filesystem.Exists(real_dir_path))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_pathlib.py')))

file_path = os.path.join(real_dir_path, 'fake_tempfile.py')
fake_file = self.filesystem.ResolveObject(file_path)
self.checkFakeFileStat(fake_file, file_path)
self.checkReadOnlyFile(fake_file, file_path)

def testAddExistingRealDirectoryTree(self):
real_dir_path = os.path.dirname(__file__)
self.filesystem.AddRealDirectory(real_dir_path)
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))

def testAddExistingRealDirectoryReadWrite(self):
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.AddRealDirectory(real_dir_path, read_only=False)
self.assertTrue(self.filesystem.Exists(real_dir_path))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_pathlib.py')))

file_path = os.path.join(real_dir_path, 'pytest_plugin.py')
fake_file = self.filesystem.ResolveObject(file_path)
self.checkFakeFileStat(fake_file, file_path)
self.checkWritableFile(fake_file, file_path)

def testAddExistingRealPathsReadOnly(self):
real_file_path = os.path.realpath(__file__)
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.AddRealPaths([real_file_path, real_dir_path])

fake_file = self.filesystem.ResolveObject(real_file_path)
self.checkFakeFileStat(fake_file, real_file_path)
self.checkReadOnlyFile(fake_file, real_file_path)

real_file_path = os.path.join(real_dir_path, 'fake_filesystem_shutil.py')
fake_file = self.filesystem.ResolveObject(real_file_path)
self.checkFakeFileStat(fake_file, real_file_path)
self.checkReadOnlyFile(fake_file, real_file_path)

def testAddExistingRealPathsReadWrite(self):
real_file_path = os.path.realpath(__file__)
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.AddRealPaths([real_file_path, real_dir_path], read_only=False)

fake_file = self.filesystem.ResolveObject(real_file_path)
self.checkFakeFileStat(fake_file, real_file_path)
self.checkWritableFile(fake_file, real_file_path)

real_file_path = os.path.join(real_dir_path, 'fake_filesystem_glob.py')
fake_file = self.filesystem.ResolveObject(real_file_path)
self.checkFakeFileStat(fake_file, real_file_path)
self.checkWritableFile(fake_file, real_file_path)


if __name__ == '__main__':
unittest.main()
136 changes: 119 additions & 17 deletions pyfakefs/fake_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@

PERM_READ = 0o400 # Read permission bit.
PERM_WRITE = 0o200 # Write permission bit.
PERM_EXE = 0o100 # Write permission bit.
PERM_EXE = 0o100 # Execute permission bit.
PERM_DEF = 0o777 # Default permission bits.
PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
PERM_ALL = 0o7777 # All permission bits.
Expand Down Expand Up @@ -195,8 +195,8 @@ def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
"""
self.name = name
self.st_mode = st_mode
self.byte_contents = self._EncodeContents(contents, encoding)
self.st_size = len(self.byte_contents) if self.byte_contents else 0
self._byte_contents = self._EncodeContents(contents, encoding)
self.st_size = len(self._byte_contents) if self._byte_contents else 0
self.filesystem = filesystem
self.epoch = 0
self._st_ctime = time.time() # times are accessed through properties
Expand All @@ -210,6 +210,18 @@ def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
self.st_uid = None
self.st_gid = None

# members changed only by _CreateFile() to implement AddRealFile()
self.read_from_real_fs = False
self.file_path = None

@property
def byte_contents(self):
if self._byte_contents is None and self.read_from_real_fs:
path = self.filesystem
with io.open(self.file_path, 'rb') as f:
self._byte_contents = f.read()
return self._byte_contents

@property
def contents(self):
"""Return the byte contents as ACSII string (for testing convenience)."""
Expand Down Expand Up @@ -276,11 +288,11 @@ def SetLargeFileSize(self, st_size):
if self.filesystem:
self.filesystem.ChangeDiskUsage(st_size, self.name, self.st_dev)
self.st_size = st_size
self.byte_contents = None
self._byte_contents = None

def IsLargeFile(self):
"""Return True if this file was initialized with size but no contents."""
return self.byte_contents is None
return self._byte_contents is None and not self.read_from_real_fs

@staticmethod
def _EncodeContents(contents, encoding=None):
Expand All @@ -305,12 +317,12 @@ def _SetInitialContents(self, contents, encoding):
contents = self._EncodeContents(contents, encoding)
st_size = len(contents)

if self.byte_contents:
if self._byte_contents:
self.SetSize(0)
current_size = self.st_size or 0
if self.filesystem:
self.filesystem.ChangeDiskUsage(st_size - current_size, self.name, self.st_dev)
self.byte_contents = contents
self._byte_contents = contents
self.st_size = st_size
self.epoch += 1

Expand Down Expand Up @@ -358,15 +370,15 @@ def SetSize(self, st_size):
current_size = self.st_size or 0
if self.filesystem:
self.filesystem.ChangeDiskUsage(st_size - current_size, self.name, self.st_dev)
if self.byte_contents:
if self._byte_contents:
if st_size < current_size:
self.byte_contents = self.byte_contents[:st_size]
self._byte_contents = self._byte_contents[:st_size]
else:
if sys.version_info < (3, 0):
self.byte_contents = '%s%s' % (
self.byte_contents, '\0' * (st_size - current_size))
self._byte_contents = '%s%s' % (
self._byte_contents, '\0' * (st_size - current_size))
else:
self.byte_contents += b'\0' * (st_size - current_size)
self._byte_contents += b'\0' * (st_size - current_size)
self.st_size = st_size
self.epoch += 1

Expand Down Expand Up @@ -823,7 +835,7 @@ def GetOpenFile(self, file_des):
if not isinstance(file_des, int):
raise TypeError('an integer is required')
if (file_des >= len(self.open_files) or
self.open_files[file_des] is None):
self.open_files[file_des] is None):
raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
return self.open_files[file_des]

Expand Down Expand Up @@ -886,7 +898,7 @@ def CollapsePath(self, path):
continue
if component == '..':
if collapsed_path_components and (
collapsed_path_components[-1] != '..'):
collapsed_path_components[-1] != '..'):
# Remove an up-reference: directory/..
collapsed_path_components.pop()
continue
Expand Down Expand Up @@ -918,7 +930,7 @@ def NormalizeCase(self, path):
for component in path_components:
dir_name, current_dir = self._DirectoryContent(current_dir, component)
if current_dir is None or (
current_dir.byte_contents is None and current_dir.st_size == 0):
current_dir._byte_contents is None and current_dir.st_size == 0):
return path
normalized_components.append(dir_name)
normalized_path = self.path_separator.join(normalized_components)
Expand Down Expand Up @@ -1003,7 +1015,7 @@ def SplitDrive(self, path):
# UNC path handling is here since Python 2.7.8, back-ported from Python 3
if sys.version_info >= (2, 7, 8):
if (path[0:2] == self.path_separator * 2) and (
path[2:3] != self.path_separator):
path[2:3] != self.path_separator):
# UNC path handling - splits off the mount point instead of the drive
sep_index = path.find(self.path_separator, 2)
if sep_index == -1:
Expand Down Expand Up @@ -1583,7 +1595,7 @@ def CreateDirectory(self, directory_path, perm_bits=PERM_DEF):

def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
contents='', st_size=None, create_missing_dirs=True,
apply_umask=False, encoding=None):
apply_umask=False, encoding=None, read_from_real_fs=False):
"""Create file_path, including all the parent directories along the way.
This helper method can be used to set up tests more easily.
Expand All @@ -1597,6 +1609,8 @@ def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
apply_umask: whether or not the current umask must be applied on st_mode.
encoding: if contents is a unicode string, the encoding used for serialization.
New in pyfakefs 2.9.
read_from_real_fs: if True, the contents are read from the real file system on demand.
New in pyfakefs 3.2.
Returns:
the newly created FakeFile object.
Expand All @@ -1605,6 +1619,84 @@ def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
IOError: if the file already exists.
IOError: if the containing directory is required and missing.
"""
return self._CreateFile(file_path, st_mode, contents, st_size, create_missing_dirs, apply_umask, encoding)

def AddRealFile(self, file_path, read_only=True):
"""Create file_path, including all the parent directories along the way, for a file
existing in the real file system without reading the contents, which will be read on demand.
New in pyfakefs 3.2.
Args:
file_path: path to the existing file.
read_only: if set, the file is treated as read-only, e.g. a write access raises an exception;
otherwise, writing to the file changes the fake file only as usually.
Returns:
the newly created FakeFile object.
Raises:
OSError: if the file does not exist in the real file system.
OSError: if the file already exists in the fake file system.
"""
real_stat = os.stat(file_path)
# for read-only mode, remove the write/executable permission bits
mode = real_stat.st_mode & 0o777444 if read_only else real_stat.st_mode
return self._CreateFile(file_path, contents=None, read_from_real_fs=True,
st_mode=mode, real_stat=real_stat)

def AddRealDirectory(self, dir_path, read_only=True):
"""Create fake directory for the existing directory at path, and entries for all contained
files in the real file system.
New in pyfakefs 3.2.
Args:
dir_path: path to the existing directory.
read_only: if set, all files under the directory are treated as read-only,
e.g. a write access raises an exception;
otherwise, writing to the files changes the fake files only as usually.
Returns:
the newly created FakeDirectory object.
Raises:
OSError: if the directory does not exist in the real file system.
OSError: if the directory already exists in the fake file system.
"""
if not os.path.exists(dir_path):
raise IOError(errno.ENOENT, 'No such directory', dir_path)
self.CreateDirectory(dir_path)
for base, _, files in os.walk(dir_path):
for fileEntry in files:
self.AddRealFile(os.path.join(base, fileEntry), read_only)

def AddRealPaths(self, path_list, read_only=True):
"""Convenience method to add several files and directories from the real file system
in the fake file system. See `AddRealPath` for more information.
New in pyfakefs 3.2.
"""
for path in path_list:
if os.path.isdir(path):
self.AddRealDirectory(path, read_only)
else:
self.AddRealFile(path, read_only)

def _CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
contents='', st_size=None, create_missing_dirs=True,
apply_umask=False, encoding=None, read_from_real_fs=False, real_stat=None):
"""Create file_path, including all the parent directories along the way.
Args:
file_path: path to the file to create.
st_mode: the stat.S_IF constant representing the file type.
contents: the contents of the file.
st_size: file size; only valid if contents not given.
create_missing_dirs: if True, auto create missing directories.
apply_umask: whether or not the current umask must be applied on st_mode.
encoding: if contents is a unicode string, the encoding used for serialization.
New in pyfakefs 2.9.
read_from_real_fs: if True, the contents are reaf from the real file system on demand.
New in pyfakefs 3.2.
"""
file_path = self.NormalizePath(file_path)
if self.Exists(file_path):
raise IOError(errno.EEXIST,
Expand All @@ -1623,6 +1715,16 @@ def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
if apply_umask:
st_mode &= ~self.umask
file_object = FakeFile(new_file, st_mode, filesystem=self)
if read_from_real_fs:
file_object.st_ctime = real_stat.st_ctime
file_object.st_atime = real_stat.st_atime
file_object.st_mtime = real_stat.st_mtime
file_object.st_gid = real_stat.st_gid
file_object.st_uid = real_stat.st_uid
file_object.st_size = real_stat.st_size
file_object.read_from_real_fs = True
file_object.file_path = file_path

self.last_ino += 1
file_object.SetIno(self.last_ino)
self.AddObject(parent_directory, file_object)
Expand Down

0 comments on commit a52ac94

Please sign in to comment.