Skip to content

Commit

Permalink
Merge pull request #900 from talkdirty/develop
Browse files Browse the repository at this point in the history
Add onlyMatching argument to syncFromSynapse to filter downloads
  • Loading branch information
thomasyu888 authored Aug 13, 2022
2 parents 0e9f7c4 + 436504f commit 4aac54e
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions synapseutils/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _sync_executor(syn):


def syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFiles=None, followLink=False,
manifest="all", downloadFile=True):
manifest="all", downloadFile=True, onlyMatching=None):
"""Synchronizes all the files in a folder (including subfolders) from Synapse and adds a readme manifest with file
metadata.
Expand All @@ -69,6 +69,11 @@ def syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFi
:param downloadFile Determines whether the files are downloaded.
Defaults to True
:param onlyMatching Optional list of regexes to be matched against file names.
A file is downloaded only if its name matches at least
one of the regexes.
Defaults to None, in which case no filtering is applied
:returns: list of entities (files, tables, links)
This function will crawl all subfolders of the project/folder specified by `entity` and download all files that have
Expand Down Expand Up @@ -104,7 +109,7 @@ def syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFi
# 2 threads always, if those aren't available then we'll run single threaded to avoid a deadlock
with _sync_executor(syn) as executor:
sync_from_synapse = _SyncDownloader(syn, executor)
files = sync_from_synapse.sync(entity, path, ifcollision, followLink, downloadFile, manifest)
files = sync_from_synapse.sync(entity, path, ifcollision, followLink, downloadFile, manifest, onlyMatching)

# the allFiles parameter used to be passed in as part of the recursive implementation of this function
# with the public signature invoking itself. now that this is no longer recursive we don't need
Expand Down Expand Up @@ -225,7 +230,7 @@ def __init__(self, syn, executor: concurrent.futures.Executor, max_concurrent_fi
max_concurrent_file_downloads = max(int(max_concurrent_file_downloads or self._syn.max_threads / 2), 1)
self._file_semaphore = threading.BoundedSemaphore(max_concurrent_file_downloads)

def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifest="all"):
def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifest="all", onlyMatching=None):
progress = CumulativeTransferProgress('Downloaded')

if is_synapse_id(entity):
Expand All @@ -238,7 +243,7 @@ def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifes
)

if is_container(entity):
root_folder_sync = self._sync_root(entity, path, ifcollision, followLink, progress, downloadFile, manifest)
root_folder_sync = self._sync_root(entity, path, ifcollision, followLink, progress, downloadFile, manifest, onlyMatching)

# once the whole folder hierarchy has been traversed this entrant thread waits for
# all file downloads to complete before returning
Expand All @@ -256,22 +261,39 @@ def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifes
files.sort(key=lambda f: f.get('path') or '')
return files

def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLink, progress, downloadFile):
def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLink, progress, downloadFile, onlyMatching):
try:
# we use syn.get to download the File.
# these context managers ensure that we are using some shared state
# when conducting that download (shared progress bar, ExecutorService shared
# by all multi threaded downloads in this sync)
with progress.accumulate_progress(), \
download_shared_executor(self._executor):

entity = self._syn.get(
entity_id,
downloadLocation=path,
ifcollision=ifcollision,
followLink=followLink,
downloadFile=downloadFile,
)

file_matches = True
if onlyMatching is not None:
file_matches = False
entity_meta = self._syn.get(
entity_id,
downloadFile=False,
)
for regex in onlyMatching:
if re.match(regex, entity_meta.name) is not None:
file_matches = True

if file_matches:
entity = self._syn.get(
entity_id,
downloadLocation=path,
ifcollision=ifcollision,
followLink=followLink,
downloadFile=downloadFile,
)
else:
parent_folder_sync.update(
finished_id=entity_id,
)
return

files = []
provenance = None
Expand Down Expand Up @@ -302,7 +324,7 @@ def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLin
finally:
self._file_semaphore.release()

def _sync_root(self, root, root_path, ifcollision, followLink, progress, downloadFile, manifest="all"):
def _sync_root(self, root, root_path, ifcollision, followLink, progress, downloadFile, manifest="all", onlyMatching=None):
# stack elements are a 3-tuple of:
# 1. the folder entity/dict
# 2. the local path to the folder to download to
Expand Down Expand Up @@ -372,6 +394,7 @@ def _sync_root(self, root, root_path, ifcollision, followLink, progress, downloa
followLink,
progress,
downloadFile,
onlyMatching,
)

for child_folder in child_folders:
Expand Down

0 comments on commit 4aac54e

Please sign in to comment.