Skip to content

Commit

Permalink
pythonGH-73991: Add pathlib.Path.copy() (python#119058)
Browse files Browse the repository at this point in the history
Add a `Path.copy()` method that copies the content of one file to another.

This method is similar to `shutil.copyfile()` but differs in the following ways:

- Uses `fcntl.FICLONE` where available (see pythonGH-81338)
- Uses `os.copy_file_range` where available (see pythonGH-81340)
- Uses `_winapi.CopyFile2` where available, even though this copies more metadata than the other implementations. This makes `WindowsPath.copy()` more similar to `shutil.copy2()`.

The method is presently _less_ specified than the `shutil` functions to allow OS-specific optimizations that might copy more or less metadata.

Incorporates code from pythonGH-81338 and pythonGH-93152.

Co-authored-by: Eryk Sun <[email protected]>
  • Loading branch information
2 people authored and mrahtz committed Jun 30, 2024
1 parent 338496d commit 7e37fbe
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 2 deletions.
18 changes: 16 additions & 2 deletions Doc/library/pathlib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1429,8 +1429,22 @@ Creating files and directories
available. In previous versions, :exc:`NotImplementedError` was raised.


Renaming and deleting
^^^^^^^^^^^^^^^^^^^^^
Copying, renaming and deleting
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. method:: Path.copy(target)

Copy the contents of this file to the *target* file. If *target* specifies
a file that already exists, it will be replaced.

.. note::
This method uses operating system functionality to copy file content
efficiently. The OS might also copy some metadata, such as file
permissions. After the copy is complete, users may wish to call
:meth:`Path.chmod` to set the permissions of the target file.

.. versionadded:: 3.14


.. method:: Path.rename(target)

Expand Down
7 changes: 7 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ os
by :func:`os.unsetenv`, or made outside Python in the same process.
(Contributed by Victor Stinner in :gh:`120057`.)

pathlib
-------

* Add :meth:`pathlib.Path.copy`, which copies the content of one file to
another, like :func:`shutil.copyfile`.
(Contributed by Barney Gale in :gh:`73991`.)

symtable
--------

Expand Down
30 changes: 30 additions & 0 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import posixpath
from glob import _GlobberBase, _no_recurse_symlinks
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from ._os import copyfileobj


__all__ = ["UnsupportedOperation"]
Expand Down Expand Up @@ -563,6 +564,15 @@ def samefile(self, other_path):
return (st.st_ino == other_st.st_ino and
st.st_dev == other_st.st_dev)

def _samefile_safe(self, other_path):
"""
Like samefile(), but returns False rather than raising OSError.
"""
try:
return self.samefile(other_path)
except (OSError, ValueError):
return False

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
Expand Down Expand Up @@ -780,6 +790,26 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
raise UnsupportedOperation(self._unsupported_msg('mkdir()'))

def copy(self, target):
"""
Copy the contents of this file to the given target.
"""
if not isinstance(target, PathBase):
target = self.with_segments(target)
if self._samefile_safe(target):
raise OSError(f"{self!r} and {target!r} are the same file")
with self.open('rb') as source_f:
try:
with target.open('wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not target.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {target}') from e
else:
raise

def rename(self, target):
"""
Rename this path to the target path.
Expand Down
16 changes: 16 additions & 0 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
grp = None

from ._abc import UnsupportedOperation, PurePathBase, PathBase
from ._os import copyfile


__all__ = [
Expand Down Expand Up @@ -780,6 +781,21 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
if not exist_ok or not self.is_dir():
raise

if copyfile:
def copy(self, target):
"""
Copy the contents of this file to the given target.
"""
try:
target = os.fspath(target)
except TypeError:
if isinstance(target, PathBase):
# Target is an instance of PathBase but not os.PathLike.
# Use generic implementation from PathBase.
return PathBase.copy(self, target)
raise
copyfile(os.fspath(self), target)

def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
Expand Down
138 changes: 138 additions & 0 deletions Lib/pathlib/_os.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Low-level OS functionality wrappers used by pathlib.
"""

from errno import EBADF, EOPNOTSUPP, ETXTBSY, EXDEV
import os
import sys
try:
import fcntl
except ImportError:
fcntl = None
try:
import posix
except ImportError:
posix = None
try:
import _winapi
except ImportError:
_winapi = None


def get_copy_blocksize(infd):
"""Determine blocksize for fastcopying on Linux.
Hopefully the whole file will be copied in a single call.
The copying itself should be performed in a loop 'till EOF is
reached (0 return) so a blocksize smaller or bigger than the actual
file size should not make any difference, also in case the file
content changes while being copied.
"""
try:
blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8 MiB
except OSError:
blocksize = 2 ** 27 # 128 MiB
# On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
# see gh-82500.
if sys.maxsize < 2 ** 32:
blocksize = min(blocksize, 2 ** 30)
return blocksize


if fcntl and hasattr(fcntl, 'FICLONE'):
def clonefd(source_fd, target_fd):
"""
Perform a lightweight copy of two files, where the data blocks are
copied only when modified. This is known as Copy on Write (CoW),
instantaneous copy or reflink.
"""
fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
else:
clonefd = None


if posix and hasattr(posix, '_fcopyfile'):
def copyfd(source_fd, target_fd):
"""
Copy a regular file content using high-performance fcopyfile(3)
syscall (macOS).
"""
posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
elif hasattr(os, 'copy_file_range'):
def copyfd(source_fd, target_fd):
"""
Copy data from one regular mmap-like fd to another by using a
high-performance copy_file_range(2) syscall that gives filesystems
an opportunity to implement the use of reflinks or server-side
copy.
This should work on Linux >= 4.5 only.
"""
blocksize = get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.copy_file_range(source_fd, target_fd, blocksize,
offset_dst=offset)
if sent == 0:
break # EOF
offset += sent
elif hasattr(os, 'sendfile'):
def copyfd(source_fd, target_fd):
"""Copy data from one regular mmap-like fd to another by using
high-performance sendfile(2) syscall.
This should work on Linux >= 2.6.33 only.
"""
blocksize = get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.sendfile(target_fd, source_fd, offset, blocksize)
if sent == 0:
break # EOF
offset += sent
else:
copyfd = None


if _winapi and hasattr(_winapi, 'CopyFile2'):
def copyfile(source, target):
"""
Copy from one file to another using CopyFile2 (Windows only).
"""
_winapi.CopyFile2(source, target, 0)
else:
copyfile = None


def copyfileobj(source_f, target_f):
"""
Copy data from file-like object source_f to file-like object target_f.
"""
try:
source_fd = source_f.fileno()
target_fd = target_f.fileno()
except Exception:
pass # Fall through to generic code.
else:
try:
# Use OS copy-on-write where available.
if clonefd:
try:
clonefd(source_fd, target_fd)
return
except OSError as err:
if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
raise err

# Use OS copy where available.
if copyfd:
copyfd(source_fd, target_fd)
return
except OSError as err:
# Produce more useful error messages.
err.filename = source_f.name
err.filename2 = target_f.name
raise err

# Last resort: copy with fileobj read() and write().
read_source = source_f.read
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)
62 changes: 62 additions & 0 deletions Lib/test/test_pathlib/test_pathlib_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,68 @@ def test_write_text_with_newlines(self):
self.assertEqual((p / 'fileA').read_bytes(),
b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq')

def test_copy_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'copyA'
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())

def test_copy_directory(self):
base = self.cls(self.base)
source = base / 'dirA'
target = base / 'copyA'
with self.assertRaises(OSError):
source.copy(target)

@needs_symlinks
def test_copy_symlink(self):
base = self.cls(self.base)
source = base / 'linkA'
target = base / 'copyA'
source.copy(target)
self.assertTrue(target.exists())
self.assertFalse(target.is_symlink())
self.assertEqual(source.read_text(), target.read_text())

def test_copy_to_existing_file(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirB' / 'fileB'
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(source.read_text(), target.read_text())

def test_copy_to_existing_directory(self):
base = self.cls(self.base)
source = base / 'fileA'
target = base / 'dirA'
with self.assertRaises(OSError):
source.copy(target)

@needs_symlinks
def test_copy_to_existing_symlink(self):
base = self.cls(self.base)
source = base / 'dirB' / 'fileB'
target = base / 'linkA'
real_target = base / 'fileA'
source.copy(target)
self.assertTrue(target.exists())
self.assertTrue(target.is_symlink())
self.assertTrue(real_target.exists())
self.assertFalse(real_target.is_symlink())
self.assertEqual(source.read_text(), real_target.read_text())

def test_copy_empty(self):
base = self.cls(self.base)
source = base / 'empty'
target = base / 'copyA'
source.write_bytes(b'')
source.copy(target)
self.assertTrue(target.exists())
self.assertEqual(target.read_bytes(), b'')

def test_iterdir(self):
P = self.cls
p = P(self.base)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add :meth:`pathlib.Path.copy`, which copies the content of one file to another,
like :func:`shutil.copyfile`.

0 comments on commit 7e37fbe

Please sign in to comment.