Skip to content

Commit

Permalink
Merge branch 'main' into v0.25-release
Browse files Browse the repository at this point in the history
  • Loading branch information
Wauplin committed Sep 17, 2024
2 parents 6f49af5 + f1a7ed4 commit 1b4bfe7
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
70 changes: 59 additions & 11 deletions src/huggingface_hub/_upload_large_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
logger = logging.getLogger(__name__)

WAITING_TIME_IF_NO_TASKS = 10 # seconds
MAX_NB_REGULAR_FILES_PER_COMMIT = 75
MAX_NB_LFS_FILES_PER_COMMIT = 150


def upload_large_folder_internal(
Expand Down Expand Up @@ -373,17 +375,18 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
if (
status.nb_workers_commit == 0
and status.queue_commit.qsize() > 0
and (status.last_commit_attempt is None or time.time() - status.last_commit_attempt > 5 * 60)
and status.last_commit_attempt is not None
and time.time() - status.last_commit_attempt > 5 * 60
):
status.nb_workers_commit += 1
logger.debug("Job: commit (more than 5 minutes since last commit attempt)")
return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

# 2. Commit if at least 25 files are ready to commit
elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 25:
# 2. Commit if at least 150 files are ready to commit
elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 150:
status.nb_workers_commit += 1
logger.debug("Job: commit (>25 files ready)")
return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
logger.debug("Job: commit (>100 files ready)")
return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

# 3. Get upload mode if at least 10 files
elif status.queue_get_upload_mode.qsize() >= 10:
Expand Down Expand Up @@ -430,18 +433,39 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
logger.debug("Job: get upload mode")
return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50))

# 10. Commit if at least 1 file
elif status.nb_workers_commit == 0 and status.queue_commit.qsize() > 0:
# 10. Commit if at least 1 file and 1 min since last commit attempt
elif (
status.nb_workers_commit == 0
and status.queue_commit.qsize() > 0
and status.last_commit_attempt is not None
and time.time() - status.last_commit_attempt > 1 * 60
):
status.nb_workers_commit += 1
logger.debug("Job: commit (1 min since last commit attempt)")
return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

# 11. Commit if at least 1 file all other queues are empty and all workers are waiting
# e.g. when it's the last commit
elif (
status.nb_workers_commit == 0
and status.queue_commit.qsize() > 0
and status.queue_sha256.qsize() == 0
and status.queue_get_upload_mode.qsize() == 0
and status.queue_preupload_lfs.qsize() == 0
and status.nb_workers_sha256 == 0
and status.nb_workers_get_upload_mode == 0
and status.nb_workers_preupload_lfs == 0
):
status.nb_workers_commit += 1
logger.debug("Job: commit")
return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25))
return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit))

# 11. If all queues are empty, exit
# 12. If all queues are empty, exit
elif all(metadata.is_committed or metadata.should_ignore for _, metadata in status.items):
logger.info("All files have been processed! Exiting worker.")
return None

# 12. If no task is available, wait
# 13. If no task is available, wait
else:
status.nb_workers_waiting += 1
logger.debug(f"No task available, waiting... ({WAITING_TIME_IF_NO_TASKS}s)")
Expand Down Expand Up @@ -547,6 +571,30 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]:
return [queue.get() for _ in range(min(queue.qsize(), n))]


def _get_items_to_commit(queue: "queue.Queue[JOB_ITEM_T]") -> "List[JOB_ITEM_T]":
    """Special case for commit job: the number of items to commit depends on the type of files.

    Dequeues items until either per-commit limit is reached: at most
    `MAX_NB_REGULAR_FILES_PER_COMMIT` regular files and/or `MAX_NB_LFS_FILES_PER_COMMIT`
    LFS files in a single commit. If the queue empties first, everything dequeued so far
    is returned (i.e. commit everything that is ready).
    """
    items: List[JOB_ITEM_T] = []
    nb_lfs, nb_regular = 0, 0
    # Stop as soon as either file-type limit is hit; otherwise drain until the queue is empty.
    while nb_lfs < MAX_NB_LFS_FILES_PER_COMMIT and nb_regular < MAX_NB_REGULAR_FILES_PER_COMMIT:
        # If empty queue => commit everything dequeued so far
        if queue.qsize() == 0:
            break

        # Else, get a new item and increase the matching counter
        item = queue.get()
        items.append(item)
        _, metadata = item
        if metadata.upload_mode == "lfs":
            nb_lfs += 1
        else:
            nb_regular += 1
    return items


def _print_overwrite(report: str) -> None:
"""Print a report, overwriting the previous lines.
Expand Down
31 changes: 28 additions & 3 deletions src/huggingface_hub/hf_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
validate_hf_hub_args,
)
from .utils import tqdm as hf_tqdm
from .utils._deprecation import _deprecate_method
from .utils._typing import CallableT
from .utils.endpoint_helpers import _is_emission_within_threshold

Expand Down Expand Up @@ -1405,6 +1406,10 @@ class User:
Number of upvotes received by the user.
num_likes (`int`, *optional*):
Number of likes given by the user.
num_following (`int`, *optional*):
Number of users this user is following.
num_followers (`int`, *optional*):
Number of users following this user.
orgs (list of [`Organization`]):
List of organizations the user is part of.
"""
Expand All @@ -1423,6 +1428,8 @@ class User:
num_papers: Optional[int] = None
num_upvotes: Optional[int] = None
num_likes: Optional[int] = None
num_following: Optional[int] = None
num_followers: Optional[int] = None
orgs: List[Organization] = field(default_factory=list)

def __init__(self, **kwargs) -> None:
Expand All @@ -1439,6 +1446,8 @@ def __init__(self, **kwargs) -> None:
self.num_papers = kwargs.pop("numPapers", None)
self.num_upvotes = kwargs.pop("numUpvotes", None)
self.num_likes = kwargs.pop("numLikes", None)
self.num_following = kwargs.pop("numFollowing", None)
self.num_followers = kwargs.pop("numFollowers", None)
self.user_type = kwargs.pop("type", None)
self.orgs = [Organization(**org) for org in kwargs.pop("orgs", [])]

Expand Down Expand Up @@ -4010,6 +4019,9 @@ def _payload_as_ndjson() -> Iterable[bytes]:

@experimental
@validate_hf_hub_args
@_deprecate_method(
version="0.27", message="This is an experimental feature. Please use `upload_large_folder` instead."
)
def create_commits_on_pr(
self,
*,
Expand Down Expand Up @@ -4848,8 +4860,10 @@ def upload_folder(
new files. This is useful if you don't know which files have already been uploaded.
Note: to avoid discrepancies the `.gitattributes` file is not deleted even if it matches the pattern.
multi_commits (`bool`):
Deprecated. For large uploads, use `upload_large_folder` instead.
If True, changes are pushed to a PR using a multi-commit process. Defaults to `False`.
multi_commits_verbose (`bool`):
Deprecated. For large uploads, use `upload_large_folder` instead.
If True and `multi_commits` is used, more information will be displayed to the user.
run_as_future (`bool`, *optional*):
Whether or not to run this method in the background. Background jobs are run sequentially without
Expand Down Expand Up @@ -5342,15 +5356,16 @@ def upload_large_folder(
Order of priority:
1. Commit if more than 5 minutes since last commit attempt (and at least 1 file).
2. Commit if at least 25 files are ready to commit.
2. Commit if at least 150 files are ready to commit.
3. Get upload mode if at least 10 files have been hashed.
4. Pre-upload LFS file if at least 1 file and no worker is pre-uploading.
5. Hash file if at least 1 file and no worker is hashing.
6. Get upload mode if at least 1 file and no worker is getting upload mode.
7. Pre-upload LFS file if at least 1 file (exception: if hf_transfer is enabled, only 1 worker can preupload LFS at a time).
8. Hash file if at least 1 file to hash.
9. Get upload mode if at least 1 file to get upload mode.
10. Commit if at least 1 file to commit.
10. Commit if at least 1 file to commit and at least 1 min since last commit attempt.
11. Commit if at least 1 file to commit and all other queues are empty.
Special rules:
- If `hf_transfer` is enabled, only 1 LFS uploader at a time. Otherwise the CPU would be bloated by `hf_transfer`.
Expand Down Expand Up @@ -9463,14 +9478,24 @@ def _prepare_upload_folder_additions(
repo_type=repo_type,
token=token,
)
if len(filtered_repo_objects) > 30:
logger.info(
"It seems you are trying to upload a large folder at once. This might take some time and then fail if "
"the folder is too large. For such cases, it is recommended to upload in smaller batches or to use "
"`HfApi().upload_large_folder(...)`/`huggingface-cli upload-large-folder` instead. For more details, "
"check out https://huggingface.co/docs/huggingface_hub/main/en/guides/upload#upload-a-large-folder."
)

return [
logger.info(f"Start hashing {len(filtered_repo_objects)} files.")
operations = [
CommitOperationAdd(
path_or_fileobj=relpath_to_abspath[relpath], # absolute path on disk
path_in_repo=prefix + relpath, # "absolute" path in repo
)
for relpath in filtered_repo_objects
]
logger.info(f"Finished hashing {len(filtered_repo_objects)} files.")
return operations

def _validate_yaml(self, content: str, *, repo_type: Optional[str] = None, token: Union[bool, str, None] = None):
"""
Expand Down
2 changes: 2 additions & 0 deletions tests/test_hf_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4093,6 +4093,8 @@ def test_user_overview(self) -> None:
assert overview.num_upvotes > 10
assert len(overview.orgs) > 0
assert any(org.name == "huggingface" for org in overview.orgs)
assert overview.num_following > 300
assert overview.num_followers > 1000

def test_organization_members(self) -> None:
members = self.api.list_organization_members("huggingface")
Expand Down

0 comments on commit 1b4bfe7

Please sign in to comment.