Skip to content

Commit

Permalink
Added support for basic modes in os.open()
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbean-bremen committed Jun 11, 2017
1 parent b157fd2 commit d1a0de0
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ The release versions are PyPi releases.
## Version 3.3 (as yet unreleased)

#### New Features
* Added support for basic modes in fake `os.open()` ([#204](../../issues/204)).

#### Infrastructure

Expand Down
198 changes: 143 additions & 55 deletions fake_filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
"""Unittest for fake_filesystem module."""

import errno
import io
import locale
import os
import re
import stat
import sys
import time
Expand Down Expand Up @@ -825,51 +825,6 @@ def testFdopenMode(self):
exception = OSError if sys.version_info < (3, 0) else IOError
self.assertRaises(exception, self.os.fdopen, 0, 'w')

def testLowLevelOpenCreate(self):
file_path = 'file1'
# this is the low-level open, not FakeFileOpen
fileno = self.os.open(file_path, self.os.O_CREAT)
self.assertEqual(0, fileno)
self.assertTrue(self.os.path.exists(file_path))

def testLowLevelOpenCreateMode(self):
file_path = 'file1'
fileno = self.os.open(file_path, self.os.O_CREAT, 0o700)
self.assertEqual(0, fileno)
self.assertTrue(self.os.path.exists(file_path))
self.assertModeEqual(0o700, self.os.stat(file_path).st_mode)

def testLowLevelOpenCreateModeUnsupported(self):
file_path = 'file1'
fake_flag = 0b100000000000000000000000
self.assertRaises(NotImplementedError, self.os.open, file_path, fake_flag)

def testLowLevelWriteRead(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents='orig contents')
new_contents = '1234567890abcdef'
fake_open = fake_filesystem.FakeFileOpen(self.filesystem)

fh = fake_open(file_path, 'w')
fileno = fh.fileno()

self.assertEqual(len(new_contents), self.os.write(fileno, new_contents))
self.assertEqual(new_contents,
self.filesystem.GetObject(file_path).contents)
self.os.close(fileno)

fh = fake_open(file_path, 'r')
fileno = fh.fileno()
self.assertEqual('', self.os.read(fileno, 0))
self.assertEqual(new_contents[0:2], self.os.read(fileno, 2))
self.assertEqual(new_contents[2:10], self.os.read(fileno, 8))
self.assertEqual(new_contents[10:], self.os.read(fileno, 100))
self.assertEqual('', self.os.read(fileno, 10))
self.os.close(fileno)

self.assertRaisesOSError(errno.EBADF, self.os.write, fileno, new_contents)
self.assertRaisesOSError(errno.EBADF, self.os.read, fileno, 10)

def testFstat(self):
directory = 'xyzzy'
file_path = '%s/plugh' % directory
Expand Down Expand Up @@ -2289,6 +2244,134 @@ def testOpenUmaskApplied(self):
self.assertModeEqual(0o640, self.os.stat('file2').st_mode)


class FakeOsModuleLowLevelFileOpTest(FakeOsModuleTestBase):
"""Test low level functions `os.open()`, `os.read()` and `os.write()`."""

def setUp(self):
super(FakeOsModuleLowLevelFileOpTest, self).setUp()
if sys.version_info < (3, ):
self.operation_error = IOError
else:
self.operation_error = io.UnsupportedOperation

def testLowLevelOpenReadOnly(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_RDONLY)
self.assertEqual(0, file_des)
self.assertEqual(b'contents', self.os.read(file_des, 8))
self.assertRaises(self.operation_error, self.os.write, file_des, b'test')

def testLowLevelOpenWriteOnly(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_WRONLY)
self.assertEqual(0, file_des)
self.assertRaises(self.operation_error, self.os.read, file_des, 5)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'testents', file_obj.byte_contents)

def testLowLevelOpenReadWrite(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_RDWR)
self.assertEqual(0, file_des)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'testents', file_obj.byte_contents)

def testLowLevelOpenRaisesIfDoesNotExist(self):
file_path = 'file1'
self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_RDONLY)
self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_WRONLY)
self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_RDWR)

def testLowLevelOpenTruncate(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_RDWR | os.O_TRUNC)
self.assertEqual(0, file_des)
self.assertEqual(b'', self.os.read(file_des, 8))
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'test', file_obj.byte_contents)

def testLowLevelOpenAppend(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_WRONLY | os.O_APPEND)
self.assertEqual(0, file_des)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'contentstest', file_obj.byte_contents)

def testLowLevelOpenCreate(self):
file_path = 'file1'
file_des = self.os.open(file_path, os.O_RDWR | os.O_CREAT)
self.assertEqual(0, file_des)
self.assertTrue(self.os.path.exists(file_path))
self.assertEqual(4, self.os.write(file_des, b'test'))
file_obj = self.filesystem.GetObject(file_path)
self.assertEqual(b'test', file_obj.byte_contents)

def testLowLevelOpenCreateMode(self):
file_path = 'file1'
file_des = self.os.open(file_path, os.O_WRONLY | os.O_CREAT, 0o700)
self.assertEqual(0, file_des)
self.assertTrue(self.os.path.exists(file_path))
self.assertRaises(self.operation_error, self.os.read, file_des, 5)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertModeEqual(0o700, self.os.stat(file_path).st_mode)

@unittest.skipIf(sys.version_info < (3, 3), 'Exclusive mode new in Python 3.3')
def testLowLevelOpenExclusive(self):
file_path = 'file1'
fileno = self.os.open(file_path, os.O_RDWR | os.O_EXCL)
self.assertEqual(0, fileno)
self.assertTrue(self.os.path.exists(file_path))

@unittest.skipIf(sys.version_info < (3, 3), 'Exclusive mode new in Python 3.3')
def testLowLevelOpenExclusiveRaisesIfFileExists(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))
self.assertRaisesIOError(errno.EEXIST, self.os.open, file_path, os.O_RDWR | os.O_EXCL)
self.assertRaisesIOError(errno.EEXIST, self.os.open, file_path, os.O_RDWR | os.O_EXCL)

def testLowLevelWriteRead(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents=b'orig contents')
new_contents = b'1234567890abcdef'
fake_open = fake_filesystem.FakeFileOpen(self.filesystem)

fh = fake_open(file_path, 'wb')
fileno = fh.fileno()

self.assertEqual(len(new_contents), self.os.write(fileno, new_contents))
self.assertEqual(new_contents,
self.filesystem.GetObject(file_path).byte_contents)
self.os.close(fileno)

fh = fake_open(file_path, 'rb')
fileno = fh.fileno()
self.assertEqual(b'', self.os.read(fileno, 0))
self.assertEqual(new_contents[0:2], self.os.read(fileno, 2))
self.assertEqual(new_contents[2:10], self.os.read(fileno, 8))
self.assertEqual(new_contents[10:], self.os.read(fileno, 100))
self.assertEqual(b'', self.os.read(fileno, 10))
self.os.close(fileno)

self.assertRaisesOSError(errno.EBADF, self.os.write, fileno, new_contents)
self.assertRaisesOSError(errno.EBADF, self.os.read, fileno, 10)


@unittest.skipIf(sys.version_info < (3, 5), 'os.scandir was introduced in Python 3.5')
class FakeScandirTest(FakeOsModuleTestBase):
def setUp(self):
Expand Down Expand Up @@ -3591,7 +3674,6 @@ def testReadStrErrorModes(self):
contents = f.read()
self.assertEqual(r'\xd9\xe4\xea \xc8\xc7\xc8\xc7', contents)


def testWriteAndReadStr(self):
str_contents = u'علي بابا'
with self.open(self.file_path, 'w', encoding='arabic') as f:
Expand Down Expand Up @@ -4644,32 +4726,38 @@ def testAddExistingRealDirectoryReadOnly(self):
def testAddExistingRealDirectoryTree(self):
real_dir_path = os.path.dirname(__file__)
self.filesystem.add_real_directory(real_dir_path)
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))
self.assertTrue(
self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py')))
self.assertTrue(
self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(
self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))

def testGetObjectFromLazilyAddedRealDirectory(self):
self.filesystem.is_case_sensitive = True
real_dir_path = os.path.dirname(__file__)
self.filesystem.add_real_directory(real_dir_path)
self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))
self.assertTrue(self.filesystem.GetObject(
os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(
self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))

def testAddExistingRealDirectoryLazily(self):
disk_size = 1024*1024*1024
disk_size = 1024 * 1024 * 1024
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.SetDiskUsage(disk_size, real_dir_path)
self.filesystem.add_real_directory(real_dir_path)

# the directory contents have not been read, the the disk usage has not changed
self.assertEqual(disk_size, self.filesystem.GetDiskUsage(real_dir_path).free)
# checking for existence shall read the directory contents
self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'fake_filesystem.py')))
self.assertTrue(
self.filesystem.GetObject(os.path.join(real_dir_path, 'fake_filesystem.py')))
# so now the free disk space shall have decreased
self.assertGreater(disk_size, self.filesystem.GetDiskUsage(real_dir_path).free)

def testAddExistingRealDirectoryNotLazily(self):
disk_size = 1024*1024*1024
disk_size = 1024 * 1024 * 1024
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.SetDiskUsage(disk_size, real_dir_path)
self.filesystem.add_real_directory(real_dir_path, lazy_read=False)
Expand Down
53 changes: 36 additions & 17 deletions pyfakefs/fake_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,12 @@
'r+': (True, True, True, False, False, False),
'w+': (False, True, True, True, False, False),
'a+': (False, True, True, False, True, False),
'w!': (False, False, True, False, False, False),
'w!+': (False, True, True, False, False, False),
}
if sys.version_info >= (3, 3):
_OPEN_MODE_MAP['x'] = (False, False, True, False, False, True)
_OPEN_MODE_MAP['x+'] = (False, True, True, False, False, True)

_MAX_LINK_DEPTH = 20

Expand Down Expand Up @@ -2889,29 +2892,45 @@ def _fdopen_ver2(self, file_des, mode='r', bufsize=None): # pylint: disable=unu
def open(self, file_path, flags, mode=None):
"""Return the file descriptor for a FakeFile.
WARNING: This implementation only implements creating a file. Please fill
out the remainder for your needs.
Args:
file_path: the path to the file
flags: low-level bits to indicate io operation
mode: bits to define default permissions
file_path: the path to the file
flags: low-level bits to indicate io operation
mode: bits to define default permissions
Note: only basic modes are supported, OS-specific modes are ignored
Returns:
A file descriptor.
A file descriptor.
Raises:
OSError: if the path cannot be found
ValueError: if invalid mode is given
NotImplementedError: if an unsupported flag is passed in
IOError: if the path cannot be found
ValueError: if invalid mode is given
"""
if flags & os.O_CREAT:
fake_file = FakeFileOpen(self.filesystem)(file_path, 'w')
if mode:
self.chmod(file_path, mode)
return fake_file.fileno()
else:
raise NotImplementedError('FakeOsModule.open')
str_flags = 'r'
# todo: directories
if flags & os.O_WRONLY or flags & os.O_RDWR:
if not flags & (os.O_CREAT | os.O_EXCL):
if not self.filesystem.Exists(file_path):
raise IOError(errno.ENOENT,
'File does not exist in fake filesystem',
file_path)
if flags & os.O_EXCL:
str_flags = 'x'
elif flags & os.O_APPEND:
str_flags = 'a'
elif flags & os.O_TRUNC:
str_flags = 'w'
else:
# non-existing mode to map the behavior of O_WRONLY
# this is the same as 'r+', but without read permission
str_flags = 'w!'
if flags & os.O_RDWR:
str_flags += '+'
# low level open is always binary
str_flags += 'b'
fake_file = FakeFileOpen(self.filesystem)(file_path, str_flags)
if mode:
self.chmod(file_path, mode)
return fake_file.fileno()

def close(self, file_des):
"""Close a file descriptor.
Expand Down

0 comments on commit d1a0de0

Please sign in to comment.