From e1ca29457bf74d6c295a207605aa28b0368e4e71 Mon Sep 17 00:00:00 2001 From: mrbean-bremen Date: Sat, 1 Apr 2017 16:25:09 +0200 Subject: [PATCH] Added methods to access real files - allow to add really existing files and directory trees to the fake file system, with the contents read on demand - see #170 --- CHANGES.md | 1 + fake_filesystem_test.py | 133 ++++++++++++++++++++++++++++- pyfakefs/fake_filesystem.py | 164 ++++++++++++++++++++++++++++++------ 3 files changed, 270 insertions(+), 28 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 37550b1a..47e99662 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ The release versions are PyPi releases. ## Version 3.2 (as yet unreleased) #### New Features + * Added possibility to add lazily read real files to fake filesystem * Added the CHANGES.md release notes to the release manifest #### Fixes diff --git a/fake_filesystem_test.py b/fake_filesystem_test.py index 6dd9dc5f..74252c60 100755 --- a/fake_filesystem_test.py +++ b/fake_filesystem_test.py @@ -4186,6 +4186,7 @@ def testDiskUsageOnFileCreation(self): total_size = 100 self.filesystem.AddMountPoint('mount', total_size) + def create_too_large_file(): with fake_open('!mount!file', 'w') as dest: dest.write('a' * (total_size + 1)) @@ -4199,7 +4200,6 @@ def create_too_large_file(): self.assertEqual(total_size, self.filesystem.GetDiskUsage('!mount').used) - def testFileSystemSizeAfterLargeFileCreation(self): filesystem = fake_filesystem.FakeFilesystem(path_separator='!', total_size=1024 * 1024 * 1024 * 100) @@ -4444,5 +4444,136 @@ def testThatUncPathsAreAutoMounted(self): self.assertEqual(5, self.filesystem.GetObject('!!foo!bar!bip!bop').st_dev) +class RealFileSystemAccessTest(TestCase): + def setUp(self): + # use the real path separator to work with the real file system + self.filesystem = fake_filesystem.FakeFilesystem() + self.fake_open = fake_filesystem.FakeFileOpen(self.filesystem) + + def testAddNonExistingRealFileRaises(self): + nonexisting_path = os.path.join('nonexisting', 'test.txt') + self.assertRaises(OSError, self.filesystem.AddRealFile, nonexisting_path) + self.assertFalse(self.filesystem.Exists(nonexisting_path)) + + def testAddNonExistingRealDirectoryRaises(self): + nonexisting_path = '/nonexisting' + self.assertRaisesIOError(errno.ENOENT, self.filesystem.AddRealDirectory, nonexisting_path) + self.assertFalse(self.filesystem.Exists(nonexisting_path)) + + def testExistingFakeFileRaises(self): + real_file_path = __file__ + self.filesystem.CreateFile(real_file_path) + self.assertRaisesIOError(errno.EEXIST, self.filesystem.AddRealFile, real_file_path) + + def testExistingFakeDirectoryRaises(self): + real_dir_path = os.path.dirname(__file__) + self.filesystem.CreateDirectory(real_dir_path) + self.assertRaisesOSError(errno.EEXIST, self.filesystem.AddRealDirectory, real_dir_path) + + def checkFakeFileStat(self, fake_file, real_file_path): + self.assertTrue(self.filesystem.Exists(real_file_path)) + real_stat = os.stat(real_file_path) + self.assertIsNone(fake_file._byte_contents) + self.assertEqual(fake_file.st_size, real_stat.st_size) + self.assertEqual(fake_file.st_ctime, real_stat.st_ctime) + self.assertEqual(fake_file.st_atime, real_stat.st_atime) + self.assertEqual(fake_file.st_mtime, real_stat.st_mtime) + self.assertEqual(fake_file.st_uid, real_stat.st_uid) + self.assertEqual(fake_file.st_gid, real_stat.st_gid) + + def checkReadOnlyFile(self, fake_file, real_file_path): + with open(real_file_path, 'rb') as f: + real_contents = f.read() + self.assertEqual(fake_file.byte_contents, real_contents) + self.assertRaisesIOError(errno.EACCES, self.fake_open, real_file_path, 'w') + + def checkWritableFile(self, fake_file, real_file_path): + with open(real_file_path, 'rb') as f: + real_contents = f.read() + self.assertEqual(fake_file.byte_contents, real_contents) + with self.fake_open(real_file_path, 'wb') as f: + f.write(b'test') + with open(real_file_path, 'rb') as f: + real_contents1 = f.read() + self.assertEqual(real_contents1, real_contents) + with self.fake_open(real_file_path, 'rb') as f: + fake_contents = f.read() + self.assertEqual(fake_contents, b'test') + + def testAddExistingRealFileReadOnly(self): + real_file_path = __file__ + fake_file = self.filesystem.AddRealFile(real_file_path) + self.checkFakeFileStat(fake_file, real_file_path) + self.assertEqual(fake_file.st_mode & 0o333, 0) + self.checkReadOnlyFile(fake_file, real_file_path) + + def testAddExistingRealFileReadWrite(self): + real_file_path = os.path.realpath(__file__) + fake_file = self.filesystem.AddRealFile(real_file_path, read_only=False) + + self.checkFakeFileStat(fake_file, real_file_path) + self.assertEqual(fake_file.st_mode, os.stat(real_file_path).st_mode) + self.checkWritableFile(fake_file, real_file_path) + + def testAddExistingRealDirectoryReadOnly(self): + real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs') + fake_dir = self.filesystem.AddRealDirectory(real_dir_path) + self.assertTrue(self.filesystem.Exists(real_dir_path)) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem.py'))) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_pathlib.py'))) + + file_path = os.path.join(real_dir_path, 'fake_tempfile.py') + fake_file = self.filesystem.ResolveObject(file_path) + self.checkFakeFileStat(fake_file, file_path) + self.checkReadOnlyFile(fake_file, file_path) + + def testAddExistingRealDirectoryTree(self): + real_dir_path = os.path.dirname(__file__) + self.filesystem.AddRealDirectory(real_dir_path) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py'))) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py'))) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py'))) + + def testAddExistingRealDirectoryReadWrite(self): + real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs') + self.filesystem.AddRealDirectory(real_dir_path, read_only=False) + self.assertTrue(self.filesystem.Exists(real_dir_path)) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem.py'))) + self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_pathlib.py'))) + + file_path = os.path.join(real_dir_path, 'pytest_plugin.py') + fake_file = self.filesystem.ResolveObject(file_path) + self.checkFakeFileStat(fake_file, file_path) + self.checkWritableFile(fake_file, file_path) + + def testAddExistingRealPathsReadOnly(self): + real_file_path = os.path.realpath(__file__) + real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs') + self.filesystem.AddRealPaths([real_file_path, real_dir_path]) + + fake_file = self.filesystem.ResolveObject(real_file_path) + self.checkFakeFileStat(fake_file, real_file_path) + self.checkReadOnlyFile(fake_file, real_file_path) + + real_file_path = os.path.join(real_dir_path, 'fake_filesystem_shutil.py') + fake_file = self.filesystem.ResolveObject(real_file_path) + self.checkFakeFileStat(fake_file, real_file_path) + self.checkReadOnlyFile(fake_file, real_file_path) + + def testAddExistingRealPathsReadWrite(self): + real_file_path = os.path.realpath(__file__) + real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs') + self.filesystem.AddRealPaths([real_file_path, real_dir_path], read_only=False) + + fake_file = self.filesystem.ResolveObject(real_file_path) + self.checkFakeFileStat(fake_file, real_file_path) + self.checkWritableFile(fake_file, real_file_path) + + real_file_path = os.path.join(real_dir_path, 'fake_filesystem_glob.py') + fake_file = self.filesystem.ResolveObject(real_file_path) + self.checkFakeFileStat(fake_file, real_file_path) + self.checkWritableFile(fake_file, real_file_path) + + if __name__ == '__main__': unittest.main() diff --git a/pyfakefs/fake_filesystem.py b/pyfakefs/fake_filesystem.py index ca1613b4..318e7794 100644 --- a/pyfakefs/fake_filesystem.py +++ b/pyfakefs/fake_filesystem.py @@ -112,7 +112,7 @@ PERM_READ = 0o400 # Read permission bit. PERM_WRITE = 0o200 # Write permission bit. -PERM_EXE = 0o100 # Write permission bit. +PERM_EXE = 0o100 # Execute permission bit. PERM_DEF = 0o777 # Default permission bits. PERM_DEF_FILE = 0o666 # Default permission bits (regular file) PERM_ALL = 0o7777 # All permission bits. @@ -195,8 +195,8 @@ def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE, """ self.name = name self.st_mode = st_mode - self.byte_contents = self._EncodeContents(contents, encoding) - self.st_size = len(self.byte_contents) if self.byte_contents else 0 + self._byte_contents = self._EncodeContents(contents, encoding) + self.st_size = len(self._byte_contents) if self._byte_contents else 0 self.filesystem = filesystem self.epoch = 0 self._st_ctime = time.time() # times are accessed through properties @@ -210,6 +210,17 @@ def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE, self.st_uid = None self.st_gid = None + # members changed only by _CreateFile() to implement AddRealFile() + self.read_from_real_fs = False + self.file_path = None + + @property + def byte_contents(self): + if self._byte_contents is None and self.read_from_real_fs: + with io.open(self.file_path, 'rb') as f: + self._byte_contents = f.read() + return self._byte_contents + @property def contents(self): """Return the byte contents as ACSII string (for testing convenience).""" @@ -276,11 +287,11 @@ def SetLargeFileSize(self, st_size): if self.filesystem: self.filesystem.ChangeDiskUsage(st_size, self.name, self.st_dev) self.st_size = st_size - self.byte_contents = None + self._byte_contents = None def IsLargeFile(self): """Return True if this file was initialized with size but no contents.""" - return self.byte_contents is None + return self._byte_contents is None and not self.read_from_real_fs @staticmethod def _EncodeContents(contents, encoding=None): @@ -305,12 +316,12 @@ def _SetInitialContents(self, contents, encoding): contents = self._EncodeContents(contents, encoding) st_size = len(contents) - if self.byte_contents: + if self._byte_contents: self.SetSize(0) current_size = self.st_size or 0 if self.filesystem: self.filesystem.ChangeDiskUsage(st_size - current_size, self.name, self.st_dev) - self.byte_contents = contents + self._byte_contents = contents self.st_size = st_size self.epoch += 1 @@ -358,15 +369,15 @@ def SetSize(self, st_size): current_size = self.st_size or 0 if self.filesystem: self.filesystem.ChangeDiskUsage(st_size - current_size, self.name, self.st_dev) - if self.byte_contents: + if self._byte_contents: if st_size < current_size: - self.byte_contents = self.byte_contents[:st_size] + self._byte_contents = self._byte_contents[:st_size] else: if sys.version_info < (3, 0): - self.byte_contents = '%s%s' % ( - self.byte_contents, '\0' * (st_size - current_size)) + self._byte_contents = '%s%s' % ( + self._byte_contents, '\0' * (st_size - current_size)) else: - self.byte_contents += b'\0' * (st_size - current_size) + self._byte_contents += b'\0' * (st_size - current_size) self.st_size = st_size self.epoch += 1 @@ -823,7 +834,7 @@ def GetOpenFile(self, file_des): if not isinstance(file_des, int): raise TypeError('an integer is required') if (file_des >= len(self.open_files) or - self.open_files[file_des] is None): + self.open_files[file_des] is None): raise OSError(errno.EBADF, 'Bad file descriptor', file_des) return self.open_files[file_des] @@ -886,7 +897,7 @@ def CollapsePath(self, path): continue if component == '..': if collapsed_path_components and ( - collapsed_path_components[-1] != '..'): + collapsed_path_components[-1] != '..'): # Remove an up-reference: directory/.. collapsed_path_components.pop() continue @@ -918,7 +929,7 @@ def NormalizeCase(self, path): for component in path_components: dir_name, current_dir = self._DirectoryContent(current_dir, component) if current_dir is None or ( - current_dir.byte_contents is None and current_dir.st_size == 0): + current_dir._byte_contents is None and current_dir.st_size == 0): return path normalized_components.append(dir_name) normalized_path = self.path_separator.join(normalized_components) @@ -1003,7 +1014,7 @@ def SplitDrive(self, path): # UNC path handling is here since Python 2.7.8, back-ported from Python 3 if sys.version_info >= (2, 7, 8): if (path[0:2] == self.path_separator * 2) and ( - path[2:3] != self.path_separator): + path[2:3] != self.path_separator): # UNC path handling - splits off the mount point instead of the drive sep_index = path.find(self.path_separator, 2) if sep_index == -1: @@ -1589,21 +1600,110 @@ def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE, This helper method can be used to set up tests more easily. Args: - file_path: path to the file to create. - st_mode: the stat.S_IF constant representing the file type. - contents: the contents of the file. - st_size: file size; only valid if contents not given. - create_missing_dirs: if True, auto create missing directories. - apply_umask: whether or not the current umask must be applied on st_mode. - encoding: if contents is a unicode string, the encoding used for serialization. - New in pyfakefs 2.9. + file_path: path to the file to create. + st_mode: the stat.S_IF constant representing the file type. + contents: the contents of the file. + st_size: file size; only valid if contents not given. + create_missing_dirs: if True, auto create missing directories. + apply_umask: whether or not the current umask must be applied on st_mode. + encoding: if contents is a unicode string, the encoding used for serialization. + New in pyfakefs 2.9. Returns: - the newly created FakeFile object. + the newly created FakeFile object. + + Raises: + IOError: if the file already exists. + IOError: if the containing directory is required and missing. + """ + return self._CreateFile(file_path, st_mode, contents, st_size, create_missing_dirs, apply_umask, encoding) + + def AddRealFile(self, file_path, read_only=True): + """Create file_path, including all the parent directories along the way, for a file + existing in the real file system without reading the contents, which will be read on demand. + New in pyfakefs 3.2. + + Args: + file_path: path to the existing file. + read_only: if set, the file is treated as read-only, e.g. a write access raises an exception; + otherwise, writing to the file changes the fake file only as usually. + + Returns: + the newly created FakeFile object. + + Raises: + OSError: if the file does not exist in the real file system. + IOError: if the file already exists in the fake file system. + """ + real_stat = os.stat(file_path) + # for read-only mode, remove the write/executable permission bits + mode = real_stat.st_mode & 0o777444 if read_only else real_stat.st_mode + return self._CreateFile(file_path, contents=None, read_from_real_fs=True, + st_mode=mode, real_stat=real_stat) + + def AddRealDirectory(self, dir_path, read_only=True): + """Create fake directory for the existing directory at path, and entries for all contained + files in the real file system. + New in pyfakefs 3.2. + + Args: + dir_path: path to the existing directory. + read_only: if set, all files under the directory are treated as read-only, + e.g. a write access raises an exception; + otherwise, writing to the files changes the fake files only as usually. + + Returns: + the newly created FakeDirectory object. Raises: - IOError: if the file already exists. - IOError: if the containing directory is required and missing. + OSError: if the directory does not exist in the real file system. + IOError: if the directory already exists in the fake file system. + """ + if not os.path.exists(dir_path): + raise IOError(errno.ENOENT, 'No such directory', dir_path) + self.CreateDirectory(dir_path) + for base, _, files in os.walk(dir_path): + for fileEntry in files: + self.AddRealFile(os.path.join(base, fileEntry), read_only) + + def AddRealPaths(self, path_list, read_only=True): + """Convenience method to add several files and directories from the real file system + in the fake file system. See `AddRealFile()` and `AddRealDirectory()`. + New in pyfakefs 3.2. + + Args: + path_list: list of file and directory paths in the real file system. + read_only: if set, all files and files under under the directories are treated as read-only, + e.g. a write access raises an exception; + otherwise, writing to the files changes the fake files only as usually. + + Raises: + OSError: if any of the files and directories in the list does not exist in the real file system. + OSError: if any of the files and directories in the list already exists in the fake file system. + """ + for path in path_list: + if os.path.isdir(path): + self.AddRealDirectory(path, read_only) + else: + self.AddRealFile(path, read_only) + + def _CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE, + contents='', st_size=None, create_missing_dirs=True, + apply_umask=False, encoding=None, read_from_real_fs=False, real_stat=None): + """Create file_path, including all the parent directories along the way. + + Args: + file_path: path to the file to create. + st_mode: the stat.S_IF constant representing the file type. + contents: the contents of the file. + st_size: file size; only valid if contents not given. + create_missing_dirs: if True, auto create missing directories. + apply_umask: whether or not the current umask must be applied on st_mode. + encoding: if contents is a unicode string, the encoding used for serialization. + New in pyfakefs 2.9. + read_from_real_fs: if True, the contents are reaf from the real file system on demand. + New in pyfakefs 3.2. + real_stat: used in combination with read_from_real_fs; stat result of the real file """ file_path = self.NormalizePath(file_path) if self.Exists(file_path): @@ -1623,6 +1723,16 @@ def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE, if apply_umask: st_mode &= ~self.umask file_object = FakeFile(new_file, st_mode, filesystem=self) + if read_from_real_fs: + file_object.st_ctime = real_stat.st_ctime + file_object.st_atime = real_stat.st_atime + file_object.st_mtime = real_stat.st_mtime + file_object.st_gid = real_stat.st_gid + file_object.st_uid = real_stat.st_uid + file_object.st_size = real_stat.st_size + file_object.read_from_real_fs = True + file_object.file_path = file_path + self.last_ino += 1 file_object.SetIno(self.last_ino) self.AddObject(parent_directory, file_object)