Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add push to hub tracker #1214

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion swift/hub/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import re
import shutil
import tempfile
import time
import uuid
from http import HTTPStatus
from http.cookiejar import CookieJar
Expand Down Expand Up @@ -142,7 +143,7 @@ def create_model(self,
'Visibility': visibility, # server check
'License': license,
'OriginalModelId': original_model_id,
'TrainId': os.environ.get('MODELSCOPE_TRAIN_ID', ''),
'TrainId': os.environ.get('MODELSCOPE_TRAIN_ID') or f'swift-{time.time()}',
}
r = self.session.post(
path, json=body, cookies=cookies, headers=self.headers)
Expand Down
118 changes: 27 additions & 91 deletions swift/trainers/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re
import shutil
import time
from distutils.util import strtobool
from pathlib import Path
from types import MethodType
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
Expand All @@ -23,36 +24,39 @@
from transformers.trainer import (ADAPTER_CONFIG_NAME, ADAPTER_SAFE_WEIGHTS_NAME, ADAPTER_WEIGHTS_NAME, CONFIG_NAME,
PREFIX_CHECKPOINT_DIR, SAFE_WEIGHTS_NAME, TRAINER_STATE_NAME, TRAINING_ARGS_NAME,
WEIGHTS_NAME, IntervalStrategy, Trainer, TrainerCallback, is_peft_available)
from transformers.trainer_utils import EvalPrediction
from transformers.trainer_utils import EvalPrediction, HubStrategy
from transformers.training_args import TrainingArguments
from transformers.utils import is_sagemaker_mp_enabled, is_torch_npu_available
from transformers.utils import is_sagemaker_mp_enabled, is_torch_npu_available, PushInProgress

from swift.hub import Repository
from swift.hub.check_model import check_local_model_is_latest
from swift.torchacc_utils import (save_ta_ddp_checkpoint, save_ta_fsdp_checkpoint, ta_load_optimizer_and_scheduler,
ta_save_optimizer_and_scheduler, ta_trim_graph)
from swift.tuners import SwiftModel
from swift.utils import check_json_format, create_ms_repo, get_logger, use_torchacc
from swift.utils import check_json_format, create_ms_repo, get_logger, use_torchacc, push_to_ms_hub
from swift.utils.constants import Invoke
from .optimizers.galore import create_optimizer_and_scheduler
from .utils import can_return_loss, find_labels, get_function, is_instance_of_ms_model

logger = get_logger()


def _push_to_hub(self: Repository, commit_message: str = 'Commit files to Modelscope Hub', **kwargs):
    """Push pending commits to the ModelScope hub via the bound ``Repository``.

    Mirrors the return contract of ``huggingface_hub.Repository.push_to_hub``:
    a ``(None, None)`` pair for non-blocking calls, plain ``None`` otherwise,
    so callers written against the transformers API keep working.
    """
    self.push(commit_message)
    # Compatible with transformers: blocking=False callers unpack a 2-tuple.
    return (None, None) if not kwargs.get('blocking', True) else None


class PushToMsHubMixin:
repo: Repository

_hub_type = 'hf' if strtobool(os.environ.get('USE_HF', 'False')) else 'ms'

if _hub_type == 'ms':
import transformers.trainer
transformers.trainer.create_repo = create_ms_repo
transformers.trainer.upload_folder = push_to_ms_hub

def init_hf_repo(self) -> None:
    """Initialize the hub repo: defer to transformers for HF, use the git-based
    ModelScope repo otherwise (``self._hub_type`` selects the backend)."""
    if self._hub_type != 'hf':
        self.init_git_repo(at_init=True)
        return None
    return super().init_hf_repo()

def _add_patterns_to_file(self, file_name: str, patterns: List[str], commit_message: Optional[str] = None) -> None:
# Make sure we only do this on the main process
if not self.is_world_process_zero():
Expand Down Expand Up @@ -100,9 +104,15 @@ def _add_patterns_to_gitattributes(self, patterns: List[str], commit_message: Op
commit_message = f'Add `{patterns[0]}` patterns to {file_name}'
self._add_patterns_to_file(file_name, new_patterns, commit_message)

def init_hf_repo(self) -> None:
    """Compat shim for transformers>=4.34: route ``init_hf_repo`` to the
    git-based ModelScope repo initialization."""
    # transformers calls init_hf_repo() during Trainer setup.
    self.init_git_repo(at_init=True)
@staticmethod
def _push_to_hub(repo: Repository, commit_message: str = 'Commit files to Modelscope Hub', **kwargs):
    """Push pending commits on *repo*, preserving the transformers return shape.

    Returns ``(None, None)`` when called with ``blocking=False`` (callers
    unpack a pair), ``None`` otherwise.
    """
    repo.push(commit_message)
    if kwargs.get('blocking', True):
        return None
    # Compatible with transformers
    return None, None

def init_git_repo(self, at_init: bool = False) -> None:
if not self.is_world_process_zero():
Expand All @@ -114,7 +124,7 @@ def init_git_repo(self, at_init: bool = False) -> None:
self.args.hub_model_id = create_ms_repo(self.args.hub_model_id, self.args.hub_token, self.args.hub_private_repo)
self.repo = Repository(self.args.output_dir, self.args.hub_model_id)
self._add_patterns_to_gitattributes(['*.safetensors', '*.bin', '*.pt'])
self.repo.push_to_hub = MethodType(_push_to_hub, self.repo)
self.repo.push_to_hub = MethodType(self._push_to_hub, self.repo)
self.repo.local_dir = self.repo.model_dir # hf compatibility

# By default, ignore the checkpoint folders
Expand All @@ -128,82 +138,8 @@ def init_git_repo(self, at_init: bool = False) -> None:
if os.environ.get('SM_TRAINING_ENV'):
self._add_patterns_to_gitignore(['*.sagemaker-uploading', '*.sagemaker-uploaded'],
'Add `*.sagemaker` patterns to .gitignore')

self.push_in_progress = None

def push_to_hub(self, commit_message: str = 'End of training', **kwargs) -> None:
    """Save the model and push it, plus an optional model card, to the hub.

    Args:
        commit_message: Commit message used for the main push.
        **kwargs: Forwarded to ``self.repo.push_to_hub`` and
            ``self.create_model_card``; ``create_model_card`` (bool) and
            ``model_name`` (str) are popped and consumed here.
    """
    # user calls manually `push_to_hub` with `self.args.push_to_hub = False`
    create_model_card = kwargs.pop('create_model_card', None)
    if not hasattr(self, 'repo'):
        # Repo was never initialized (manual call path); set it up now.
        self.init_git_repo()
    self.save_model(_internal_call=True)

    # Only the main process performs the actual push.
    if not self.is_world_process_zero():
        return

    self.repo.push_to_hub(commit_message, **kwargs)
    # push separately the model card to be independent from the rest of the model
    readme_path = os.path.join(self.args.output_dir, 'README.md')
    if create_model_card is None:
        # Default: only generate a card when no README.md exists yet.
        create_model_card = not os.path.exists(readme_path)
    if create_model_card and self.args.should_save:
        model_name = kwargs.pop('model_name', None)
        if model_name is None and self.args.should_save:
            # Derive a display name from the hub id, else the output dir name.
            if self.args.hub_model_id is not None:
                model_name = self.args.hub_model_id.split('/')[-1]
            else:
                model_name = os.path.basename(self.args.output_dir)
        self.create_model_card(model_name=model_name, **kwargs)
        self.repo.push_to_hub('update model card README.md', **kwargs)

def _push_from_checkpoint(self, checkpoint_folder: str) -> None:
    """Push the latest checkpoint's weights to the hub mid-training.

    Compatible with transformers>=4.32. Behavior depends on
    ``self.args.push_hub_strategy``: 'end' skips entirely, 'checkpoint'
    temporarily moves the checkpoint to a 'last-checkpoint' folder for the
    push, 'push_best' only pushes when this checkpoint is the best one.

    Args:
        checkpoint_folder: Path of the checkpoint directory just saved.
    """
    # Only push from one node.
    if not self.is_world_process_zero() or self.args.push_hub_strategy == 'end':
        return
    output_dir = self.args.output_dir
    # To avoid a new synchronization of all model weights, we just copy the file from the checkpoint folder
    modeling_files = [CONFIG_NAME, WEIGHTS_NAME, SAFE_WEIGHTS_NAME]
    if is_peft_available():
        # Adapter-only checkpoints (PEFT) store weights under different names.
        modeling_files.extend([ADAPTER_CONFIG_NAME, ADAPTER_WEIGHTS_NAME, ADAPTER_SAFE_WEIGHTS_NAME])
    for modeling_file in modeling_files:
        if os.path.isfile(os.path.join(checkpoint_folder, modeling_file)):
            shutil.copy(os.path.join(checkpoint_folder, modeling_file), os.path.join(output_dir, modeling_file))
    # Saving the tokenizer is fast and we don't know how many files it may have spawned, so we resave it to be sure.
    if self.tokenizer is not None:
        self.tokenizer.save_pretrained(output_dir)
    # Same for the training arguments
    torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))

    try:
        if self.args.push_hub_strategy == 'checkpoint':
            # Temporarily move the checkpoint just saved for the push
            tmp_checkpoint = os.path.join(output_dir, 'last-checkpoint')
            # We have to remove the "last-checkpoint" dir if it exists, otherwise the checkpoint is moved as a
            # subfolder.
            if os.path.isdir(tmp_checkpoint):
                shutil.rmtree(tmp_checkpoint)
            shutil.move(checkpoint_folder, tmp_checkpoint)

        if self.args.save_strategy == IntervalStrategy.STEPS:
            commit_message = f'Training in progress, step {self.state.global_step}'
        else:
            commit_message = f'Training in progress, epoch {int(self.state.epoch)}'
        if self.args.push_hub_strategy == 'push_best':
            # NOTE(review): checkpoint may still carry the temporary
            # 'tmp-checkpoint-' prefix here; normalize before comparing
            # against state.best_model_checkpoint.
            folder, checkpoint_name = os.path.split(checkpoint_folder)
            checkpoint_name = checkpoint_name.replace('tmp-checkpoint-', 'checkpoint-')
            last_model_checkpoint = os.path.join(folder, checkpoint_name)
            if last_model_checkpoint == self.state.best_model_checkpoint:
                self.repo.push_to_hub(commit_message=commit_message, blocking=False, auto_lfs_prune=True)
        else:
            self.repo.push_to_hub(commit_message=commit_message, blocking=False, auto_lfs_prune=True)
    except Exception as e:
        # Best-effort: a failed mid-training push must not kill training.
        logger.error(f'Error when pushing to hub: {e}')
    finally:
        if self.args.push_hub_strategy == 'checkpoint':
            # Move back the checkpoint to its place
            shutil.move(tmp_checkpoint, checkpoint_folder)


class SwiftMixin:

Expand Down
71 changes: 41 additions & 30 deletions swift/utils/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import subprocess
import tempfile
import time
from typing import Optional
from pathlib import Path
from typing import Optional, Union, List

from requests.exceptions import HTTPError

Expand All @@ -15,63 +16,73 @@
logger = get_logger()


def create_ms_repo(
        repo_id: str,
        *,
        token: Optional[str] = None,
        private: bool = False,
        **kwargs) -> str:
    """Create (idempotently) a model repo on the ModelScope hub.

    Args:
        repo_id: Repo id, optionally 'user/name'. A bare name is prefixed
            with the logged-in user's name.
        token: ModelScope API token. Falls back to the ``MODELSCOPE_API_TOKEN``
            environment variable when omitted.
        private: Create the repo with private visibility.
        **kwargs: Ignored; accepted for signature compatibility with
            ``huggingface_hub.create_repo`` (this function is patched in as
            ``transformers.trainer.create_repo``).

    Returns:
        The fully-qualified repo id ('user/name').
    """
    assert repo_id is not None, 'Please enter a valid hub_model_id'

    api = HubApi()
    if token is None:
        # Fix: bind the env fallback to `token` itself so it is actually used
        # for login below (previously it landed in a dead local).
        token = os.environ.get('MODELSCOPE_API_TOKEN')
    if token is not None:
        api.login(token)
    visibility = ModelVisibility.PRIVATE if private else ModelVisibility.PUBLIC

    if '/' not in repo_id:
        user_name = ModelScopeConfig.get_user_info()[0]
        assert isinstance(user_name, str)
        repo_id = f'{user_name}/{repo_id}'
        logger.info(f"'/' not in hub_model_id, setting hub_model_id: {repo_id}")
    try:
        api.create_model(repo_id, visibility)
    except HTTPError:
        # The remote repository has been created
        pass
    return repo_id


def push_to_ms_hub(self,
                   *,
                   repo_id: str,
                   folder_path: Union[str, Path],
                   path_in_repo: Optional[str] = None,
                   commit_message: Optional[str] = None,
                   token: Union[str, bool, None] = None):
    """Push the contents of *folder_path* to a ModelScope repo via git.

    Patched in as a drop-in replacement for ``huggingface_hub.upload_folder``
    (hence the unused ``self``/``token`` parameters kept for signature
    compatibility — ``self`` presumably receives the bound api object; verify
    against the patch site).

    Args:
        repo_id: Fully-qualified 'user/name' ModelScope repo id.
        folder_path: Local directory whose contents are pushed.
        path_in_repo: Optional sub-path; when *folder_path* does not already
            end with it, it is appended to *folder_path*.
        commit_message: Commit message; defaults to 'update files'.
        token: Unused; authentication comes from ModelScopeConfig.
    """
    # Fix: accept pathlib.Path (annotation allows it, but .endswith/os.path.join
    # below require str).
    folder_path = os.fspath(folder_path)
    if commit_message is None:
        # Fix: `git commit -m None` would crash subprocess with a non-str arg.
        commit_message = 'update files'
    logger.info(f'Starting push to hub. ckpt_dir: {folder_path}.')
    # NOTE(review): the TemporaryDirectory object is dropped immediately, so
    # only its (now-deleted) path name is used as a unique clone target —
    # fragile but intentional; git clone recreates the directory.
    tmp_file_name = tempfile.TemporaryDirectory().name
    subprocess_run(['git', 'lfs', 'env'], stdout=subprocess.PIPE)  # check git-lfs install

    path_in_repo = path_in_repo or ''
    if not folder_path.endswith(path_in_repo):
        folder_path = os.path.join(folder_path, path_in_repo)

    git_token = ModelScopeConfig.get_token()
    ms_url = f'https://oauth2:{git_token}@www.modelscope.cn/{repo_id}.git'
    # Skip LFS smudge during clone; large files are pulled explicitly below.
    subprocess_run(['git', '-C', folder_path, 'clone', ms_url, tmp_file_name], env={'GIT_LFS_SKIP_SMUDGE': '1'})
    tmp_dir = os.path.join(folder_path, tmp_file_name)
    subprocess_run(['git', '-C', tmp_dir, 'lfs', 'pull'])
    logger.info('Git clone the repo successfully.')
    # Move the cloned repo's .git (and .gitattributes) into folder_path so the
    # local contents become the working tree of the remote repo.
    dst_git_path = os.path.join(folder_path, '.git')
    if os.path.exists(dst_git_path):
        shutil.rmtree(dst_git_path)
    shutil.copytree(os.path.join(tmp_dir, '.git'), dst_git_path)
    shutil.copy(os.path.join(tmp_dir, '.gitattributes'), os.path.join(folder_path, '.gitattributes'))
    shutil.rmtree(tmp_dir)
    # add commit push
    subprocess_run(['git', '-C', folder_path, 'lfs', 'install'])
    time.sleep(0.5)
    logger.info('Start `git add .`')
    subprocess_run(['git', '-C', folder_path, 'add', '.'])
    if is_repo_clean(folder_path):
        logger.info('Repo currently clean. Ignoring commit and push_to_hub')
    else:
        subprocess_run(['git', '-C', folder_path, 'commit', '-m', commit_message])
        subprocess_run(['git', '-C', folder_path, 'push'])
    url = f'https://www.modelscope.cn/models/{repo_id}/summary'
    logger.info(f'Push to Modelscope successful. url: `{url}`.')


Expand Down
Loading