Skip to content

Commit

Permalink
Added support for basic modes in os.open()
Browse files Browse the repository at this point in the history
- added support for O_RDONLY, O_WRONLY, O_RDWR, O_APPEND, O_TRUNC and O_EXCL (O_CREAT was already implemented)
- added support for opening directories read-only for Posix systems
- fixes pytest-dev#204
  • Loading branch information
mrbean-bremen committed Jun 11, 2017
1 parent b157fd2 commit 2390971
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ The release versions are PyPi releases.
## Version 3.3 (as yet unreleased)

#### New Features
* Added support for basic modes in fake `os.open()` ([#204](../../issues/204)).

#### Infrastructure

Expand Down
225 changes: 169 additions & 56 deletions fake_filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
"""Unittest for fake_filesystem module."""

import errno
import io
import locale
import os
import re
import stat
import sys
import time
Expand Down Expand Up @@ -825,51 +825,6 @@ def testFdopenMode(self):
exception = OSError if sys.version_info < (3, 0) else IOError
self.assertRaises(exception, self.os.fdopen, 0, 'w')

def testLowLevelOpenCreate(self):
file_path = 'file1'
# this is the low-level open, not FakeFileOpen
fileno = self.os.open(file_path, self.os.O_CREAT)
self.assertEqual(0, fileno)
self.assertTrue(self.os.path.exists(file_path))

def testLowLevelOpenCreateMode(self):
file_path = 'file1'
fileno = self.os.open(file_path, self.os.O_CREAT, 0o700)
self.assertEqual(0, fileno)
self.assertTrue(self.os.path.exists(file_path))
self.assertModeEqual(0o700, self.os.stat(file_path).st_mode)

def testLowLevelOpenCreateModeUnsupported(self):
file_path = 'file1'
fake_flag = 0b100000000000000000000000
self.assertRaises(NotImplementedError, self.os.open, file_path, fake_flag)

def testLowLevelWriteRead(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents='orig contents')
new_contents = '1234567890abcdef'
fake_open = fake_filesystem.FakeFileOpen(self.filesystem)

fh = fake_open(file_path, 'w')
fileno = fh.fileno()

self.assertEqual(len(new_contents), self.os.write(fileno, new_contents))
self.assertEqual(new_contents,
self.filesystem.GetObject(file_path).contents)
self.os.close(fileno)

fh = fake_open(file_path, 'r')
fileno = fh.fileno()
self.assertEqual('', self.os.read(fileno, 0))
self.assertEqual(new_contents[0:2], self.os.read(fileno, 2))
self.assertEqual(new_contents[2:10], self.os.read(fileno, 8))
self.assertEqual(new_contents[10:], self.os.read(fileno, 100))
self.assertEqual('', self.os.read(fileno, 10))
self.os.close(fileno)

self.assertRaisesOSError(errno.EBADF, self.os.write, fileno, new_contents)
self.assertRaisesOSError(errno.EBADF, self.os.read, fileno, 10)

def testFstat(self):
directory = 'xyzzy'
file_path = '%s/plugh' % directory
Expand Down Expand Up @@ -2289,6 +2244,156 @@ def testOpenUmaskApplied(self):
self.assertModeEqual(0o640, self.os.stat('file2').st_mode)


class FakeOsModuleLowLevelFileOpTest(FakeOsModuleTestBase):
"""Test low level functions `os.open()`, `os.read()` and `os.write()`."""

def setUp(self):
super(FakeOsModuleLowLevelFileOpTest, self).setUp()
if sys.version_info < (3, ):
self.operation_error = IOError
else:
self.operation_error = io.UnsupportedOperation

def testLowLevelOpenReadOnly(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_RDONLY)
self.assertEqual(0, file_des)
self.assertEqual(b'contents', self.os.read(file_des, 8))
self.assertRaises(self.operation_error, self.os.write, file_des, b'test')

def testLowLevelOpenWriteOnly(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_WRONLY)
self.assertEqual(0, file_des)
self.assertRaises(self.operation_error, self.os.read, file_des, 5)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'testents', file_obj.byte_contents)

def testLowLevelOpenReadWrite(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_RDWR)
self.assertEqual(0, file_des)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'testents', file_obj.byte_contents)

def testLowLevelOpenRaisesIfDoesNotExist(self):
file_path = 'file1'
self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_RDONLY)
self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_WRONLY)
self.assertRaisesIOError(errno.ENOENT, self.os.open, file_path, os.O_RDWR)

def testLowLevelOpenTruncate(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_RDWR | os.O_TRUNC)
self.assertEqual(0, file_des)
self.assertEqual(b'', self.os.read(file_des, 8))
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'test', file_obj.byte_contents)

def testLowLevelOpenAppend(self):
file_path = 'file1'
file_obj = self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))

file_des = self.os.open(file_path, os.O_WRONLY | os.O_APPEND)
self.assertEqual(0, file_des)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertEqual(b'contentstest', file_obj.byte_contents)

def testLowLevelOpenCreate(self):
file_path = 'file1'
file_des = self.os.open(file_path, os.O_RDWR | os.O_CREAT)
self.assertEqual(0, file_des)
self.assertTrue(self.os.path.exists(file_path))
self.assertEqual(4, self.os.write(file_des, b'test'))
file_obj = self.filesystem.GetObject(file_path)
self.assertEqual(b'test', file_obj.byte_contents)

def testLowLevelOpenCreateMode(self):
file_path = 'file1'
file_des = self.os.open(file_path, os.O_WRONLY | os.O_CREAT, 0o700)
self.assertEqual(0, file_des)
self.assertTrue(self.os.path.exists(file_path))
self.assertRaises(self.operation_error, self.os.read, file_des, 5)
self.assertEqual(4, self.os.write(file_des, b'test'))
self.assertModeEqual(0o700, self.os.stat(file_path).st_mode)

@unittest.skipIf(sys.version_info < (3, 3), 'Exclusive mode new in Python 3.3')
def testLowLevelOpenExclusive(self):
file_path = 'file1'
fileno = self.os.open(file_path, os.O_RDWR | os.O_EXCL)
self.assertEqual(0, fileno)
self.assertTrue(self.os.path.exists(file_path))

@unittest.skipIf(sys.version_info < (3, 3), 'Exclusive mode new in Python 3.3')
def testLowLevelOpenExclusiveRaisesIfFileExists(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents=b'contents',
st_mode=(stat.S_IFREG | 0o666))
self.assertRaisesIOError(errno.EEXIST, self.os.open, file_path, os.O_RDWR | os.O_EXCL)
self.assertRaisesIOError(errno.EEXIST, self.os.open, file_path, os.O_RDWR | os.O_EXCL)

def testOpenDirectoryRaisesUnderWindows(self):
self.filesystem.is_windows_fs = True
dir_path = '/dir'
self.filesystem.CreateDirectory(dir_path)
self.assertRaisesOSError(errno.EPERM, self.os.open, dir_path, os.O_RDONLY)
self.assertRaisesOSError(errno.EPERM, self.os.open, dir_path, os.O_WRONLY)
self.assertRaisesOSError(errno.EPERM, self.os.open, dir_path, os.O_RDWR)

def testOpenDirectoryForWritingRaisesUnderPosix(self):
self.filesystem.is_windows_fs = False
dir_path = '/dir'
self.filesystem.CreateDirectory(dir_path)
self.assertRaisesIOError(errno.EISDIR, self.os.open, dir_path, os.O_WRONLY)
self.assertRaisesIOError(errno.EISDIR, self.os.open, dir_path, os.O_RDWR)

def testOpenDirectoryReadOnlyUnderPosix(self):
self.filesystem.is_windows_fs = False
dir_path = '/dir'
self.filesystem.CreateDirectory(dir_path)
file_des = self.os.open(dir_path, os.O_RDONLY)
self.assertEqual(0, file_des)

def testLowLevelWriteRead(self):
file_path = 'file1'
self.filesystem.CreateFile(file_path, contents=b'orig contents')
new_contents = b'1234567890abcdef'
fake_open = fake_filesystem.FakeFileOpen(self.filesystem)

fh = fake_open(file_path, 'wb')
fileno = fh.fileno()

self.assertEqual(len(new_contents), self.os.write(fileno, new_contents))
self.assertEqual(new_contents,
self.filesystem.GetObject(file_path).byte_contents)
self.os.close(fileno)

fh = fake_open(file_path, 'rb')
fileno = fh.fileno()
self.assertEqual(b'', self.os.read(fileno, 0))
self.assertEqual(new_contents[0:2], self.os.read(fileno, 2))
self.assertEqual(new_contents[2:10], self.os.read(fileno, 8))
self.assertEqual(new_contents[10:], self.os.read(fileno, 100))
self.assertEqual(b'', self.os.read(fileno, 10))
self.os.close(fileno)

self.assertRaisesOSError(errno.EBADF, self.os.write, fileno, new_contents)
self.assertRaisesOSError(errno.EBADF, self.os.read, fileno, 10)


@unittest.skipIf(sys.version_info < (3, 5), 'os.scandir was introduced in Python 3.5')
class FakeScandirTest(FakeOsModuleTestBase):
def setUp(self):
Expand Down Expand Up @@ -3069,7 +3174,10 @@ def testIterateOverFile(self):
def testOpenDirectoryError(self):
directory_path = 'foo!bar'
self.filesystem.CreateDirectory(directory_path)
self.assertRaisesIOError(errno.EISDIR, self.file.__call__, directory_path)
if self.filesystem.is_windows_fs:
self.assertRaisesOSError(errno.EPERM, self.file.__call__, directory_path)
else:
self.assertRaisesIOError(errno.EISDIR, self.file.__call__, directory_path)

def testCreateFileWithWrite(self):
contents = [
Expand Down Expand Up @@ -3591,7 +3699,6 @@ def testReadStrErrorModes(self):
contents = f.read()
self.assertEqual(r'\xd9\xe4\xea \xc8\xc7\xc8\xc7', contents)


def testWriteAndReadStr(self):
str_contents = u'علي بابا'
with self.open(self.file_path, 'w', encoding='arabic') as f:
Expand Down Expand Up @@ -4644,32 +4751,38 @@ def testAddExistingRealDirectoryReadOnly(self):
def testAddExistingRealDirectoryTree(self):
real_dir_path = os.path.dirname(__file__)
self.filesystem.add_real_directory(real_dir_path)
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))
self.assertTrue(
self.filesystem.Exists(os.path.join(real_dir_path, 'fake_filesystem_test.py')))
self.assertTrue(
self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(
self.filesystem.Exists(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))

def testGetObjectFromLazilyAddedRealDirectory(self):
self.filesystem.is_case_sensitive = True
real_dir_path = os.path.dirname(__file__)
self.filesystem.add_real_directory(real_dir_path)
self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))
self.assertTrue(self.filesystem.GetObject(
os.path.join(real_dir_path, 'pyfakefs', 'fake_filesystem.py')))
self.assertTrue(
self.filesystem.GetObject(os.path.join(real_dir_path, 'pyfakefs', '__init__.py')))

def testAddExistingRealDirectoryLazily(self):
disk_size = 1024*1024*1024
disk_size = 1024 * 1024 * 1024
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.SetDiskUsage(disk_size, real_dir_path)
self.filesystem.add_real_directory(real_dir_path)

# the directory contents have not been read, the the disk usage has not changed
self.assertEqual(disk_size, self.filesystem.GetDiskUsage(real_dir_path).free)
# checking for existence shall read the directory contents
self.assertTrue(self.filesystem.GetObject(os.path.join(real_dir_path, 'fake_filesystem.py')))
self.assertTrue(
self.filesystem.GetObject(os.path.join(real_dir_path, 'fake_filesystem.py')))
# so now the free disk space shall have decreased
self.assertGreater(disk_size, self.filesystem.GetDiskUsage(real_dir_path).free)

def testAddExistingRealDirectoryNotLazily(self):
disk_size = 1024*1024*1024
disk_size = 1024 * 1024 * 1024
real_dir_path = os.path.join(os.path.dirname(__file__), 'pyfakefs')
self.filesystem.SetDiskUsage(disk_size, real_dir_path)
self.filesystem.add_real_directory(real_dir_path, lazy_read=False)
Expand Down
Loading

0 comments on commit 2390971

Please sign in to comment.