Skip to content

Commit

Permalink
renamed copy2 -> copy and copytree -> copytree_copy, copying stats re…
Browse files Browse the repository at this point in the history
…moved
  • Loading branch information
martinhoefling committed Nov 11, 2019
1 parent ad094fa commit 84eb2cb
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 82 deletions.
121 changes: 65 additions & 56 deletions smbclient/shutil.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import unicode_literals
import errno

from ntpath import join, basename, normcase, abspath, normpath, splitdrive
from ntpath import join, basename, normpath, splitdrive, splitunc

import sys
import stat as py_stat
import os.path
from os import error as os_error

from smbclient._io import ioctl_request, SMBFileTransaction, SMBRawIO
Expand Down Expand Up @@ -73,63 +73,40 @@ def onerror(*args):
onerror(rmdir, path, sys.exc_info())


def copy2(src, dst, server_side_copy=False, **kwargs):
"""Copy data and metadata. Return the file's destination.
Metadata is copied with copystat(). Please see the copystat function
for more information.
def copy(src, dst, server_side_copy=False, **kwargs):
"""Copy data.
The destination may be a directory.
"""
_check_src_dst(src, dst)

if isdir(dst, **kwargs):
dst = join(dst, basename(src))
if server_side_copy:
copyfile_server_side(src, dst, **kwargs)
if server_side_copy and _is_remote_file(src) and _is_sameshare(src, dst):
_copyfile_server_side(src, dst, **kwargs)
else:
copyfile(src, dst, **kwargs)
copystat(src, dst, **kwargs)


def copystat(src, dst, **kwargs):
"""Copy file metadata
Copy the permission bits, last access time, last modification time, and
flags from `src` to `dst`. The file contents, owner, and group are
unaffected. `src` and `dst` are path names given as strings.
"""
st = stat(src, **kwargs)
utime(dst, ns=(st.st_atime_ns, st.st_mtime_ns), **kwargs)

# TODO: in principle, we can copy the readonly flag via chmod
# mode = py_stat.S_IMODE(st.st_mode)
# if hasattr(os, 'chmod'):
# os.chmod(dst, mode)


def copyfile(src, dst, **kwargs):
"""Copy data from src to dst"""
if _samefile(src, dst):
_check_src_dst(src, dst)

if _samefile(src, dst, **kwargs):
raise ValueError("`%s` and `%s` are the same file" % (src, dst))

for fn in [src, dst]:
try:
st = stat(fn, **kwargs)
except OSError:
# File most likely does not exist
pass
def _open(fname, mode, **kwargs):
if _is_remote_file(fname):
return open_file(fname, mode, **kwargs)
else:
# XXX What about other special files? (sockets, devices...)
if py_stat.S_ISFIFO(st.st_mode):
raise SpecialFileError("`%s` is a named pipe" % fn)
return open(fname, mode)

with open_file(src, 'rb', **kwargs) as fsrc:
with open_file(dst, 'wb', **kwargs) as fdst:
copyfileobj(fsrc, fdst)
with _open(src, 'rb', **kwargs) as fsrc, _open(dst, 'wb', **kwargs) as fdst:
copyfileobj(fsrc, fdst)


def copytree(src, dst, symlinks=False, ignore=None, server_side_copy=False, **kwargs):
"""Recursively copy a directory tree using copy2().
def copytree_copy(src, dst, symlinks=False, ignore=None, server_side_copy=False, **kwargs):
"""Recursively copy a directory tree using copy().
The destination directory must not already exist.
If exception(s) occur, an Error is raised with a list of reasons.
Expand All @@ -154,6 +131,8 @@ def copytree(src, dst, symlinks=False, ignore=None, server_side_copy=False, **kw
XXX Consider this example code rather than the ultimate tool.
"""
_check_src_dst(src, dst)

names = listdir(src, **kwargs)
if ignore is not None:
ignored_names = ignore(src, names)
Expand All @@ -172,29 +151,21 @@ def copytree(src, dst, symlinks=False, ignore=None, server_side_copy=False, **kw
linkto = readlink(srcname, **kwargs)
symlink(linkto, dstname, **kwargs)
elif isdir(srcname, **kwargs):
copytree(srcname, dstname, symlinks, ignore, **kwargs)
copytree_copy(srcname, dstname, symlinks, ignore, **kwargs)
else:
# Will raise a SpecialFileError for unsupported file types
copy2(srcname, dstname, server_side_copy=server_side_copy, **kwargs)
copy(srcname, dstname, server_side_copy=server_side_copy, **kwargs)
# catch the Error from the recursive copytree so that we can
# continue with other files
except Error as err:
errors.extend(err.args[0])
except EnvironmentError as why:
errors.append((srcname, dstname, str(why)))
try:
copystat(src, dst)
except OSError as why:
if WindowsError is not None and isinstance(why, WindowsError):
# Copying file access times may fail on Windows
pass
else:
errors.append((src, dst, str(why)))
if errors:
raise Error(errors)


def copyfile_server_side(src, dst, **kwargs):
def _copyfile_server_side(src, dst, **kwargs):
"""
Server side copy of a file
Expand Down Expand Up @@ -247,7 +218,7 @@ def copyfile_server_side(src, dst, **kwargs):

ioctl_request(transaction_dst, CtlCode.FSCTL_SRV_COPYCHUNK_WRITE,
flags=IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL,
output_size=32, buffer=copychunkcopy_struct)
output_size=32, input_buffer=copychunkcopy_struct)

for result in transaction_dst.results:
copychunk_response = SMB2SrvCopyChunkResponse()
Expand Down Expand Up @@ -280,8 +251,39 @@ def _batches(lst, n):
yield lst[i:i + n]


def _samefile(src, dst):
return normcase(abspath(src)) == normcase(abspath(dst))
def _samefile(src, dst, **kwargs):
unc_src, rest_src = splitunc(src)
unc_dst, rest_dst = splitunc(dst)
if unc_src != unc_dst:
return False

if unc_src:
stat1 = stat(src, **kwargs)
if src == dst:
return True
try:
stat2 = stat(dst, **kwargs)
return stat1.st_ino == stat2.st_ino and stat1.st_dev == stat2.st_dev
except SMBOSError:
return False

else:
try:
return os.path.samefile(src, dst)
except OSError:
# target file does not exist
return False


def _is_remote_file(src):
unc, _ = splitunc(src)
return bool(unc)


def _is_sameshare(src, dst):
unc_src, _ = splitunc(src)
unc_dst, _ = splitunc(dst)
return unc_src == unc_dst


def copyfileobj(fsrc, fdst, length=16 * 1024):
Expand All @@ -291,3 +293,10 @@ def copyfileobj(fsrc, fdst, length=16 * 1024):
if not buf:
break
fdst.write(buf)


def _check_src_dst(src, dst):
if not _is_remote_file(src):
raise ValueError("Local sources are not supported yet")
if not _is_remote_file(dst):
raise ValueError("Local destinations are not supported yet")
55 changes: 29 additions & 26 deletions tests/test_smbclient_shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@

from __future__ import division

import os
import shutil
import sys

import pytest
import re
from time import sleep

import smbclient
from smbclient import open_file, mkdir, stat
from smbclient.path import exists, isdir
from smbclient.shutil import rmtree, copy2, copytree, Error, copyfile_server_side, copyfile
from smbclient import open_file, mkdir, stat, symlink
from smbclient.path import exists
from smbclient.shutil import rmtree, copy, copytree_copy, _copyfile_server_side, copyfile


def test_rmtree(smb_share):
Expand All @@ -26,6 +28,9 @@ def test_rmtree(smb_share):
with open_file("%s\\dir2\\file2" % smb_share, mode='w') as fd:
fd.write(u"content")

if os.name == "nt" or os.environ.get('SMB_FORCE', False):
symlink("%s\\dir2\\file2" % smb_share, "%s\\dir2\\file3" % smb_share)

assert exists("%s\\dir2" % smb_share) is True

rmtree("%s\\dir2" % smb_share)
Expand Down Expand Up @@ -62,12 +67,11 @@ def test_copy2(smb_share):
fd.write(u"content")

sleep(0.1)
copy2("%s\\file1" % smb_share, "%s\\dir2\\file1" % smb_share)
copy("%s\\file1" % smb_share, "%s\\dir2\\file1" % smb_share)

src_stat = stat("%s\\file1" % smb_share)
dst_stat = stat("%s\\dir2\\file1" % smb_share)
assert src_stat.st_atime == dst_stat.st_atime
assert src_stat.st_mtime == dst_stat.st_mtime

assert src_stat.st_size == dst_stat.st_size


Expand All @@ -79,12 +83,12 @@ def test_copy2_raises_when_source_and_target_identical(smb_share):
sleep(0.1)
if (sys.version_info > (3, 0)):
expected = 'are the same file'
context = pytest.raises(Error, match=re.escape(expected))
context = pytest.raises(ValueError, match=re.escape(expected))
else:
context = pytest.raises(Error)
context = pytest.raises(ValueError)

with context:
copy2("%s\\file1" % smb_share, "%s\\file1" % smb_share)
copy("%s\\file1" % smb_share, "%s\\file1" % smb_share)


def test_copy2_with_dir_as_target(smb_share):
Expand All @@ -93,12 +97,11 @@ def test_copy2_with_dir_as_target(smb_share):
fd.write(u"content")

sleep(0.1)
copy2("%s\\file1" % smb_share, "%s\\dir2" % smb_share)
copy("%s\\file1" % smb_share, "%s\\dir2" % smb_share)

src_stat = stat("%s\\file1" % smb_share)
dst_stat = stat("%s\\dir2\\file1" % smb_share)
assert src_stat.st_atime == dst_stat.st_atime
assert src_stat.st_mtime == dst_stat.st_mtime

assert src_stat.st_size == dst_stat.st_size


Expand All @@ -113,12 +116,11 @@ def test_copytree(smb_share):
fd.write(u"content")

sleep(0.01)
copytree("%s\\dir2" % smb_share, "%s\\dir4" % smb_share)
copytree_copy("%s\\dir2" % smb_share, "%s\\dir4" % smb_share)

src_stat = stat("%s\\dir2\\dir3\\file1" % smb_share)
dst_stat = stat("%s\\dir4\\dir3\\file1" % smb_share)
assert src_stat.st_atime == dst_stat.st_atime
assert src_stat.st_mtime == dst_stat.st_mtime

assert src_stat.st_size == dst_stat.st_size


Expand All @@ -136,18 +138,21 @@ def ignore(src, names):
return [name for name in names if name == "file2"]

sleep(0.01)
copytree("%s\\dir2" % smb_share, "%s\\dir4" % smb_share, ignore=ignore)
copytree_copy("%s\\dir2" % smb_share, "%s\\dir4" % smb_share, ignore=ignore)

assert exists("%s\\dir4\\dir3\\file1" % smb_share) is True
assert exists("%s\\dir4\\file2" % smb_share) is False


def test_copyfile_with_same_file(smb_share):
with open_file("%s\\file1" % smb_share, mode='w') as fd:
fd.write(u"content")

if (sys.version_info > (3, 0)):
expected = 'are the same file'
context = pytest.raises(Error, match=re.escape(expected))
context = pytest.raises(ValueError, match=re.escape(expected))
else:
context = pytest.raises(Error)
context = pytest.raises(ValueError)

with context:
copyfile("%s\\file1" % smb_share, "%s\\file1" % smb_share)
Expand All @@ -159,19 +164,18 @@ def test_server_side_copy(smb_share):
fd.write(u"content" * 1024)

sleep(0.1)
copy2("%s\\file1" % smb_share, "%s\\dir2\\file1" % smb_share, server_side_copy=True)
copy("%s\\file1" % smb_share, "%s\\dir2\\file1" % smb_share, server_side_copy=True)

src_stat = stat("%s\\file1" % smb_share)
dst_stat = stat("%s\\dir2\\file1" % smb_share)
assert src_stat.st_atime == dst_stat.st_atime
assert src_stat.st_mtime == dst_stat.st_mtime

assert src_stat.st_size == dst_stat.st_size


def test_server_side_copy_across_paths_raises():
expected = 'Server side copy can only occur on the same drive'
with pytest.raises(ValueError, match=re.escape(expected)):
copyfile_server_side("//host/filer1/file1", "//host/filer2/file2")
_copyfile_server_side("//host/filer1/file1", "//host/filer2/file2")


def test_server_side_copy_target_exists(smb_share):
Expand All @@ -187,7 +191,7 @@ def test_server_side_copy_target_exists(smb_share):
context = pytest.raises(ValueError)

with context:
copyfile_server_side("%s\\file1" % smb_share, "%s\\file2" % smb_share)
_copyfile_server_side("%s\\file1" % smb_share, "%s\\file2" % smb_share)


def test_server_side_copy_multiple_chunks(smb_share):
Expand All @@ -198,10 +202,9 @@ def test_server_side_copy_multiple_chunks(smb_share):
smbclient.shutil.CHUNK_SIZE = 1024

sleep(0.1)
copy2("%s\\file1" % smb_share, "%s\\dir2\\file1" % smb_share, server_side_copy=True)
copy("%s\\file1" % smb_share, "%s\\dir2\\file1" % smb_share, server_side_copy=True)

src_stat = stat("%s\\file1" % smb_share)
dst_stat = stat("%s\\dir2\\file1" % smb_share)
assert src_stat.st_atime == dst_stat.st_atime
assert src_stat.st_mtime == dst_stat.st_mtime

assert src_stat.st_size == dst_stat.st_size

0 comments on commit 84eb2cb

Please sign in to comment.