From d1a0de0d6e0e9e227077e28f7ae6a8f67f05a2b4 Mon Sep 17 00:00:00 2001 From: mrbean-bremen Date: Sun, 11 Jun 2017 20:13:37 +0200 Subject: [PATCH] Added support for basic modes in os.open() - see #204 --- CHANGES.md | 1 + fake_filesystem_test.py | 198 ++++++++++++++++++++++++++---------- pyfakefs/fake_filesystem.py | 53 ++++++---- 3 files changed, 180 insertions(+), 72 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8ae992c7..35011786 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ The release versions are PyPi releases. ## Version 3.3 (as yet unreleased) #### New Features + * Added support for basic modes in fake `os.open()` ([#204](../../issues/204)). #### Infrastructure diff --git a/fake_filesystem_test.py b/fake_filesystem_test.py index 84fcd466..a1a8cb44 100755 --- a/fake_filesystem_test.py +++ b/fake_filesystem_test.py @@ -17,9 +17,9 @@ """Unittest for fake_filesystem module.""" import errno +import io import locale import os -import re import stat import sys import time @@ -825,51 +825,6 @@ def testFdopenMode(self): exception = OSError if sys.version_info < (3, 0) else IOError self.assertRaises(exception, self.os.fdopen, 0, 'w') - def testLowLevelOpenCreate(self): - file_path = 'file1' - # this is the low-level open, not FakeFileOpen - fileno = self.os.open(file_path, self.os.O_CREAT) - self.assertEqual(0, fileno) - self.assertTrue(self.os.path.exists(file_path)) - - def testLowLevelOpenCreateMode(self): - file_path = 'file1' - fileno = self.os.open(file_path, self.os.O_CREAT, 0o700) - self.assertEqual(0, fileno) - self.assertTrue(self.os.path.exists(file_path)) - self.assertModeEqual(0o700, self.os.stat(file_path).st_mode) - - def testLowLevelOpenCreateModeUnsupported(self): - file_path = 'file1' - fake_flag = 0b100000000000000000000000 - self.assertRaises(NotImplementedError, self.os.open, file_path, fake_flag) - - def testLowLevelWriteRead(self): - file_path = 'file1' - self.filesystem.CreateFile(file_path, contents='orig contents') - new_contents = '1234567890abcdef' - fake_open = fake_filesystem.FakeFileOpen(self.filesystem) - - fh = fake_open(file_path, 'w') - fileno = fh.fileno() - - self.assertEqual(len(new_contents), self.os.write(fileno, new_contents)) - self.assertEqual(new_contents, - self.filesystem.GetObject(file_path).contents) - self.os.close(fileno) - - fh = fake_open(file_path, 'r') - fileno = fh.fileno() - self.assertEqual('', self.os.read(fileno, 0)) - self.assertEqual(new_contents[0:2], self.os.read(fileno, 2)) - self.assertEqual(new_contents[2:10], self.os.read(fileno, 8)) - self.assertEqual(new_contents[10:], self.os.read(fileno, 100)) - self.assertEqual('', self.os.read(fileno, 10)) - self.os.close(fileno) - - self.assertRaisesOSError(errno.EBADF, self.os.write, fileno, new_contents) - self.assertRaisesOSError(errno.EBADF, self.os.read, fileno, 10) - def testFstat(self): directory = 'xyzzy' file_path = '%s/plugh' % directory @@ -2289,6 +2244,134 @@ def testOpenUmaskApplied(self): self.assertModeEqual(0o640, self.os.stat('file2').st_mode) +class FakeOsModuleLowLevelFileOpTest(FakeOsModuleTestBase): + """Test low level functions `os.open()`, `os.read()` and `os.write()`.""" + + def setUp(self): + super(FakeOsModuleLowLevelFileOpTest, self).setUp() + if sys.version_info < (3, ): + self.operation_error = IOError + else: + self.operation_error = io.UnsupportedOperation + + def testLowLevelOpenReadOnly(self): + file_path = 'file1' + self.filesystem.CreateFile(file_path, contents=b'contents', + st_mode=(stat.S_IFREG | 0o666)) + + file_des = self.os.open(file_path, os.O_RDONLY) + self.assertEqual(0, file_des) + self.assertEqual(b'contents', self.os.read(file_des, 8)) + self.assertRaises(self.operation_error, self.os.write, file_des, b'test') + + def testLowLevelOpenWriteOnly(self): + file_path = 'file1' + file_obj = self.filesystem.CreateFile(file_path, contents=b'contents', + st_mode=(stat.S_IFREG | 0o666)) + + file_des = self.os.open(file_path, os.O_WRONLY) + self.assertEqual(0, file_des) + self.assertRaises(self.operation_error, self.os.read, file_des, 5) + self.assertEqual(4, self.os.write(file_des, b'test')) + self.assertEqual(b'testents', file_obj.byte_contents) + + def testLowLevelOpenReadWrite(self): + file_path = 'file1' + file_obj = self.filesystem.CreateFile(file_path, contents=b'contents', + st_mode=(stat.S_IFREG | 0o666)) + + file_des = self.os.open(file_path, os.O_RDWR) + self.assertEqual(0, file_des) + self.assertEqual(4, self.os.write(file_des, b'test')) + self.assertEqual(b'testents', file_obj.byte_contents) + + def testLowLevelOpenRaisesIfDoesNotExist(self): + file_path = 'file1' + self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_RDONLY) + self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_WRONLY) + self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_RDWR) + + def testLowLevelOpenTruncate(self): + file_path = 'file1' + file_obj = self.filesystem.CreateFile(file_path, contents=b'contents', + st_mode=(stat.S_IFREG | 0o666)) + + file_des = self.os.open(file_path, os.O_RDWR | os.O_TRUNC) + self.assertEqual(0, file_des) + self.assertEqual(b'', self.os.read(file_des, 8)) + self.assertEqual(4, self.os.write(file_des, b'test')) + self.assertEqual(b'test', file_obj.byte_contents) + + def testLowLevelOpenAppend(self): + file_path = 'file1' + file_obj = self.filesystem.CreateFile(file_path, contents=b'contents', + st_mode=(stat.S_IFREG | 0o666)) + + file_des = self.os.open(file_path, os.O_WRONLY | os.O_APPEND) + self.assertEqual(0, file_des) + self.assertEqual(4, self.os.write(file_des, b'test')) + self.assertEqual(b'contentstest', file_obj.byte_contents) + + def testLowLevelOpenCreate(self): + file_path = 'file1' + file_des = self.os.open(file_path, os.O_RDWR | os.O_CREAT) + self.assertEqual(0, file_des) + self.assertTrue(self.os.path.exists(file_path)) + self.assertEqual(4, self.os.write(file_des, b'test')) + file_obj = self.filesystem.GetObject(file_path) + self.assertEqual(b'test', file_obj.byte_contents) + + def testLowLevelOpenCreateMode(self): + file_path = 'file1' + file_des = self.os.open(file_path, os.O_WRONLY | os.O_CREAT, 0o700) + self.assertEqual(0, file_des) + self.assertTrue(self.os.path.exists(file_path)) + self.assertRaises(self.operation_error, self.os.read, file_des, 5) + self.assertEqual(4, self.os.write(file_des, b'test')) + self.assertModeEqual(0o700, self.os.stat(file_path).st_mode) + + @unittest.skipIf(sys.version_info < (3, 3), 'Exclusive mode new in Python 3.3') + def testLowLevelOpenExclusive(self): + file_path = 'file1' + fileno = self.os.open(file_path, os.O_RDWR | os.O_EXCL) + self.assertEqual(0, fileno) + self.assertTrue(self.os.path.exists(file_path)) + + @unittest.skipIf(sys.version_info < (3, 3), 'Exclusive mode new in Python 3.3') + def testLowLevelOpenExclusiveRaisesIfFileExists(self): + file_path = 'file1' + self.filesystem.CreateFile(file_path, contents=b'contents', + st_mode=(stat.S_IFREG | 0o666)) + self.assertRaisesIOError(errno.EEXIST, self.os.open, file_path, os.O_RDWR | os.O_EXCL) + self.assertRaisesIOError(errno.EEXIST, self.os.open, file_path, os.O_RDWR | os.O_EXCL) + + def testLowLevelWriteRead(self): + file_path = 'file1' + self.filesystem.CreateFile(file_path, contents=b'orig contents') + new_contents = b'1234567890abcdef' + fake_open = fake_filesystem.FakeFileOpen(self.filesystem) + + fh = fake_open(file_path, 'wb') + fileno = fh.fileno() + + self.assertEqual(len(new_contents), self.os.write(fileno, new_contents)) + self.assertEqual(new_contents, + self.filesystem.GetObject(file_path).byte_contents) + self.os.close(fileno) + + fh = fake_open(file_path, 'rb') + fileno = fh.fileno() + self.assertEqual(b'', self.os.read(fileno, 0)) + self.assertEqual(new_contents[0:2], self.os.read(fileno, 2)) + self.assertEqual(new_contents[2:10], self.os.read(fileno, 8)) + self.assertEqual(new_contents[10:], self.os.read(fileno, 100)) + self.assertEqual(b'', self.os.read(fileno, 10)) + self.os.close(fileno) + + self.assertRaisesOSError(errno.EBADF, self.os.write, fileno, new_contents) + self.assertRaisesOSError(errno.EBADF, self.os.read, fileno, 10) + + @unittest.skipIf(sys.version_info < (3, 5), 'os.scandir was introduced in Python 3.5') class FakeScandirTest(FakeOsModuleTestBase): def setUp(self): @@ -3591,7 +3674,6 @@ def testReadStrErrorModes(self): contents = f.read() self.assertEqual(r'\xd9\xe4\xea \xc8\xc7\xc8\xc7', contents) - def testWriteAndReadStr(self): str_contents = u'علي بابا' with self.open(self.file_path, 'w', encoding='arabic') as f: @@ -4644,19 +4726,24 @@ def testAddExistingRealDirectoryReadOnly(self): def testAddExistingRealDirectoryTree(self): real_dir_path = os.path.dirname(__file__) self.filesystem.add_real_directory(real_dir_path) - self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py'))) - self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py'))) - self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py'))) + self.assertTrue( + self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py'))) + self.assertTrue( + self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py'))) + self.assertTrue( + self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py'))) def testGetObjectFromLazilyAddedRealDirectory(self): self.filesystem.is_case_sensitive = True real_dir_path = os.path.dirname(__file__) self.filesystem.add_real_directory(real_dir_path) - self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py'))) - self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', '__init__.py'))) + self.assertTrue(self.filesystem.GetObject( + os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py'))) + self.assertTrue( + self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', '__init__.py'))) def testAddExistingRealDirectoryLazily(self): - disk_size = 1024*1024*1024 + disk_size = 1024 * 1024 * 1024 real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs') self.filesystem.SetDiskUsage(disk_size, real_dir_path) self.filesystem.add_real_directory(real_dir_path) @@ -4664,12 +4751,13 @@ def testAddExistingRealDirectoryLazily(self): # the directory contents have not been read, the the disk usage has not changed self.assertEqual(disk_size, self.filesystem.GetDiskUsage(real_dir_path).free) # checking for existence shall read the directory contents - self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'fake_filesystem.py'))) + self.assertTrue( + self.filesystem.GetObject(os.path.join(real_dir_path, 'fake_filesystem.py'))) # so now the free disk space shall have decreased self.assertGreater(disk_size, self.filesystem.GetDiskUsage(real_dir_path).free) def testAddExistingRealDirectoryNotLazily(self): - disk_size = 1024*1024*1024 + disk_size = 1024 * 1024 * 1024 real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs') self.filesystem.SetDiskUsage(disk_size, real_dir_path) self.filesystem.add_real_directory(real_dir_path, lazy_read=False) diff --git a/pyfakefs/fake_filesystem.py b/pyfakefs/fake_filesystem.py index f8ce8fee..dd755c6f 100644 --- a/pyfakefs/fake_filesystem.py +++ b/pyfakefs/fake_filesystem.py @@ -126,9 +126,12 @@ 'r+': (True, True, True, False, False, False), 'w+': (False, True, True, True, False, False), 'a+': (False, True, True, False, True, False), + 'w!': (False, False, True, False, False, False), + 'w!+': (False, True, True, False, False, False), } if sys.version_info >= (3, 3): _OPEN_MODE_MAP['x'] = (False, False, True, False, False, True) + _OPEN_MODE_MAP['x+'] = (False, True, True, False, False, True) _MAX_LINK_DEPTH = 20 @@ -2889,29 +2892,45 @@ def _fdopen_ver2(self, file_des, mode='r', bufsize=None): # pylint: disable=unu def open(self, file_path, flags, mode=None): """Return the file descriptor for a FakeFile. - WARNING: This implementation only implements creating a file. Please fill - out the remainder for your needs. - Args: - file_path: the path to the file - flags: low-level bits to indicate io operation - mode: bits to define default permissions + file_path: the path to the file + flags: low-level bits to indicate io operation + mode: bits to define default permissions + Note: only basic modes are supported, OS-specific modes are ignored Returns: - A file descriptor. + A file descriptor. Raises: - OSError: if the path cannot be found - ValueError: if invalid mode is given - NotImplementedError: if an unsupported flag is passed in + IOError: if the path cannot be found + ValueError: if invalid mode is given """ - if flags & os.O_CREAT: - fake_file = FakeFileOpen(self.filesystem)(file_path, 'w') - if mode: - self.chmod(file_path, mode) - return fake_file.fileno() - else: - raise NotImplementedError('FakeOsModule.open') + str_flags = 'r' + # todo: directories + if flags & os.O_WRONLY or flags & os.O_RDWR: + if not flags & (os.O_CREAT | os.O_EXCL): + if not self.filesystem.Exists(file_path): + raise IOError(errno.ENOENT, + 'File does not exist in fake filesystem', + file_path) + if flags & os.O_EXCL: + str_flags = 'x' + elif flags & os.O_APPEND: + str_flags = 'a' + elif flags & os.O_TRUNC: + str_flags = 'w' + else: + # non-existing mode to map the behavior of O_WRONLY + # this is the same as 'r+', but without read permission + str_flags = 'w!' + if flags & os.O_RDWR: + str_flags += '+' + # low level open is always binary + str_flags += 'b' + fake_file = FakeFileOpen(self.filesystem)(file_path, str_flags) + if mode: + self.chmod(file_path, mode) + return fake_file.fileno() def close(self, file_des): """Close a file descriptor.