diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..da7bf30 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,19 @@ +# Ignore the following files for image building +.github/ +logs/ +venv/ +frontend/node_modules/ +frontend/dist/ + +# Append the config file via secret files +# or environment files +config.cfg + +# Ignore Python cache +__pycache__/ + +# Ignore Docker Compose files +relmonservice.yaml + +relmonsvc.sh +*.pid \ No newline at end of file diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 1036d0d..3cb8fcf 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -8,26 +8,25 @@ on: jobs: build: - - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 name: Get newest code and run pylint steps: - name: Checkout repository - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: submodules: true - - name: Set up Python 3.6.8 + - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.6.8" + python-version: "3.11.4" - name: Install dependencies run: | python3 -m pip install --upgrade pip python3 -m pip install -r requirements.txt - name: Run pylint - # --fail-under=9.5 - fail if score is below 9.5 + # --fail-under=9.75 - fail if score is below 9.75 # --fail-on=E - fail if there were errors, regardless of the score # --reports=y - print a report at the end run: | python3 -m pylint --version - python3 -m pylint --fail-under=9.5 --fail-on=E --reports=y `find . -type f | grep .py$ | xargs` + python3 -m pylint --fail-under=9.75 --fail-on=E --reports=y `find . -type d \( -path ./venv -o -path ./core_lib \) -prune -o -type f | grep .py$ | xargs` diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..9be221f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "core_lib"] + path = core_lib + url = https://github.com/cms-PdmV/PdmVWebCore.git + branch = SSOMigration diff --git a/.pylintrc b/.pylintrc index 51926e6..a0c5f87 100644 --- a/.pylintrc +++ b/.pylintrc @@ -60,88 +60,9 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use "--disable=all --enable=classes # --disable=W". 
-disable=print-statement, - parameter-unpacking, - unpacking-in-except, - old-raise-syntax, - backtick, - long-suffix, - old-ne-operator, - old-octal-literal, - import-star-module-level, - non-ascii-bytes-literal, - raw-checker-failed, - bad-inline-option, - locally-disabled, - file-ignored, - suppressed-message, - useless-suppression, - deprecated-pragma, - use-symbolic-message-instead, - apply-builtin, - basestring-builtin, - buffer-builtin, - cmp-builtin, - coerce-builtin, - execfile-builtin, - file-builtin, - long-builtin, - raw_input-builtin, - reduce-builtin, - standarderror-builtin, - unicode-builtin, - xrange-builtin, - coerce-method, - delslice-method, - getslice-method, - setslice-method, - no-absolute-import, - old-division, - dict-iter-method, - dict-view-method, - next-method-called, - metaclass-assignment, - indexing-exception, - raising-string, - reload-builtin, - oct-method, - hex-method, - nonzero-method, - cmp-method, - input-builtin, - round-builtin, - intern-builtin, - unichr-builtin, - map-builtin-not-iterating, - zip-builtin-not-iterating, - range-builtin-not-iterating, - filter-builtin-not-iterating, - using-cmp-argument, - eq-without-hash, - div-method, - idiv-method, - rdiv-method, - exception-message-attribute, - invalid-str-codec, - sys-max-int, - bad-python3-import, - deprecated-string-function, - deprecated-str-translate-call, - deprecated-itertools-function, - deprecated-types-field, - next-method-defined, - dict-items-not-iterating, - dict-keys-not-iterating, - dict-values-not-iterating, - deprecated-operator-function, - deprecated-urllib-function, - xreadlines-attribute, - deprecated-sys-function, - exception-escape, - comprehension-escape, - raise-missing-from, - consider-using-with, - unspecified-encoding +disable=consider-using-f-string, + broad-exception-caught, + consider-using-with # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option @@ -317,13 +238,6 @@ max-line-length=100 # Maximum number of lines in a module. max-module-lines=1000 -# List of optional constructs for which whitespace checking is disabled. `dict- -# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. -# `trailing-comma` allows a space between comma and closing bracket: (a, ). -# `empty-line` allows space-only lines. -no-space-check=trailing-comma, - dict-separator - # Allow the body of a class to be on the same line as the declaration if body # contains single statement. single-line-class-stmt=no @@ -564,9 +478,3 @@ known-standard-library= # Force import order to recognize a module as part of a third party library. known-third-party=enchant - -[EXCEPTIONS] - -# Exceptions that will emit a warning when being caught. Defaults to -# "BaseException, Exception". -overgeneral-exceptions=BaseException, diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1307611 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,40 @@ +# Build web application bundle +FROM node:16-buster-slim@sha256:1417528032837e47462ea8cfe983108b0152f989e95cba2ddfbe0f0ddc2dcfbd AS frontend + +WORKDIR /usr/app + +COPY frontend . + +RUN npm install +RUN npm run build + +# Build dependencies +FROM python:3.11.3-alpine3.18@sha256:caafba876f841774905f73df0fcaf7fe3f55aaf9cb48a9e369a41077f860d4a7 AS build + +WORKDIR /usr/app +RUN python -m venv /usr/app/venv +ENV PATH="/usr/app/venv/bin:$PATH" + +COPY requirements.txt . 
+RUN pip install -r requirements.txt + +# Create image for deployment +FROM python:3.11.3-alpine3.18@sha256:caafba876f841774905f73df0fcaf7fe3f55aaf9cb48a9e369a41077f860d4a7 AS backend + +RUN addgroup -g 1001 pdmv && adduser --disabled-password -u 1001 -G pdmv pdmv + +RUN mkdir /usr/app && chown pdmv:pdmv /usr/app +WORKDIR /usr/app + +COPY --chown=pdmv:pdmv . . +RUN rm -rf frontend/* + +COPY --chown=pdmv:pdmv --from=frontend /usr/app/dist ./frontend/dist +COPY --chown=pdmv:pdmv --from=build /usr/app/venv ./venv + +RUN chmod -R 707 /usr/app/relmons/ + +USER 1001 + +ENV PATH="/usr/app/venv/bin:$PATH" +CMD [ "python", "main.py" ] \ No newline at end of file diff --git a/config.cfg b/config.cfg deleted file mode 100644 index c1b1a34..0000000 --- a/config.cfg +++ /dev/null @@ -1,22 +0,0 @@ -[prod] -callback_url = https://cms-pdmv.cern.ch/relmonservice/api/update -service_url = https://cms-pdmv.cern.ch/relmonservice -reports_url = https://cms-pdmv.cern.ch/relmon -submission_host = lxplus.cern.ch -remote_directory = relmon_submission/ -ssh_credentials = /home/... -database_auth = /home/... -web_location = /eos/... -tick_interval = 600 - - -[dev] -callback_url = https://pdmv-dev-proxy.web.cern.ch/relmonservice/api/update -service_url = https://pdmv-dev-proxy.web.cern.ch/relmonservice -reports_url = https://cms-pdmv.cern.ch/relmon -submission_host = lxplus.cern.ch -remote_directory = relmon_test_submission/ -ssh_credentials = /home/... -database_auth = /home/... -web_location = /eos/... -tick_interval = 600 diff --git a/core_lib b/core_lib new file mode 160000 index 0000000..2bd7886 --- /dev/null +++ b/core_lib @@ -0,0 +1 @@ +Subproject commit 2bd78867a019b1bbfaef16e40e7947d38d837ff7 diff --git a/environment.py b/environment.py new file mode 100644 index 0000000..ca30873 --- /dev/null +++ b/environment.py @@ -0,0 +1,98 @@ +""" +This module parses some configuration variables from +the runtime environment to use them in different sections +of this application + +Attributes: + CALLBACK_URL (str): This is the URL for the endpoint that HTCondor + jobs use to update the status for the running RelMon job. + For example: "https://cms-pdmv.cern.ch/relmonservice/api/update" + For more details, please see the endpoint definition: /api/update. + SERVICE_URL (str): This is the URL for the RelMonService2 application. + For example: "https://cms-pdmv.cern.ch/relmonservice" + It is used to include the application's URL in email notifications + and to request cookies/tokens for authenticating callback requests. + REPORTS_URL (str): This is the URL for the RelMon report page. + For example: "https://cms-pdmv.cern.ch/relmon" + This is a static web page that renders all outputs for the reports. + SUBMISSION_HOST (str): This is the server where this application will open an SSH session + to submit jobs through HTCondor. For example: "lxplus.cern.ch" + REMOTE_DIRECTORY (str): This is the folder (in AFS or EOS) that stores + all the required bundle files to submit an HTCondor job. + SERVICE_ACCOUNT_USERNAME (str): Username to authenticate to `SUBMISSION_HOST` + SERVICE_ACCOUNT_PASSWORD (str): Password to authenticate to `SUBMISSION_HOST` + EMAIL_AUTH_REQUIRED (bool): If this environment variable is provided, + the email client will authenticate to the email server. By default it is false, + because the server does not require authentication. + WEB_LOCATION_PATH (str): This is the path (AFS or EOS) + where all RelMon reports are going to be stored.
This is the path used by the `REPORTS_URL` + application to load the reports' static files. + TICK_INTERVAL (int): Elapsed time in seconds to perform a tick; please see `controller.tick()` + for more details. + MONGO_DB_HOST (str): MongoDB host for opening a client session. + MONGO_DB_PORT (int): MongoDB port for opening a client session. + MONGO_DB_USER (str): MongoDB user to authenticate a new client session. + MONGO_DB_PASSWORD (str): MongoDB password to authenticate a new client session. + HOST (str): Flask listening hostname + PORT (int): Flask port + DEBUG (bool): Enables DEBUG mode for the RelMonService2 application + ENABLE_AUTH_MIDDLEWARE (bool): Enables the AuthenticationMiddleware to parse JWT + or enables the application to handle the OIDC flow by itself. + SECRET_KEY (str): Flask secret key. + CLIENT_ID (str): Client ID related to the RelMonService2 application + or the reverse proxy that provides authentication. + CALLBACK_CLIENT_ID (str): Client ID for the CLI integration application. + CALLBACK_CLIENT_SECRET (str): Client secret for the CLI integration application. +""" +import os +import inspect + +# RelMonService2 application +CALLBACK_URL: str = os.getenv("CALLBACK_URL", "") +SERVICE_URL: str = os.getenv("SERVICE_URL", "") +REPORTS_URL: str = os.getenv("REPORTS_URL", "") +SUBMISSION_HOST: str = os.getenv("SUBMISSION_HOST", "") +REMOTE_DIRECTORY: str = os.getenv("REMOTE_DIRECTORY", "") +SERVICE_ACCOUNT_USERNAME: str = os.getenv("SERVICE_ACCOUNT_USERNAME", "") +SERVICE_ACCOUNT_PASSWORD: str = os.getenv("SERVICE_ACCOUNT_PASSWORD", "") +EMAIL_AUTH_REQUIRED: bool = bool(os.getenv("EMAIL_AUTH_REQUIRED")) +WEB_LOCATION_PATH: str = os.getenv("WEB_LOCATION_PATH", "") +TICK_INTERVAL: int = int(os.getenv("TICK_INTERVAL", "600")) + +# MongoDB database +MONGO_DB_HOST: str = os.getenv("MONGO_DB_HOST", "") +MONGO_DB_PORT: int = int(os.getenv("MONGO_DB_PORT", "27017")) +MONGO_DB_USER: str = os.getenv("MONGO_DB_USER", "") +MONGO_DB_PASSWORD: str = os.getenv("MONGO_DB_PASSWORD", "") + +# Flask web server +HOST: str = os.getenv("HOST", "0.0.0.0") +PORT: int = int(os.getenv("PORT", "8000")) +DEBUG: bool = bool(os.getenv("DEBUG")) +ENABLE_AUTH_MIDDLEWARE: bool = bool(os.getenv("ENABLE_AUTH_MIDDLEWARE")) + +# OAuth2 credentials +SECRET_KEY: str = os.getenv("SECRET_KEY", "") +CLIENT_ID: str = os.getenv("CLIENT_ID", "") +CALLBACK_CLIENT_ID: str = os.getenv("CALLBACK_CLIENT_ID", "") +CALLBACK_CLIENT_SECRET: str = os.getenv("CALLBACK_CLIENT_SECRET", "") + +# Check that all environment variables are provided +missing_environment_variables: dict[str, str] = { + k: v + for k, v in globals().items() + if not k.startswith("__") + and not inspect.ismodule(v) + and not isinstance(v, bool) + and not v +} + +if missing_environment_variables: + msg: str = ( + "There are some environment variables " + "required to be set before running this application\n" + "Please set the following values via environment variables\n" + "For more details, please see the description available in the `environment.py` module\n" + f"{list(missing_environment_variables.keys())}" + ) + raise RuntimeError(msg) diff --git a/local/controller.py b/local/controller.py index 96382a2..7b4b637 100644 --- a/local/controller.py +++ b/local/controller.py @@ -15,17 +15,23 @@ from local.relmon import RelMon from local.file_creator import FileCreator from local.email_sender import EmailSender +from environment import ( + SERVICE_URL, + REPORTS_URL, + REMOTE_DIRECTORY, +) -class Controller(): +class Controller: """ Main instance of RelMon logic Performs ticks during which
RelMons are deleted, reset, submitted and their status is checked (if they are running) """ + def __init__(self): - self.logger = logging.getLogger('logger') - self.logger.info('***** Creating a controller! *****') + self.logger = logging.getLogger("logger") + self.logger.info("***** Creating a controller! *****") self.is_tick_running = False # Multithread manager manager = Manager() @@ -33,27 +39,26 @@ def __init__(self): self.relmons_to_reset = manager.list() self.relmons_to_delete = manager.list() self.config = None - self.remote_directory = 'relmon' + self.remote_directory = "relmon" self.ssh_executor = None self.file_creator = None self.email_sender = None - self.service_url = 'localhost' - self.reports_url = 'localhost' + self.service_url = "localhost" + self.reports_url = "localhost" - def set_config(self, config): + def set_config(self): """ Update all local variables from the environment configuration """ - self.config = config - self.remote_directory = config['remote_directory'] - if self.remote_directory[-1] == '/': + self.remote_directory = REMOTE_DIRECTORY + if self.remote_directory[-1] == "/": self.remote_directory = self.remote_directory[:-1] - self.ssh_executor = SSHExecutor(config) - self.file_creator = FileCreator(config) - self.email_sender = EmailSender(config) - self.service_url = self.config['service_url'] - self.reports_url = self.config['reports_url'] + self.ssh_executor = SSHExecutor() + self.file_creator = FileCreator() + self.email_sender = EmailSender() + self.service_url = SERVICE_URL + self.reports_url = REPORTS_URL def tick(self): """ @@ -68,97 +73,102 @@ def tick(self): * Submit new relmons """ database = Database() - self.logger.info('Controller will tick') + self.logger.info("Controller will tick") tick_start = time.time() # Delete relmons - self.logger.info('Relmons to delete (%s): %s.', - len(self.relmons_to_delete), - ','.join([x['id'] for x in self.relmons_to_delete])) + self.logger.info( + "Relmons to delete (%s): %s.", + len(self.relmons_to_delete), + ",".join([x["id"] for x in self.relmons_to_delete]), + ) for relmon_dict in self.relmons_to_delete: - relmon_id = relmon_dict['id'] + relmon_id = relmon_dict["id"] self.__delete_relmon(relmon_id, database) self.relmons_to_delete.remove(relmon_dict) # Reset relmons - self.logger.info('Relmons to reset (%s): %s.', - len(self.relmons_to_reset), - ', '.join([x['id'] for x in self.relmons_to_reset])) + self.logger.info( + "Relmons to reset (%s): %s.", + len(self.relmons_to_reset), + ", ".join([x["id"] for x in self.relmons_to_reset]), + ) for relmon_dict in self.relmons_to_reset: - relmon_id = relmon_dict['id'] - self.__reset_relmon(relmon_id, database, relmon_dict['user_info']) + relmon_id = relmon_dict["id"] + self.__reset_relmon(relmon_id, database, relmon_dict["user_info"]) self.relmons_to_reset.remove(relmon_dict) # Check relmons - relmons_to_check = database.get_relmons_with_status('submitted') - relmons_to_check.extend(database.get_relmons_with_status('running')) - relmons_to_check.extend(database.get_relmons_with_status('finishing')) + relmons_to_check = database.get_relmons_with_status("submitted") + relmons_to_check.extend(database.get_relmons_with_status("running")) + relmons_to_check.extend(database.get_relmons_with_status("finishing")) # Add relmons with HTCondor status RUN to be checked - for relmon_dict in database.get_relmons_with_condor_status('RUN'): + for relmon_dict in database.get_relmons_with_condor_status("RUN"): for added_relmon in relmons_to_check: - if added_relmon['_id'] == relmon_dict['_id']: +
if added_relmon["_id"] == relmon_dict["_id"]: break else: relmons_to_check.append(relmon_dict) - self.logger.info('Relmons to check (%s): %s.', - len(relmons_to_check), - ', '.join(r.get('id') for r in relmons_to_check)) + self.logger.info( + "Relmons to check (%s): %s.", + len(relmons_to_check), + ", ".join(r.get("id") for r in relmons_to_check), + ) for relmon_json in relmons_to_check: relmon = RelMon(relmon_json) self.__check_if_running(relmon, database) relmon = RelMon(database.get_relmon(relmon.get_id())) condor_status = relmon.get_condor_status() - if condor_status in ('DONE', 'REMOVED'): + if condor_status in ("DONE", "REMOVED"): # Refetch after check if running save self.__collect_output(relmon, database) # Submit relmons - relmons_to_submit = database.get_relmons_with_status('new') - self.logger.info('Relmons to submit (%s): %s.', - len(relmons_to_submit), - ', '.join(r.get('id') for r in relmons_to_submit)) + relmons_to_submit = database.get_relmons_with_status("new") + self.logger.info( + "Relmons to submit (%s): %s.", + len(relmons_to_submit), + ", ".join(r.get("id") for r in relmons_to_submit), + ) for relmon_json in relmons_to_submit: relmon = RelMon(relmon_json) - if 'NOSUBMIT' in relmon.get_name(): + if "NOSUBMIT" in relmon.get_name(): continue status = relmon.get_status() - if status == 'new': + if status == "new": # Double check and if it is new, submit it self.__submit_to_condor(relmon, database) self.ssh_executor.close_connections() tick_end = time.time() - self.logger.info('Controller tick finished. Took %.2fs', - tick_end - tick_start) + self.logger.info("Controller tick finished. Took %.2fs", tick_end - tick_start) def add_to_reset_list(self, relmon_id, user_info): """ Add relmon id to list of ids to be reset during next tick """ - self.logger.info('Will add %s to reset list', relmon_id) + self.logger.info("Will add %s to reset list", relmon_id) relmon_id = str(relmon_id) for item in self.relmons_to_reset: - if item['id'] == relmon_id: + if item["id"] == relmon_id: return - self.relmons_to_reset.append({'id': str(relmon_id), - 'user_info': user_info}) - self.logger.info('Added %s to reset list', relmon_id) + self.relmons_to_reset.append({"id": str(relmon_id), "user_info": user_info}) + self.logger.info("Added %s to reset list", relmon_id) def add_to_delete_list(self, relmon_id, user_info): """ Add relmon id to list of ids to be deleted during next tick """ - self.logger.info('Will add %s to delete list', relmon_id) + self.logger.info("Will add %s to delete list", relmon_id) relmon_id = str(relmon_id) for item in self.relmons_to_delete: - if item['id'] == relmon_id: + if item["id"] == relmon_id: return - self.relmons_to_delete.append({'id': str(relmon_id), - 'user_info': user_info}) - self.logger.info('Added %s to delete list', relmon_id) + self.relmons_to_delete.append({"id": str(relmon_id), "user_info": user_info}) + self.logger.info("Added %s to delete list", relmon_id) def create_relmon(self, relmon, database, user_info): """ @@ -167,19 +177,21 @@ def create_relmon(self, relmon, database, user_info): relmon.reset() relmon.set_user_info(user_info) database.create_relmon(relmon) - self.logger.info('Relmon %s was created', relmon) + self.logger.info("Relmon %s was created", relmon) def rename_relmon_reports(self, relmon_id, new_name): """ Rename relmon reports file """ - ssh_executor = SSHExecutor(self.config) - ssh_executor.execute_command([ - 'cd %s' % (self.file_creator.web_location), - 'EXISTING_REPORT=$(ls -1 %s*.sqlite | head -n 1)' % (relmon_id), - 'echo 
"Existing file name: $EXISTING_REPORT"', - 'mv "$EXISTING_REPORT" "%s___%s.sqlite"' % (relmon_id, new_name), - ]) + ssh_executor = SSHExecutor() + ssh_executor.execute_command( + [ + "cd %s" % (self.file_creator.web_location), + "EXISTING_REPORT=$(ls -1 %s*.sqlite | head -n 1)" % (relmon_id), + 'echo "Existing file name: $EXISTING_REPORT"', + 'mv "$EXISTING_REPORT" "%s___%s.sqlite"' % (relmon_id, new_name), + ] + ) def edit_relmon(self, new_relmon, database, user_info): """ @@ -188,33 +200,47 @@ def edit_relmon(self, new_relmon, database, user_info): relmon_id = new_relmon.get_id() old_relmon_data = database.get_relmon(relmon_id) old_relmon = RelMon(old_relmon_data) - if old_relmon.get_status() == 'done': - self.logger.info('Relmon %s is done, will try to do a smart edit', old_relmon) - new_category_names = [x['name'] for x in new_relmon.get_json().get('categories')] - old_category_names = [x['name'] for x in old_relmon.get_json().get('categories')] - self.logger.info('Relmon %s had these categories: %s', old_relmon, old_category_names) - self.logger.info('Relmon %s have these categories: %s', new_relmon, new_category_names) + if old_relmon.get_status() == "done": + self.logger.info( + "Relmon %s is done, will try to do a smart edit", old_relmon + ) + new_category_names = [ + x["name"] for x in new_relmon.get_json().get("categories") + ] + old_category_names = [ + x["name"] for x in old_relmon.get_json().get("categories") + ] + self.logger.info( + "Relmon %s had these categories: %s", old_relmon, old_category_names + ) + self.logger.info( + "Relmon %s have these categories: %s", new_relmon, new_category_names + ) categories_changed = False for category_name in set(new_category_names + old_category_names): old_category = old_relmon.get_bare_category(category_name) new_category = new_relmon.get_bare_category(category_name) old_category_string = json.dumps(old_category) new_category_string = json.dumps(new_category) - force_rerun = new_relmon.get_category(category_name).get('rerun', False) + force_rerun = new_relmon.get_category(category_name).get("rerun", False) if force_rerun or old_category_string != new_category_string: - self.logger.info('Category %s of %s changed', category_name, old_relmon) + self.logger.info( + "Category %s of %s changed", category_name, old_relmon + ) categories_changed = True old_relmon.get_category(category_name).update(new_category) old_relmon.reset_category(category_name) - name_changed = old_relmon_data['name'] != new_relmon.get_name() + name_changed = old_relmon_data["name"] != new_relmon.get_name() if name_changed or categories_changed: new_name = new_relmon.get_name() if not categories_changed: # Only name changed, categories did not change, just a rename - self.logger.info('Renaming %s to %s without changing categories', - old_relmon, - new_name) + self.logger.info( + "Renaming %s to %s without changing categories", + old_relmon, + new_name, + ) self.rename_relmon_reports(relmon_id, new_name) else: # Categories changed, will have to resubmit @@ -225,42 +251,48 @@ def edit_relmon(self, new_relmon, database, user_info): old_relmon.set_user_info(user_info) database.update_relmon(old_relmon) else: - self.logger.info('Nothing changed for %s?', old_relmon) + self.logger.info("Nothing changed for %s?", old_relmon) else: - self.logger.info('Relmon %s will be reset', old_relmon) - old_relmon.get_json()['name'] = new_relmon.get_name() - old_relmon.get_json()['categories'] = new_relmon.get_json().get('categories', []) + self.logger.info("Relmon %s will be reset", 
old_relmon) + old_relmon.get_json()["name"] = new_relmon.get_name() + old_relmon.get_json()["categories"] = new_relmon.get_json().get( + "categories", [] + ) # Update only name and categories, do not allow to update anything else old_relmon.reset() database.update_relmon(old_relmon) self.add_to_reset_list(relmon_id, user_info) - self.logger.info('Relmon %s was edited', old_relmon) + self.logger.info("Relmon %s was edited", old_relmon) def __submit_to_condor(self, relmon, database): """ Take relmon object and submit it to HTCondor """ relmon_id = relmon.get_id() - local_relmon_directory = 'relmons/%s' % (relmon_id) + local_relmon_directory = "relmons/%s" % (relmon_id) if not os.path.isdir(local_relmon_directory): os.mkdir(local_relmon_directory) - remote_relmon_directory = '%s/%s' % (self.remote_directory, relmon_id) - self.logger.info('Will submit %s to HTCondor', relmon) - self.logger.info('Remote directory of %s is %s', relmon, remote_relmon_directory) - self.logger.info('Saving %s to database', relmon) + remote_relmon_directory = "%s/%s" % (self.remote_directory, relmon_id) + self.logger.info("Will submit %s to HTCondor", relmon) + self.logger.info( + "Remote directory of %s is %s", relmon, remote_relmon_directory + ) + self.logger.info("Saving %s to database", relmon) database.update_relmon(relmon) # Refetch after update relmon = RelMon(database.get_relmon(relmon_id)) - self.logger.info('Resources for %s: CPU: %s, memory: %s, disk %s', - relmon, - relmon.get_cpu(), - relmon.get_memory(), - relmon.get_disk()) + self.logger.info( + "Resources for %s: CPU: %s, memory: %s, disk %s", + relmon, + relmon.get_cpu(), + relmon.get_memory(), + relmon.get_disk(), + ) try: - self.logger.info('Will create files for %s', relmon) + self.logger.info("Will create files for %s", relmon) # Dump the json to a file self.file_creator.create_relmon_file(relmon) # Create HTCondor submit file @@ -268,53 +300,65 @@ def __submit_to_condor(self, relmon, database): # Create actual job script file self.file_creator.create_job_script_file(relmon) - self.logger.info('Will prepare remote directory for %s', relmon) + self.logger.info("Will prepare remote directory for %s", relmon) # Prepare remote directory. 
Delete old one and create a new one - self.ssh_executor.execute_command([ - 'rm -rf %s' % (remote_relmon_directory), - 'mkdir -p %s' % (remote_relmon_directory) - ]) + self.ssh_executor.execute_command( + [ + "rm -rf %s" % (remote_relmon_directory), + "mkdir -p %s" % (remote_relmon_directory), + ] + ) - self.logger.info('Will upload files for %s', relmon) + self.logger.info("Will upload files for %s", relmon) # Upload relmon json, submit file and script to run - local_name = '%s/RELMON_%s' % (local_relmon_directory, relmon_id) - remote_name = '%s/RELMON_%s' % (remote_relmon_directory, relmon_id) - self.ssh_executor.upload_file('%s.json' % (local_name), - '%s.json' % (remote_name)) - self.ssh_executor.upload_file('%s.sub' % (local_name), - '%s.sub' % (remote_name)) - self.ssh_executor.upload_file('%s.sh' % (local_name), - '%s.sh' % (remote_name)) - - self.logger.info('Will try to submit %s', relmon) + local_name = "%s/RELMON_%s" % (local_relmon_directory, relmon_id) + remote_name = "%s/RELMON_%s" % (remote_relmon_directory, relmon_id) + self.ssh_executor.upload_file( + "%s.json" % (local_name), "%s.json" % (remote_name) + ) + self.ssh_executor.upload_file( + "%s.sub" % (local_name), "%s.sub" % (remote_name) + ) + self.ssh_executor.upload_file( + "%s.sh" % (local_name), "%s.sh" % (remote_name) + ) + + self.logger.info("Will try to submit %s", relmon) # Run condor_submit # Submission happens through lxplus as condor is not available on website machine # It is easier to ssh to lxplus than set up condor locally - stdout, stderr = self.ssh_executor.execute_command([ - 'cd %s' % (remote_relmon_directory), - 'voms-proxy-init -voms cms --valid 24:00 --out $(pwd)/proxy.txt', - 'module load lxbatch/tzero && condor_submit RELMON_%s.sub' % (relmon_id) - ]) + stdout, stderr = self.ssh_executor.execute_command( + [ + "cd %s" % (remote_relmon_directory), + "voms-proxy-init -voms cms --valid 24:00 --out $(pwd)/proxy.txt", + "module load lxbatch/tzero && condor_submit RELMON_%s.sub" + % (relmon_id), + ] + ) # Parse result of condor_submit - if stdout and '1 job(s) submitted to cluster' in stdout: + if stdout and "1 job(s) submitted to cluster" in stdout: # output is "1 job(s) submitted to cluster 801341" - relmon.set_status('submitted') + relmon.set_status("submitted") condor_id = int(float(stdout.split()[-1])) relmon.set_condor_id(condor_id) - relmon.set_condor_status('IDLE') - self.logger.info('Submitted %s. Condor job id %s', relmon, condor_id) + relmon.set_condor_status("IDLE") + self.logger.info("Submitted %s. 
Condor job id %s", relmon, condor_id) else: - self.logger.error('Error submitting %s.\nOutput: %s.\nError %s', - relmon, - stdout, - stderr) - relmon.set_status('failed') + self.logger.error( + "Error submitting %s.\nOutput: %s.\nError %s", + relmon, + stdout, + stderr, + ) + relmon.set_status("failed") except Exception as ex: - relmon.set_status('failed') - self.logger.error('Exception while trying to submit %s: %s', relmon, str(ex)) + relmon.set_status("failed") + self.logger.error( + "Exception while trying to submit %s: %s", relmon, str(ex) + ) - self.logger.info('%s status is %s', relmon, relmon.get_status()) + self.logger.info("%s status is %s", relmon, relmon.get_status()) database.update_relmon(relmon) def __check_if_running(self, relmon, database): @@ -322,34 +366,34 @@ def __check_if_running(self, relmon, database): Check if given RelMon is running in HTCondor and get it's status there """ relmon_condor_id = relmon.get_condor_id() - self.logger.info('Will check if %s is running in HTCondor, id: %s', - relmon, - relmon_condor_id) + self.logger.info( + "Will check if %s is running in HTCondor, id: %s", relmon, relmon_condor_id + ) stdout, stderr = self.ssh_executor.execute_command( - 'module load lxbatch/tzero && condor_q -af:h ClusterId JobStatus | ' - 'grep %s' % (relmon_condor_id) + "module load lxbatch/tzero && condor_q -af:h ClusterId JobStatus | " + "grep %s" % (relmon_condor_id) ) - new_condor_status = '' + new_condor_status = "" if stdout and not stderr: status_number = stdout.split()[-1] - self.logger.info('Relmon %s status is %s', relmon, status_number) + self.logger.info("Relmon %s status is %s", relmon, status_number) status_dict = { - '0': 'UNEXPLAINED', - '1': 'IDLE', - '2': 'RUN', - '3': 'REMOVED', - '4': 'DONE', - '5': 'HOLD', - '6': 'SUBMISSION ERROR' + "0": "UNEXPLAINED", + "1": "IDLE", + "2": "RUN", + "3": "REMOVED", + "4": "DONE", + "5": "HOLD", + "6": "SUBMISSION ERROR", } - new_condor_status = status_dict.get(status_number, 'REMOVED') + new_condor_status = status_dict.get(status_number, "REMOVED") else: - self.logger.error('Error with HTCondor?\nOutput: %s.\nError %s', - stdout, - stderr) + self.logger.error( + "Error with HTCondor?\nOutput: %s.\nError %s", stdout, stderr + ) relmon = RelMon(database.get_relmon(relmon.get_id())) - self.logger.info('Saving %s condor status as %s', relmon, new_condor_status) + self.logger.info("Saving %s condor status as %s", relmon, new_condor_status) relmon.set_condor_status(new_condor_status) database.update_relmon(relmon) @@ -359,66 +403,65 @@ def __collect_output(self, relmon, database): and send to relevant user via email """ condor_status = relmon.get_condor_status() - if condor_status not in ['DONE', 'REMOVED']: - self.logger.info('%s status is not DONE or REMOVED, it is %s', relmon, condor_status) + if condor_status not in ["DONE", "REMOVED"]: + self.logger.info( + "%s status is not DONE or REMOVED, it is %s", relmon, condor_status + ) return - logging.info('Collecting output for %s', relmon) + logging.info("Collecting output for %s", relmon) relmon_id = relmon.get_id() - remote_relmon_directory = '%s/%s' % (self.remote_directory, relmon_id) - local_relmon_directory = 'relmons/%s' % (relmon_id) + remote_relmon_directory = "%s/%s" % (self.remote_directory, relmon_id) + local_relmon_directory = "relmons/%s" % (relmon_id) self.ssh_executor.download_file( - '%s/validation_matrix.log' % (remote_relmon_directory), - '%s/validation_matrix.log' % (local_relmon_directory) + "%s/validation_matrix.log" % (remote_relmon_directory), 
+ "%s/validation_matrix.log" % (local_relmon_directory), ) - remote_name = '%s/RELMON_%s' % (remote_relmon_directory, relmon_id) - local_name = '%s/%s' % (local_relmon_directory, relmon_id) + remote_name = "%s/RELMON_%s" % (remote_relmon_directory, relmon_id) + local_name = "%s/%s" % (local_relmon_directory, relmon_id) self.ssh_executor.download_file( - '%s.out' % (remote_name), - '%s.out' % (local_name) + "%s.out" % (remote_name), "%s.out" % (local_name) ) self.ssh_executor.download_file( - '%s.log' % (remote_name), - '%s.log' % (local_name) + "%s.log" % (remote_name), "%s.log" % (local_name) ) self.ssh_executor.download_file( - '%s.err' % (remote_name), - '%s.err' % (local_name) + "%s.err" % (remote_name), "%s.err" % (local_name) ) downloaded_files = [] - if os.path.isfile('%s.out' % (local_name)): - downloaded_files.append('%s.out' % (local_name)) + if os.path.isfile("%s.out" % (local_name)): + downloaded_files.append("%s.out" % (local_name)) - if os.path.isfile('%s.log' % (local_name)): - downloaded_files.append('%s.log' % (local_name)) + if os.path.isfile("%s.log" % (local_name)): + downloaded_files.append("%s.log" % (local_name)) - if os.path.isfile('%s.err' % (local_name)): - downloaded_files.append('%s.err' % (local_name)) + if os.path.isfile("%s.err" % (local_name)): + downloaded_files.append("%s.err" % (local_name)) - if os.path.isfile('%s/validation_matrix.log' % (local_relmon_directory)): - downloaded_files.append('%s/validation_matrix.log' % (local_relmon_directory)) + if os.path.isfile("%s/validation_matrix.log" % (local_relmon_directory)): + downloaded_files.append( + "%s/validation_matrix.log" % (local_relmon_directory) + ) attachments = [] if downloaded_files: - archive_name = '%s.zip' % (local_name) + archive_name = "%s.zip" % (local_name) attachments = [archive_name] - with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED) as zip_object: + with zipfile.ZipFile(archive_name, "w", zipfile.ZIP_DEFLATED) as zip_object: for file_path in downloaded_files: - zip_object.write(file_path, file_path.split('/')[-1]) + zip_object.write(file_path, file_path.split("/")[-1]) - if relmon.get_status() != 'failed': - relmon.set_status('done') + if relmon.get_status() != "failed": + relmon.set_status("done") self.__send_done_notification(relmon, files=attachments) else: self.__send_failed_notification(relmon, files=attachments) database.update_relmon(relmon) shutil.rmtree(local_relmon_directory, ignore_errors=True) - self.ssh_executor.execute_command([ - 'rm -rf %s' % (remote_relmon_directory) - ]) + self.ssh_executor.execute_command(["rm -rf %s" % (remote_relmon_directory)]) def __reset_relmon(self, relmon_id, database, user_info): """ @@ -429,12 +472,14 @@ def __reset_relmon(self, relmon_id, database, user_info): relmon = RelMon(relmon_json) relmon_status = relmon.get_status() self.__terminate_relmon(relmon) - old_username = relmon.get_user_info().get('login') - new_username = user_info.get('login') - if old_username != new_username and relmon_status != 'done': - self.logger.info('Reset by %s while not done, should inform %s', - new_username, - old_username) + old_username = relmon.get_user_info().get("login") + new_username = user_info.get("login") + if old_username != new_username and relmon_status != "done": + self.logger.info( + "Reset by %s while not done, should inform %s", + new_username, + old_username, + ) self.__send_reset_notification(relmon, user_info) relmon.reset() @@ -449,7 +494,7 @@ def __delete_relmon(self, relmon_id, database): relmon = RelMon(relmon_json) 
self.__terminate_relmon(relmon) database.delete_relmon(relmon) - local_relmon_directory = 'relmons/%s' % (relmon_id) + local_relmon_directory = "relmons/%s" % (relmon_id) if os.path.isdir(local_relmon_directory): shutil.rmtree(local_relmon_directory, ignore_errors=True) @@ -457,29 +502,31 @@ def __terminate_relmon(self, relmon): """ Terminate RelMon job in HTCondor """ - self.logger.info('Trying to terminate %s', relmon) + self.logger.info("Trying to terminate %s", relmon) condor_id = relmon.get_condor_id() if condor_id > 0: self.ssh_executor.execute_command( - 'module load lxbatch/tzero && condor_rm %s' % (condor_id) + "module load lxbatch/tzero && condor_rm %s" % (condor_id) ) else: - self.logger.info('Relmon %s HTCondor id is not valid: %s', relmon, condor_id) + self.logger.info( + "Relmon %s HTCondor id is not valid: %s", relmon, condor_id + ) - self.logger.info('Finished terminating relmon %s', relmon) + self.logger.info("Finished terminating relmon %s", relmon) def __send_reset_notification(self, relmon, new_user_info): """ Send notification email that RelMon was reset """ relmon_name = relmon.get_name() - new_user_fullname = new_user_info.get('fullname', '') - body = 'Hello,\n\n' - body += 'RelMon %s was reset by %s.\n' % (relmon_name, new_user_fullname) - body += 'You will not receive notification when this RelMon finishes running.\n' - body += 'RelMon in RelMon Service: %s?q=%s\n' % (self.service_url, relmon_name) - subject = 'RelMon %s was reset' % (relmon_name) - recipients = [relmon.get_user_info()['email']] + new_user_fullname = new_user_info.get("fullname", "") + body = "Hello,\n\n" + body += "RelMon %s was reset by %s.\n" % (relmon_name, new_user_fullname) + body += "You will not receive notification when this RelMon finishes running.\n" + body += "RelMon in RelMon Service: %s?q=%s\n" % (self.service_url, relmon_name) + subject = "RelMon %s was reset" % (relmon_name) + recipients = [relmon.get_user_info()["email"]] self.email_sender.send(subject, body, recipients) def __send_done_notification(self, relmon, files=None): @@ -487,15 +534,15 @@ def __send_done_notification(self, relmon, files=None): Send email notification that RelMon has successfully finished """ relmon_name = relmon.get_name() - body = 'Hello,\n\n' - body += 'RelMon %s has finished running.\n' % (relmon_name) - body += 'Reports can be found here: %s?q=%s\n' % (self.reports_url, relmon_name) - body += 'RelMon in RelMon Service: %s?q=%s\n' % (self.service_url, relmon_name) + body = "Hello,\n\n" + body += "RelMon %s has finished running.\n" % (relmon_name) + body += "Reports can be found here: %s?q=%s\n" % (self.reports_url, relmon_name) + body += "RelMon in RelMon Service: %s?q=%s\n" % (self.service_url, relmon_name) if files: - body += 'You can find job output as an attachment.\n' + body += "You can find job output as an attachment.\n" - subject = 'RelMon %s is done' % (relmon_name) - recipients = [relmon.get_user_info()['email']] + subject = "RelMon %s is done" % (relmon_name) + recipients = [relmon.get_user_info()["email"]] self.email_sender.send(subject, body, recipients, files) def __send_failed_notification(self, relmon, files=None): @@ -503,12 +550,12 @@ def __send_failed_notification(self, relmon, files=None): Send email notification that RelMon has failed """ relmon_name = relmon.get_name() - body = 'Hello,\n\n' - body += 'RelMon %s has failed.\n' % (relmon_name) - body += 'RelMon in RelMon Service: %s?q=%s\n' % (self.service_url, relmon_name) + body = "Hello,\n\n" + body += "RelMon %s has failed.\n" % 
(relmon_name) + body += "RelMon in RelMon Service: %s?q=%s\n" % (self.service_url, relmon_name) if files: - body += 'You can find job output as an attachment.\n' + body += "You can find job output as an attachment.\n" - subject = 'RelMon %s failed' % (relmon_name) - recipients = [relmon.get_user_info()['email']] + subject = "RelMon %s failed" % (relmon_name) + recipients = [relmon.get_user_info()["email"]] self.email_sender.send(subject, body, recipients, files) diff --git a/local/email_sender.py b/local/email_sender.py index 52e241f..0449a38 100644 --- a/local/email_sender.py +++ b/local/email_sender.py @@ -3,42 +3,39 @@ """ import smtplib import logging -import json from email import encoders from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText +from environment import ( + SERVICE_ACCOUNT_USERNAME, + SERVICE_ACCOUNT_PASSWORD, + EMAIL_AUTH_REQUIRED, +) -class EmailSender(): +class EmailSender: """ Email Sender allows sending emails to users using the CERN SMTP server """ - def __init__(self, config): - self.logger = logging.getLogger('logger') - self.credentials = config['ssh_credentials'] + def __init__(self): + self.logger = logging.getLogger("logger") self.smtp = None def __setup_smtp(self): """ - Read credentials and connect to SMTP file + Set up the SMTP client session """ - if ':' not in self.credentials: - with open(self.credentials) as json_file: - credentials = json.load(json_file) - else: - credentials = {} - credentials['username'] = self.credentials.split(':')[0] - credentials['password'] = self.credentials.split(':')[1] - - self.logger.info('Credentials loaded successfully: %s', credentials['username']) - self.smtp = smtplib.SMTP(host='cernmx.cern.ch', port=25) - # self.smtp.connect() + self.logger.info( + "Credentials loaded successfully: %s", SERVICE_ACCOUNT_USERNAME + ) + self.smtp = smtplib.SMTP(host="cernmx.cern.ch", port=25) self.smtp.ehlo() self.smtp.starttls() self.smtp.ehlo() - # self.smtp.login(credentials['username'], credentials['password']) + if EMAIL_AUTH_REQUIRED: + self.smtp.login(SERVICE_ACCOUNT_USERNAME, SERVICE_ACCOUNT_PASSWORD) def __close_smtp(self): """ @@ -52,32 +49,33 @@ def send(self, subject, body, recipients, files=None): Send email """ body = body.strip() - body += '\n\nSincerely,\nRelMon Service' - ccs = ['PdmV Service Account '] + body += "\n\nSincerely,\nRelMon Service" + ccs = ["PdmV Service Account "] # Create a fancy email message message = MIMEMultipart() - message['Subject'] = '[RelMon] %s' % (subject) - message['From'] = 'PdmV Service Account ' - message['To'] = ', '.join(recipients) - message['Cc'] = ', '.join(ccs) + message["Subject"] = "[RelMon] %s" % (subject) + message["From"] = "PdmV Service Account " + message["To"] = ", ".join(recipients) + message["Cc"] = ", ".join(ccs) # Set body text message.attach(MIMEText(body)) if files: for path in files: - attachment = MIMEBase('application', 'octet-stream') - with open(path, 'rb') as attachment_file: + attachment = MIMEBase("application", "octet-stream") + with open(path, "rb") as attachment_file: attachment.set_payload(attachment_file.read()) - file_name = path.split('/')[-1] + file_name = path.split("/")[-1] encoders.encode_base64(attachment) - attachment.add_header('Content-Disposition', - 'attachment; filename="%s"' % (file_name)) + attachment.add_header( + "Content-Disposition", 'attachment; filename="%s"' % (file_name) + ) message.attach(attachment) - self.logger.info('Will send "%s" to %s', message['Subject'], message['To'])
+ self.logger.info('Will send "%s" to %s', message["Subject"], message["To"]) self.__setup_smtp() try: - self.smtp.sendmail(message['From'], recipients + ccs, message.as_string()) + self.smtp.sendmail(message["From"], recipients + ccs, message.as_string()) except Exception as ex: self.logger.error(ex) finally: diff --git a/local/file_creator.py b/local/file_creator.py index 4fff9db..7de4a8a 100644 --- a/local/file_creator.py +++ b/local/file_creator.py @@ -2,21 +2,30 @@ Module for FileCreator """ import json +from environment import ( + REMOTE_DIRECTORY, + WEB_LOCATION_PATH, + SERVICE_URL, + CALLBACK_URL, + CALLBACK_CLIENT_ID, + CALLBACK_CLIENT_SECRET, + CLIENT_ID, +) -class FileCreator(): +class FileCreator: """ File creator creates bash executable for condor and condor submission job file """ - def __init__(self, config): - self.remote_location = config['remote_directory'] - self.web_location = config['web_location'] - if self.web_location[-1] == '/': + def __init__(self): + self.remote_location = REMOTE_DIRECTORY + self.web_location = WEB_LOCATION_PATH + if self.web_location[-1] == "/": self.web_location = self.web_location[:-1] - self.cookie_url = config['service_url'] - self.callback_url = config['callback_url'] + self.cookie_url = SERVICE_URL + self.callback_url = CALLBACK_URL def create_job_script_file(self, relmon): """ @@ -25,90 +34,90 @@ def create_job_script_file(self, relmon): relmon_id = relmon.get_id() cpus = relmon.get_cpu() relmon_name = relmon.get_name() - script_file_name = 'relmons/%s/RELMON_%s.sh' % (relmon_id, relmon_id) - old_web_sqlite_path = '%s/%s*.sqlite' % (self.web_location, relmon_id) - web_sqlite_path = '"%s/%s___%s.sqlite"' % (self.web_location, relmon_id, relmon_name) + script_file_name = "relmons/%s/RELMON_%s.sh" % (relmon_id, relmon_id) + old_web_sqlite_path = "%s/%s*.sqlite" % (self.web_location, relmon_id) + web_sqlite_path = '"%s/%s___%s.sqlite"' % ( + self.web_location, + relmon_id, + relmon_name, + ) script_file_content = [ - '#!/bin/bash', - 'DIR=$(pwd)', - 'export HOME=$(pwd)', + "#!/bin/bash", + "DIR=$(pwd)", + "export HOME=$(pwd)", # Clone the relmon service - 'git clone https://github.com/cms-PdmV/relmonservice2.git', + "git clone https://github.com/cms-PdmV/relmonservice2.git", # Fallback for github hiccups - 'if [ ! -d relmonservice2 ]; then', - ' wget https://github.com/cms-PdmV/RelmonService2/archive/master.zip', - ' unzip master.zip', - ' mv RelmonService2-master relmonservice2', - 'fi', - # Make a cookie for callbacks about progress - 'cern-get-sso-cookie -u %s -o cookie.txt' % (self.cookie_url), - 'cp cookie.txt relmonservice2/remote', + "if [ ! 
-d relmonservice2 ]; then", + " wget https://github.com/cms-PdmV/RelmonService2/archive/master.zip", + " unzip master.zip", + " mv RelmonService2-master relmonservice2", + "fi", # CMSSW environment setup - 'source /cvmfs/cms.cern.ch/cmsset_default.sh', - 'scramv1 project CMSSW CMSSW_11_0_4', - 'cd CMSSW_11_0_4/src', + "source /cvmfs/cms.cern.ch/cmsset_default.sh", + "scramv1 project CMSSW CMSSW_11_0_4", + "cd CMSSW_11_0_4/src", # Open scope for CMSSW - '(', - 'eval `scramv1 runtime -sh`', - 'cd ../..', + "(", + "eval `scramv1 runtime -sh`", + "cd ../..", # Create reports directory - 'mkdir -p Reports', + "mkdir -p Reports", # Run the remote apparatus - 'python relmonservice2/remote/remote_apparatus.py ' # No newline - '-r RELMON_%s.json -p proxy.txt --cpus %s --callback %s' % (relmon_id, - cpus, - self.callback_url), + "python relmonservice2/remote/remote_apparatus.py " # No newline + "-r RELMON_%s.json -p proxy.txt --cpus %s --callback %s" + % (relmon_id, cpus, self.callback_url), # Close scope for CMSSW - ')', - 'cd $DIR', + ")", + "cd $DIR", # Remove all root files - 'rm *.root', + "rm *.root", # Copy sqlitify to Reports directory - 'cp relmonservice2/remote/sqltify.py Reports/sqltify.py', + "cp relmonservice2/remote/sqltify.py Reports/sqltify.py", # Go to reports directory - 'cd Reports', + "cd Reports", # Try to copy existing reports file - 'EXISTING_REPORT=$(ls -1 %s | head -n 1)' % (old_web_sqlite_path), + "EXISTING_REPORT=$(ls -1 %s | head -n 1)" % (old_web_sqlite_path), 'echo "Existing file name: $EXISTING_REPORT"', 'if [ ! -z "$EXISTING_REPORT" ]; then', ' echo "File exists"', ' time rsync -v "$EXISTING_REPORT" reports.sqlite', - 'fi', + "fi", # Run sqltify - 'python3 sqltify.py', + "python3 sqltify.py", # Checksum for created sqlite 'echo "HTCondor workspace"', 'echo "MD5 Sum"', - 'md5sum reports.sqlite', + "md5sum reports.sqlite", # List sizes - 'ls -l reports.sqlite', + "ls -l reports.sqlite", # Do integrity check 'echo "Integrity check:"', 'echo "PRAGMA integrity_check;" | sqlite3 reports.sqlite', # Remove old sql file from web path 'if [ ! 
-z "$EXISTING_REPORT" ]; then', ' rm -f "$EXISTING_REPORT"', - 'fi', + "fi", # Copy reports sqlite to web path - 'time rsync -v reports.sqlite %s' % (web_sqlite_path), + "time rsync -v reports.sqlite %s" % (web_sqlite_path), # Checksum for created sqlite 'echo "EOS space"', 'echo "MD5 Sum"', - 'md5sum %s' % (web_sqlite_path), + "md5sum %s" % (web_sqlite_path), # List sizes - 'ls -l %s' % (web_sqlite_path), + "ls -l %s" % (web_sqlite_path), # Do integrity check 'echo "Integrity check:"', 'echo "PRAGMA integrity_check;" | sqlite3 %s' % (web_sqlite_path), - 'cd $DIR', - 'cern-get-sso-cookie -u %s -o cookie.txt' % (self.cookie_url), - 'cp cookie.txt relmonservice2/remote', - 'python3 relmonservice2/remote/remote_apparatus.py ' # No newlines here - '-r RELMON_%s.json --callback %s --notifydone' % (relmon_id, self.callback_url) + "cd $DIR", + "cp cookie.txt relmonservice2/remote", + "python3 relmonservice2/remote/remote_apparatus.py " # No newlines here + "-r RELMON_%s.json --callback %s --notifydone" + % (relmon_id, self.callback_url), ] - script_file_content_string = '\n'.join(script_file_content) - with open(script_file_name, 'w') as output_file: + script_file_content_string = "\n".join(script_file_content) + with open(script_file_name, "w", encoding="utf-8") as output_file: output_file.write(script_file_content_string) @classmethod @@ -118,8 +127,8 @@ def create_relmon_file(cls, relmon): """ relmon_id = relmon.get_id() relmon_data = relmon.get_json() - relmon_file_name = 'relmons/%s/RELMON_%s.json' % (relmon_id, relmon_id) - with open(relmon_file_name, 'w') as output_file: + relmon_file_name = "relmons/%s/RELMON_%s.json" % (relmon_id, relmon_id) + with open(relmon_file_name, "w", encoding="utf-8") as output_file: json.dump(relmon_data, output_file, indent=2, sort_keys=True) @classmethod @@ -131,27 +140,34 @@ def create_condor_job_file(cls, relmon): cpus = relmon.get_cpu() memory = relmon.get_memory() disk = relmon.get_disk() - condor_file_name = 'relmons/%s/RELMON_%s.sub' % (relmon_id, relmon_id) + condor_file_name = "relmons/%s/RELMON_%s.sub" % (relmon_id, relmon_id) + credentials_env = ( + f"CALLBACK_CLIENT_ID={CALLBACK_CLIENT_ID} " + f"CALLBACK_CLIENT_SECRET={CALLBACK_CLIENT_SECRET} " + f"APPLICATION_CLIENT_ID={CLIENT_ID}" + ) + credentials_env_arg = f'"{credentials_env}"' condor_file_content = [ - 'executable = RELMON_%s.sh' % (relmon_id), - 'output = RELMON_%s.out' % (relmon_id), - 'error = RELMON_%s.err' % (relmon_id), - 'log = RELMON_%s.log' % (relmon_id), - 'transfer_input_files = RELMON_%s.json,proxy.txt' % (relmon_id), - 'when_to_transfer_output = on_exit', - 'request_cpus = %s' % (cpus), - 'request_memory = %s' % (memory), - 'request_disk = %s' % (disk), + "executable = RELMON_%s.sh" % (relmon_id), + "environment = %s" % (credentials_env_arg), + "output = RELMON_%s.out" % (relmon_id), + "error = RELMON_%s.err" % (relmon_id), + "log = RELMON_%s.log" % (relmon_id), + "transfer_input_files = RELMON_%s.json,proxy.txt" % (relmon_id), + "when_to_transfer_output = on_exit", + "request_cpus = %s" % (cpus), + "request_memory = %s" % (memory), + "request_disk = %s" % (disk), '+JobFlavour = "tomorrow"', - '+JobPrio = 100', + "+JobPrio = 100", 'requirements = (OpSysAndVer =?= "CentOS7")', # Leave in queue when status is DONE for two hours - 7200 seconds - 'leave_in_queue = JobStatus == 4 && (CompletionDate =?= UNDEFINED' - ' || ((CurrentTime - CompletionDate) < 7200))', + "leave_in_queue = JobStatus == 4 && (CompletionDate =?= UNDEFINED" + " || ((CurrentTime - CompletionDate) < 7200))", 
'+AccountingGroup = "group_u_CMS.CAF.PHYS"', - 'queue' + "queue", ] - condor_file_content_string = '\n'.join(condor_file_content) - with open(condor_file_name, 'w') as output_file: + condor_file_content_string = "\n".join(condor_file_content) + with open(condor_file_name, "w", encoding="utf-8") as output_file: output_file.write(condor_file_content_string) diff --git a/local/relmon.py b/local/relmon.py index 3a854c7..597e02d 100644 --- a/local/relmon.py +++ b/local/relmon.py @@ -5,7 +5,7 @@ from copy import deepcopy -class RelMon(): +class RelMon: """ This class represents a single RelMon object and has some convenience methods such as required resources and reset @@ -13,129 +13,153 @@ class RelMon(): def __init__(self, data): data = deepcopy(data) - data['name'] = RelMon.sanitize_name(data['name']) + data["name"] = RelMon.sanitize_name(data["name"]) self.data = data - for category in self.data.get('categories', []): - category['status'] = category.get('status', 'initial') + for category in self.data.get("categories", []): + category["status"] = category.get("status", "initial") new_references = [] - for old_reference in category['reference']: + for old_reference in category["reference"]: if isinstance(old_reference, str): name = RelMon.sanitize_name(old_reference) if not name: continue - new_references.append({'name': name, - 'file_name': '', - 'file_url': '', - 'file_size': 0, - 'status': 'initial', - 'events': 0, - 'match': ''}) + new_references.append( + { + "name": name, + "file_name": "", + "file_url": "", + "file_size": 0, + "status": "initial", + "events": 0, + "match": "", + } + ) else: - name = RelMon.sanitize_name(old_reference['name']) + name = RelMon.sanitize_name(old_reference["name"]) if not name: continue - new_references.append({'name': name, - 'file_name': old_reference.get('file_name', ''), - 'file_url': old_reference.get('file_url', ''), - 'file_size': old_reference.get('file_size', 0), - 'status': old_reference.get('status', 'initial'), - 'events': old_reference.get('events', 0), - 'match': old_reference.get('match', '')}) + new_references.append( + { + "name": name, + "file_name": old_reference.get("file_name", ""), + "file_url": old_reference.get("file_url", ""), + "file_size": old_reference.get("file_size", 0), + "status": old_reference.get("status", "initial"), + "events": old_reference.get("events", 0), + "match": old_reference.get("match", ""), + } + ) new_targets = [] - for old_target in category['target']: + for old_target in category["target"]: if isinstance(old_target, str): name = RelMon.sanitize_name(old_target) if not name: continue - new_targets.append({'name': name, - 'file_name': '', - 'file_url': '', - 'file_size': 0, - 'status': 'initial', - 'events': 0, - 'match': ''}) + new_targets.append( + { + "name": name, + "file_name": "", + "file_url": "", + "file_size": 0, + "status": "initial", + "events": 0, + "match": "", + } + ) else: - name = RelMon.sanitize_name(old_target['name']) + name = RelMon.sanitize_name(old_target["name"]) if not name: continue - new_targets.append({'name': name, - 'file_name': old_target.get('file_name', ''), - 'file_url': old_target.get('file_url', ''), - 'file_size': old_target.get('file_size', 0), - 'status': old_target.get('status', 'initial'), - 'events': old_target.get('events', 0), - 'match': old_target.get('match', '')}) - - category['reference'] = new_references - category['target'] = new_targets + new_targets.append( + { + "name": name, + "file_name": old_target.get("file_name", ""), + "file_url": 
old_target.get("file_url", ""), + "file_size": old_target.get("file_size", 0), + "status": old_target.get("status", "initial"), + "events": old_target.get("events", 0), + "match": old_target.get("match", ""), + } + ) + + category["reference"] = new_references + category["target"] = new_targets @staticmethod def sanitize_name(name): """ Replace all non letters, digits, hyphens and underscores with underscore """ - return re.sub(r'[^A-Za-z0-9/\-_]', '_', name.strip()) + return re.sub(r"[^A-Za-z0-9/\-_]", "_", name.strip()) def reset_category(self, category_name): """ Reset category with given name to initial status """ category = self.get_category(category_name) - category['status'] = 'initial' + category["status"] = "initial" new_references = [] - for old_reference in category['reference']: + for old_reference in category["reference"]: if isinstance(old_reference, str): name = RelMon.sanitize_name(old_reference.strip()) else: - name = RelMon.sanitize_name(old_reference['name'].strip()) + name = RelMon.sanitize_name(old_reference["name"].strip()) if not name: continue - new_references.append({'name': name, - 'file_name': '', - 'file_url': '', - 'file_size': 0, - 'status': 'initial', - 'events': 0, - 'match': ''}) + new_references.append( + { + "name": name, + "file_name": "", + "file_url": "", + "file_size": 0, + "status": "initial", + "events": 0, + "match": "", + } + ) new_targets = [] - for old_target in category['target']: + for old_target in category["target"]: if isinstance(old_target, str): name = RelMon.sanitize_name(old_target.strip()) else: - name = RelMon.sanitize_name(old_target['name'].strip()) + name = RelMon.sanitize_name(old_target["name"].strip()) if not name: continue - new_targets.append({'name': name, - 'file_name': '', - 'file_url': '', - 'file_size': 0, - 'status': 'initial', - 'events': 0, - 'match': ''}) - - category['reference'] = new_references - category['target'] = new_targets + new_targets.append( + { + "name": name, + "file_name": "", + "file_url": "", + "file_size": 0, + "status": "initial", + "events": 0, + "match": "", + } + ) + + category["reference"] = new_references + category["target"] = new_targets def reset(self, reset_categories=True): """ Reset relmon and zero-out references and targets """ - self.set_status('new') - self.set_condor_status('') + self.set_status("new") + self.set_condor_status("") self.set_condor_id(0) if reset_categories: - for category in self.data['categories']: - self.reset_category(category['name']) + for category in self.data["categories"]: + self.reset_category(category["name"]) return self.data @@ -143,31 +167,31 @@ def get_id(self): """ Getter for id """ - return self.data.get('id') + return self.data.get("id") def get_name(self): """ Getter for name """ - return self.data.get('name') + return self.data.get("name") def set_name(self, name): """ Getter for name """ - self.data['name'] = name + self.data["name"] = name def get_cpu(self): """ Return number of CPUs required based on number of references and targets """ number_of_relvals = 0 - for category in self.data['categories']: - if category['status'] != 'initial': + for category in self.data["categories"]: + if category["status"] != "initial": continue - number_of_relvals += len(category['reference']) - number_of_relvals += len(category['target']) + number_of_relvals += len(category["reference"]) + number_of_relvals += len(category["target"]) # Pairs CPU # 0 - 5 - 1 @@ -199,7 +223,7 @@ def get_memory(self): """ Return amount of memory required based on number of CPUs """ - 
- memory = str(self.get_cpu() * 2) + 'G' + memory = str(self.get_cpu() * 2) + "G" return memory def get_disk(self): @@ -207,16 +231,16 @@ Return amount of disk space required based on number of references and targets """ number_of_relvals = 0 - for category in self.data['categories']: - if category['status'] != 'initial': + for category in self.data["categories"]: + if category["status"] != "initial": continue - number_of_relvals += len(category['reference']) - number_of_relvals += len(category['target']) + number_of_relvals += len(category["reference"]) + number_of_relvals += len(category["target"]) # At least 300M number_of_relvals = max(number_of_relvals, 1) - disk = '%sM' % (number_of_relvals * 300) + disk = "%sM" % (number_of_relvals * 300) return disk def get_json(self): @@ -229,76 +253,80 @@ def get_status(self): """ Getter for status """ - return self.data['status'] + return self.data["status"] def get_condor_status(self): """ Getter for condor status """ - return self.data.get('condor_status', '') + return self.data.get("condor_status", "") def get_condor_id(self): """ Getter for condor id """ - return self.data.get('condor_id', 0) + return self.data.get("condor_id", 0) def set_status(self, status): """ Setter for status """ - self.data['status'] = status + self.data["status"] = status def set_condor_status(self, condor_status): """ Setter for condor status """ - self.data['condor_status'] = condor_status + self.data["condor_status"] = condor_status def set_condor_id(self, condor_id): """ Setter for condor id """ - self.data['condor_id'] = condor_id + self.data["condor_id"] = condor_id def get_category(self, category_name): """ Get a category dictionary """ - for category in self.data.get('categories', []): - if category['name'] == category_name: + for category in self.data.get("categories", []): + if category["name"] == category_name: return category - self.data['categories'] = self.data.get('categories', []) - self.data['categories'].append({'name': category_name, 'reference': [], 'target': []}) + self.data["categories"] = self.data.get("categories", []) + self.data["categories"].append( + {"name": category_name, "reference": [], "target": []} + ) return self.get_category(category_name) def set_user_info(self, user_info): """ Set a dictionary with info about the user who edited last """ - self.data['user_info'] = user_info + self.data["user_info"] = user_info def get_user_info(self): """ Return dictionary with info about the user who acted on this RelMon last """ - return self.data['user_info'] + return self.data["user_info"] def get_bare_category(self, category_name): """ Return bare minimum category: list of references, list of targets, pairing and hlt settings """ category = self.get_category(category_name) - bare_category = {'reference': [x['name'] for x in category.get('reference', [])], - 'target': [x['name'] for x in category.get('target', [])], - 'automatic_pairing': category.get('automatic_pairing'), - 'hlt': category.get('hlt')} + bare_category = { + "reference": [x["name"] for x in category.get("reference", [])], + "target": [x["name"] for x in category.get("target", [])], + "automatic_pairing": category.get("automatic_pairing"), + "hlt": category.get("hlt"), + } return bare_category def __str__(self): - return '%s (%s)' % (self.get_name(), self.get_id()) + return "%s (%s)" % (self.get_name(), self.get_id()) def __repr__(self): return str(self)
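# Illustrative sketch (not part of the patch): how the RelMon resource
# heuristics above behave for a hypothetical payload. References and targets
# may be given as plain strings; __init__ expands them into full dictionaries.
from local.relmon import RelMon

relmon = RelMon({
    "name": "Example RelMon",
    "categories": [{
        "name": "FullSim",
        "status": "initial",
        "reference": ["ref_workflow_1", "ref_workflow_2"],
        "target": ["tar_workflow_1", "tar_workflow_2"],
    }],
})
print(relmon.get_cpu())     # grows with the number of pending relvals
print(relmon.get_memory())  # 2G per CPU, e.g. "2G"
print(relmon.get_disk())    # 300M per relval: 4 relvals -> "1200M"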
diff --git a/local/ssh_executor.py index ecd9964..be77b19 100644 --- a/local/ssh_executor.py +++ b/local/ssh_executor.py @@ -1,55 +1,58 @@ """ Module that handles all SSH operations - both ssh and ftp """ -import json import logging import time import paramiko +from environment import ( + SERVICE_ACCOUNT_PASSWORD, + SERVICE_ACCOUNT_USERNAME, + SUBMISSION_HOST, +) -class SSHExecutor(): +class SSHExecutor: """ SSH executor allows performing remote commands and uploading/downloading files """ - def __init__(self, config): + def __init__(self): self.ssh_client = None self.ftp_client = None - self.logger = logging.getLogger('logger') - self.remote_host = config['submission_host'] - self.credentials = config['ssh_credentials'] + self.logger = logging.getLogger("logger") + self.remote_host = SUBMISSION_HOST + self.credentials = { + "username": SERVICE_ACCOUNT_USERNAME, + "password": SERVICE_ACCOUNT_PASSWORD, + } def setup_ssh(self): """ Initiate SSH connection and save it as self.ssh_client """ - self.logger.info('Will set up ssh') + self.logger.info("Will set up ssh") if self.ssh_client: self.close_connections() - if ':' not in self.credentials: - with open(self.credentials) as json_file: - credentials = json.load(json_file) - else: - credentials = {} - credentials['username'] = self.credentials.split(':')[0] - credentials['password'] = self.credentials.split(':')[1] - - self.logger.info('Credentials loaded successfully: %s', credentials['username']) + self.logger.info( + "Credentials loaded successfully: %s", self.credentials["username"] + ) self.ssh_client = paramiko.SSHClient() self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.ssh_client.connect(self.remote_host, - username=credentials['username'], - password=credentials['password'], - timeout=30) - self.logger.info('Done setting up ssh') + self.ssh_client.connect( + self.remote_host, + username=self.credentials["username"], + password=self.credentials["password"], + timeout=30, + ) + self.logger.info("Done setting up ssh") def setup_ftp(self): """ Initiate SFTP connection and save it as self.ftp_client If needed, SSH connection will be automatically set up """ - self.logger.info('Will set up ftp') + self.logger.info("Will set up ftp") if self.ftp_client: self.close_connections() @@ -57,7 +60,7 @@ def setup_ftp(self): self.setup_ssh() self.ftp_client = self.ssh_client.open_sftp() - self.logger.info('Done setting up ftp') + self.logger.info("Done setting up ftp") def execute_command(self, command): """ @@ -67,11 +70,11 @@ def execute_command(self, command): self.setup_ssh() if isinstance(command, list): - command = '; '.join(command) + command = "; ".join(command) - self.logger.info('Executing %s', command) + self.logger.info("Executing %s", command) (_, stdout, stderr) = self.ssh_client.exec_command(command) - self.logger.info('Executed %s. Reading response', command) + self.logger.info("Executed %s. 
Reading response", command) # Close channel after minute of waiting for EOF # This timeouts and closes channel if nothing was received stdout_timeout = time.time() + 60 @@ -81,7 +84,7 @@ def execute_command(self, command): stdout.channel.close() break - stdout = stdout.read().decode('utf-8').strip() + stdout = stdout.read().decode("utf-8").strip() # Same thing for stderr stderr_timeout = time.time() + 60 while not stderr.channel.eof_received: @@ -90,13 +93,13 @@ def execute_command(self, command): stderr.channel.close() break - stderr = stderr.read().decode('utf-8').strip() + stderr = stderr.read().decode("utf-8").strip() # Read output from stdout and stderr streams if stdout: - self.logger.info('STDOUT (%s): %s', command, stdout) + self.logger.info("STDOUT (%s): %s", command, stdout) if stderr: - self.logger.error('STDERR (%s): %s', command, stderr) + self.logger.error("STDERR (%s): %s", command, stderr) return stdout, stderr @@ -104,42 +107,46 @@ def upload_file(self, copy_from, copy_to): """ Upload a file """ - self.logger.info('Will upload file %s to %s', copy_from, copy_to) + self.logger.info("Will upload file %s to %s", copy_from, copy_to) if not self.ftp_client: self.setup_ftp() try: self.ftp_client.put(copy_from, copy_to) - self.logger.info('Uploaded file to %s', copy_to) + self.logger.info("Uploaded file to %s", copy_to) except Exception as ex: - self.logger.error('Error uploading file from %s to %s. %s', copy_from, copy_to, ex) + self.logger.error( + "Error uploading file from %s to %s. %s", copy_from, copy_to, ex + ) def download_file(self, copy_from, copy_to): """ Download file from remote host """ - self.logger.info('Will download file %s to %s', copy_from, copy_to) + self.logger.info("Will download file %s to %s", copy_from, copy_to) if not self.ftp_client: self.setup_ftp() try: self.ftp_client.get(copy_from, copy_to) - self.logger.info('Downloaded file to %s', copy_to) + self.logger.info("Downloaded file to %s", copy_to) except Exception as ex: - self.logger.error('Error downloading file from %s to %s. %s', copy_from, copy_to, ex) + self.logger.error( + "Error downloading file from %s to %s. 
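# Illustrative sketch (not part of the patch): typical SSHExecutor usage.
# It assumes the environment module provides SUBMISSION_HOST and the service
# account credentials; the paths and commands below are hypothetical.
from local.ssh_executor import SSHExecutor

executor = SSHExecutor()
stdout, stderr = executor.execute_command(["cd /tmp/relmon_work", "condor_q"])
executor.upload_file("relmon.json", "/tmp/relmon_work/relmon.json")
executor.close_connections()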
%s", copy_from, copy_to, ex + ) def close_connections(self): """ Close any active connections """ if self.ftp_client: - self.logger.info('Closing ftp client') + self.logger.info("Closing ftp client") self.ftp_client.close() self.ftp_client = None - self.logger.info('Closed ftp client') + self.logger.info("Closed ftp client") if self.ssh_client: - self.logger.info('Closing ssh client') + self.logger.info("Closing ssh client") self.ssh_client.close() self.ssh_client = None - self.logger.info('Closed ssh client') + self.logger.info("Closed ssh client") diff --git a/main.py b/main.py index a592516..8947239 100644 --- a/main.py +++ b/main.py @@ -1,105 +1,169 @@ """ Module that contains start of the program, tick scheduler and web APIs """ -import argparse import logging import json -import configparser import os import time import inspect from datetime import datetime -from flask import Flask, render_template, request, make_response +from flask import ( + Flask, + session, + render_template, + request, + make_response, +) from flask_restful import Api from jinja2.exceptions import TemplateNotFound from apscheduler.schedulers.background import BackgroundScheduler +from core_lib.middlewares.auth import AuthenticationMiddleware, UserInfo from mongodb_database import Database from local.controller import Controller from local.relmon import RelMon - - -app = Flask(__name__, - static_folder="./frontend/dist/static", - template_folder="./frontend/dist") +from environment import ( + TICK_INTERVAL, + HOST, + PORT, + DEBUG, + SECRET_KEY, + ENABLE_AUTH_MIDDLEWARE, +) + + +app = Flask( + __name__, static_folder="./frontend/dist/static", template_folder="./frontend/dist" +) api = Api(app) +if ENABLE_AUTH_MIDDLEWARE: + app.secret_key = SECRET_KEY + auth: AuthenticationMiddleware = AuthenticationMiddleware(app=app) + app.before_request( + lambda: auth.authenticate(request=request, flask_session=session) + ) scheduler = BackgroundScheduler() controller = Controller() -@app.route('/') +def get_groups_from_headers() -> list[str]: + """ + Retrieves the list of e-groups sent via Adfs-Group header + """ + groups = [ + x.strip().lower() for x in request.headers.get("Adfs-Group", "???").split(";") + ] + return groups + + +def get_roles() -> list[str]: + """ + Retrieves the list of authorized roles/groups + """ + user_data: UserInfo | None = session.get("user") + if user_data: + return user_data.roles + return get_groups_from_headers() + + +def user_info_dict(): + """ + Get user name, login, email and authorized flag from request headers + """ + user_data: UserInfo = session.get("user") + if user_data: + return { + "login": user_data.username, + "authorized_user": is_user_authorized(), + "fullname": user_data.fullname, + "email": user_data.email, + } + fullname = request.headers.get("Adfs-Fullname", "") + login = request.headers.get("Adfs-Login", "") + email = request.headers.get("Adfs-Email", "") + return { + "login": login, + "authorized_user": is_user_authorized(), + "fullname": fullname, + "email": email, + } + + +@app.route("/") def index_page(): """ Return index.html """ try: - return render_template('index.html') + return render_template("index.html") except TemplateNotFound: - response = '' - response += 'Webpage is starting, please wait a few minutes...' + response = "" + response += "Webpage is starting, please wait a few minutes..." 
return response -@app.route('/api/create', methods=['POST']) +@app.route("/api/create", methods=["POST"]) def add_relmon(): """ API to create a RelMon """ if not is_user_authorized(): - return output_text({'message': 'Unauthorized'}, code=403) + return output_text({"message": "Unauthorized"}, code=403) - relmon = json.loads(request.data.decode('utf-8')) - if not relmon.get('name'): - return output_text({'message': 'No name'}, code=400) + relmon = json.loads(request.data.decode("utf-8")) + if not relmon.get("name"): + return output_text({"message": "No name"}, code=400) - relmon['id'] = str(int(time.time())) + relmon["id"] = str(int(time.time())) relmon = RelMon(relmon) database = Database() if database.get_relmons_with_name(relmon.get_name()): - return output_text({'message': 'RelMon with this name already exists'}, code=422) + return output_text( + {"message": "RelMon with this name already exists"}, code=422 + ) if database.get_relmon(relmon.get_id()): - return output_text({'message': 'RelMon with this ID already exists'}, code=422) + return output_text({"message": "RelMon with this ID already exists"}, code=422) controller.create_relmon(relmon, database, user_info_dict()) controller_tick() - return output_text({'message': 'OK'}) + return output_text({"message": "OK"}) -@app.route('/api/reset', methods=['POST']) +@app.route("/api/reset", methods=["POST"]) def reset_relmon(): """ API to reset a RelMon """ if not is_user_authorized(): - return output_text({'message': 'Unauthorized'}, code=403) + return output_text({"message": "Unauthorized"}, code=403) - data = json.loads(request.data.decode('utf-8')) - if 'id' in data: - controller.add_to_reset_list(str(int(data['id'])), user_info_dict()) + data = json.loads(request.data.decode("utf-8")) + if "id" in data: + controller.add_to_reset_list(str(int(data["id"])), user_info_dict()) controller_tick() - return output_text({'message': 'OK'}) + return output_text({"message": "OK"}) - return output_text({'message': 'No ID'}) + return output_text({"message": "No ID"}) -@app.route('/api/delete', methods=['DELETE']) +@app.route("/api/delete", methods=["DELETE"]) def delete_relmon(): """ API to delete a RelMon """ if not is_user_authorized(): - return output_text({'message': 'Unauthorized'}, code=403) + return output_text({"message": "Unauthorized"}, code=403) - data = json.loads(request.data.decode('utf-8')) - if 'id' in data: - controller.add_to_delete_list(str(int(data['id'])), user_info_dict()) + data = json.loads(request.data.decode("utf-8")) + if "id" in data: + controller.add_to_delete_list(str(int(data["id"])), user_info_dict()) controller_tick() - return output_text({'message': 'OK'}) + return output_text({"message": "OK"}) - return output_text({'message': 'No ID'}) + return output_text({"message": "No ID"}) -@app.route('/api/get_relmons') +@app.route("/api/get_relmons") def get_relmons(): """ API to fetch RelMons from database @@ -109,57 +173,70 @@ def get_relmons(): if args is None: args = {} - page = int(args.get('page', 0)) - limit = int(args.get('limit', database.PAGE_SIZE)) - query = args.get('q') + page = int(args.get("page", 0)) + limit = int(args.get("limit", database.PAGE_SIZE)) + query = args.get("q") if query: query = query.strip() - if query.lower() in ('new', 'submitted', 'running', 'finishing', 'done', 'failed'): - query_dict = {'status': query.lower()} - data, total_rows = database.get_relmons(query_dict=query_dict, - page=page, - page_size=limit) + if query.lower() in ( + "new", + "submitted", + "running", + "finishing", + 
"done", + "failed", + ): + query_dict = {"status": query.lower()} + data, total_rows = database.get_relmons( + query_dict=query_dict, page=page, page_size=limit + ) else: - query_dict = {'_id': query} - data, total_rows = database.get_relmons(query_dict=query_dict, - page=page, - page_size=limit) + query_dict = {"_id": query} + data, total_rows = database.get_relmons( + query_dict=query_dict, page=page, page_size=limit + ) if total_rows == 0: - query = '*%s*' % (query) + query = "*%s*" % (query) # Perform case insensitive search - query_dict = {'name': {'$regex': query.replace('*', '.*'), '$options': '-i'}} - data, total_rows = database.get_relmons(query_dict=query_dict, - page=page, - page_size=limit) + query_dict = { + "name": {"$regex": query.replace("*", ".*"), "$options": "-i"} + } + data, total_rows = database.get_relmons( + query_dict=query_dict, page=page, page_size=limit + ) else: data, total_rows = database.get_relmons(page=page, page_size=limit) for relmon in data: - relmon.pop('user_info', None) - relmon['total_relvals'] = 0 - relmon['downloaded_relvals'] = 0 - relmon['compared_relvals'] = 0 - for category in relmon.get('categories'): - relmon['total_relvals'] += len(category['reference']) + len(category['target']) - for reference_target in ('reference', 'target'): - category['rerun'] = False - category['%s_status' % (reference_target)] = {} - category['%s_size' % (reference_target)] = 0 + relmon.pop("user_info", None) + relmon["total_relvals"] = 0 + relmon["downloaded_relvals"] = 0 + relmon["compared_relvals"] = 0 + for category in relmon.get("categories"): + relmon["total_relvals"] += len(category["reference"]) + len( + category["target"] + ) + for reference_target in ("reference", "target"): + category["rerun"] = False + category["%s_status" % (reference_target)] = {} + category["%s_size" % (reference_target)] = 0 for relval in category[reference_target]: - category['%s_size' % (reference_target)] += relval.get('file_size', 0) - relmon_status = relval['status'] - if relmon_status not in category['%s_status' % (reference_target)]: - category['%s_status' % (reference_target)][relmon_status] = 0 + category["%s_size" % (reference_target)] += relval.get( + "file_size", 0 + ) + relmon_status = relval["status"] + if relmon_status not in category["%s_status" % (reference_target)]: + category["%s_status" % (reference_target)][relmon_status] = 0 - if relmon_status != 'initial': - relmon['downloaded_relvals'] += + 1 + if relmon_status != "initial": + relmon["downloaded_relvals"] += +1 - if category['status'] == 'done': - relmon['compared_relvals'] += 1 + if category["status"] == "done": + relmon["compared_relvals"] += 1 - category['%s_status' % (reference_target)][relmon_status] += 1 + category["%s_status" % (reference_target)][relmon_status] += 1 - return output_text({'data': data, 'total_rows': total_rows, 'page_size': limit}) + return output_text({"data": data, "total_rows": total_rows, "page_size": limit}) def output_text(data, code=200, headers=None): @@ -168,81 +245,95 @@ def output_text(data, code=200, headers=None): """ resp = make_response(json.dumps(data, indent=1, sort_keys=True), code) resp.headers.extend(headers or {}) - resp.headers['Content-Type'] = 'application/json' - resp.headers['Access-Control-Allow-Origin'] = '*' + resp.headers["Content-Type"] = "application/json" + resp.headers["Access-Control-Allow-Origin"] = "*" return resp -@app.route('/api/edit', methods=['POST']) +@app.route("/api/edit", methods=["POST"]) def edit_relmon(): """ API for RelMon editing """ if 
-@app.route('/api/edit', methods=['POST']) +@app.route("/api/edit", methods=["POST"]) def edit_relmon(): """ API for RelMon editing """ if not is_user_authorized(): - return output_text({'message': 'Unauthorized'}, code=403) + return output_text({"message": "Unauthorized"}, code=403) - relmon = json.loads(request.data.decode('utf-8')) + relmon = json.loads(request.data.decode("utf-8")) relmon = RelMon(relmon) database = Database() existing_relmons_with_same_name = database.get_relmons_with_name(relmon.get_name()) for existing_relmon_with_same_name in existing_relmons_with_same_name: - if existing_relmon_with_same_name['id'] != relmon.get_id(): - return output_text({'message': 'RelMon with this name already exists'}, code=409) + if existing_relmon_with_same_name["id"] != relmon.get_id(): + return output_text( + {"message": "RelMon with this name already exists"}, code=409 + ) relmon_id = relmon.get_id() existing_relmon = database.get_relmon(relmon_id) if not relmon_id or not existing_relmon: - return output_text({'message': 'RelMon does not exist'}, code=404) + return output_text({"message": "RelMon does not exist"}, code=404) controller.edit_relmon(relmon, database, user_info_dict()) controller_tick() - return output_text({'message': 'OK'}) + return output_text({"message": "OK"}) -@app.route('/api/update', methods=['POST']) +@app.route("/api/update", methods=["POST"]) def update_info(): """ API for jobs in HTCondor to notify about progress """ - login = request.headers.get('Adfs-Login', '???') - logger = logging.getLogger('logger') - if login not in ('pdmvserv', 'jrumsevi'): + authorized_roles: set[str] = set(["cms-pdmv-serv"]) + authorized_service_application: str = "service-account-cms-ppd-pdmv-api-access" + user_roles: set[str] = set(get_roles()) + logger = logging.getLogger("logger") + user_data: dict[str, str] = user_info_dict() + login: str = user_data["login"] + if ( + bool(user_roles & authorized_roles) is False + and login != authorized_service_application + ): logger.warning('Not letting through user "%s" to do update', login) - return output_text({'message': 'Unauthorized'}, code=403) + return output_text({"message": "Unauthorized"}, code=403) - data = json.loads(request.data.decode('utf-8')) + data = json.loads(request.data.decode("utf-8")) database = Database() - relmon = database.get_relmon(data['id']) + relmon = database.get_relmon(data["id"]) if not relmon: - return output_text({'message': 'Could not find'}) - - old_status = relmon.get('status') - relmon['categories'] = data['categories'] - relmon['status'] = data['status'] - logger.info('Update for %s (%s). Status is %s', relmon['name'], relmon['id'], relmon['status']) + return output_text({"message": "Could not find"}) + + old_status = relmon.get("status") + relmon["categories"] = data["categories"] + relmon["status"] = data["status"] + logger.info( + "Update for %s (%s). 
Status is %s", + relmon["name"], + relmon["id"], + relmon["status"], + ) database.update_relmon(RelMon(relmon)) - if relmon['status'] != old_status: + if relmon["status"] != old_status: for job in scheduler.get_jobs(): job.modify(next_run_time=datetime.now()) - return output_text({'message': 'OK'}) + return output_text({"message": "OK"}) -@app.route('/api/tick') +@app.route("/api/tick") def controller_tick(): """ API to trigger a controller tick """ if not is_user_authorized(): - return output_text({'message': 'Unauthorized'}, code=403) + return output_text({"message": "Unauthorized"}, code=403) for job in scheduler.get_jobs(): job.modify(next_run_time=datetime.now()) - return output_text({'message': 'OK'}) + return output_text({"message": "OK"}) -@app.route('/api/user') +@app.route("/api/user") def user_info(): """ API for user info @@ -250,8 +341,8 @@ def user_info(): return output_text(user_info_dict()) -@app.route('/api', defaults={'_path': ''}) -@app.route('/api/') +@app.route("/api", defaults={"_path": ""}) +@app.route("/api/") def api_documentation(_path): """ Endpoint for API documentation HTML @@ -261,38 +352,27 @@ def api_documentation(_path): for rule in app.url_map.iter_rules(): endpoint = rule.rule func = app.view_functions[rule.endpoint] - methods = sorted(list(rule.methods & {'GET', 'PUT', 'POST', 'DELETE'})) - if not methods or 'api' not in endpoint: + methods = sorted(list(rule.methods & {"GET", "PUT", "POST", "DELETE"})) + if not methods or "api" not in endpoint: continue - docs[endpoint] = {'doc': func.__doc__.strip(), - 'methods': methods, - 'file': inspect.getfile(func).replace(base, '').strip('/'), - 'line': inspect.getsourcelines(func)[1]} - - return render_template('api_documentation.html', docs=docs) + docs[endpoint] = { + "doc": func.__doc__.strip(), + "methods": methods, + "file": inspect.getfile(func).replace(base, "").strip("/"), + "line": inspect.getsourcelines(func)[1], + } - -def user_info_dict(): - """ - Get user name, login, email and authorized flag from request headers - """ - fullname = request.headers.get('Adfs-Fullname', '') - login = request.headers.get('Adfs-Login', '') - email = request.headers.get('Adfs-Email', '') - authorized_user = is_user_authorized() - return {'login': login, - 'authorized_user': authorized_user, - 'fullname': fullname, - 'email': email} + return render_template("api_documentation.html", docs=docs) -def is_user_authorized(): +def is_user_authorized() -> bool: """ Return whether user is a member of administrators e-group """ - groups = [x.strip().lower() for x in request.headers.get('Adfs-Group', '???').split(';')] - return 'cms-ppd-pdmv-val-admin-pdmv' in groups + authorized_roles: set[str] = set(["cms-ppd-pdmv-val-admin-pdmv", "cms-pdmv-serv"]) + user_roles: set[str] = set(get_roles()) + return bool(user_roles & authorized_roles) def tick(): @@ -306,81 +386,31 @@ def setup_console_logging(): """ Setup logging to console """ - logging.basicConfig(format='[%(asctime)s][%(levelname)s] %(message)s', level=logging.INFO) - - -def get_config(mode): - """ - Get config as a dictionary - Based on the mode - prod or dev - """ - config = configparser.ConfigParser() - config.read('config.cfg') - config = dict(config.items(mode)) - logging.info('Config values:') - for key, value in config.items(): - if key in ('ssh_credentials', 'database_auth'): - logging.info(' %s: ******', key) - else: - logging.info(' %s: %s', key, value) - - return config + logging.basicConfig( + format="[%(asctime)s][%(levelname)s] %(message)s", level=logging.INFO + 
) def main(): """ Main function, create a controller and start the Flask web server """ - parser = argparse.ArgumentParser(description='RelMon Service') - parser.add_argument('--mode', - choices=['prod', 'dev'], - required=True, - help='Production (prod) or development (dev) mode') - parser.add_argument('--debug', - help='Debug mode', - action='store_true') - parser.add_argument('--port', - help='Port, default is 8001', - default=8001) - parser.add_argument('--host', - help='Host IP, default is 0.0.0.0', - default='0.0.0.0') - args = vars(parser.parse_args()) - debug = args.get('debug', False) + debug = DEBUG + host = HOST + port = PORT + setup_console_logging() - logger = logging.getLogger('logger') - mode = args.get('mode', 'dev').lower() - logger.info('Mode is "%s"', mode) - config = get_config(mode) - scheduler.add_executor('processpool') - if not debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true': - controller.set_config(config) - scheduler.add_job(tick, - 'interval', - seconds=int(config.get('tick_interval')), - max_instances=1) - - database_auth = config.get('database_auth') - if database_auth: - Database.set_credentials_file(database_auth) + logger = logging.getLogger("logger") + scheduler.add_executor("processpool") + if not debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true": + controller.set_config() + scheduler.add_job(tick, "interval", seconds=TICK_INTERVAL, max_instances=1) scheduler.start() - port = args.get('port') - host = args.get('host') - logger.info('Will run on %s:%s', host, port) - if os.environ.get('WERKZEUG_RUN_MAIN') != 'true': - # Do only once, before the reloader - pid = os.getpid() - logger.info('PID: %s', pid) - with open('relmonservice.pid', 'w') as pid_file: - pid_file.write(str(pid)) - - app.run(host=host, - port=port, - debug=debug, - threaded=True) + logger.info("Will run on %s:%s", host, port) + app.run(host=host, port=port, debug=debug, threaded=True) scheduler.shutdown() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/mongodb_database.py index e697e4e..da1668b 100644 --- a/mongodb_database.py +++ b/mongodb_database.py @@ -7,6 +7,7 @@ import os from pymongo import MongoClient from pymongo.errors import DuplicateKeyError +from environment import MONGO_DB_PORT, MONGO_DB_HOST, MONGO_DB_PASSWORD, MONGO_DB_USER class Database: @@ -14,28 +15,31 @@ Database class represents MongoDB database It encapsulates underlying connection and exposes some convenience methods """ + PAGE_SIZE = 10 - DATABASE_HOST = 'localhost' - DATABASE_PORT = 27017 - DATABASE_NAME = 'relmons' - COLLECTION_NAME = 'relmons' - USERNAME = None - PASSWORD = None + DATABASE_HOST = MONGO_DB_HOST + DATABASE_PORT = MONGO_DB_PORT + DATABASE_NAME = "relmons" + COLLECTION_NAME = "relmons" + USERNAME = MONGO_DB_USER + PASSWORD = MONGO_DB_PASSWORD def __init__(self): - self.logger = logging.getLogger('logger') - db_host = os.environ.get('DB_HOST', Database.DATABASE_HOST) - db_port = os.environ.get('DB_PORT', Database.DATABASE_PORT) + self.logger = logging.getLogger("logger") + db_host = os.environ.get("DB_HOST", Database.DATABASE_HOST) + db_port = os.environ.get("DB_PORT", Database.DATABASE_PORT) if Database.USERNAME and Database.PASSWORD: - self.logger.debug('Using DB with username and password') - self.client = MongoClient(db_host, - db_port, - username=Database.USERNAME, - password=Database.PASSWORD, - authSource='admin', - authMechanism='SCRAM-SHA-256')[Database.DATABASE_NAME] + self.logger.debug("Using DB with 
username and password") + self.client = MongoClient( + db_host, + db_port, + username=Database.USERNAME, + password=Database.PASSWORD, + authSource="admin", + authMechanism="SCRAM-SHA-256", + )[Database.DATABASE_NAME] else: - self.logger.debug('Using DB without username and password') + self.logger.debug("Using DB without username and password") self.client = MongoClient(db_host, db_port)[Database.DATABASE_NAME] self.relmons = self.client[self.COLLECTION_NAME] @@ -53,18 +57,18 @@ def set_credentials_file(cls, filename): """ Load credentials from a JSON file """ - with open(filename) as json_file: + with open(filename, encoding="utf-8") as json_file: credentials = json.load(json_file) - cls.set_credentials(credentials['username'], credentials['password']) + cls.set_credentials(credentials["username"], credentials["password"]) def create_relmon(self, relmon): """ Add given RelMon to the database """ relmon_json = relmon.get_json() - relmon_json['last_update'] = int(time.time()) - relmon_json['_id'] = relmon_json['id'] + relmon_json["last_update"] = int(time.time()) + relmon_json["_id"] = relmon_json["id"] try: return self.relmons.insert_one(relmon_json) except DuplicateKeyError: @@ -75,13 +79,13 @@ def update_relmon(self, relmon): Update given RelMon in the database based on ID """ relmon_json = relmon.get_json() - relmon_json['last_update'] = int(time.time()) - if '_id' not in relmon_json: - self.logger.error('No _id in document') + relmon_json["last_update"] = int(time.time()) + if "_id" not in relmon_json: + self.logger.error("No _id in document") return try: - self.relmons.replace_one({'_id': relmon_json['_id']}, relmon_json) + self.relmons.replace_one({"_id": relmon_json["_id"]}, relmon_json) except DuplicateKeyError: return @@ -89,7 +93,7 @@ def delete_relmon(self, relmon): """ Delete given RelMon from the database based on it's ID """ - self.relmons.delete_one({'_id': relmon.get_id()}) + self.relmons.delete_one({"_id": relmon.get_id()}) def get_relmon_count(self): """ @@ -101,7 +105,7 @@ def get_relmon(self, relmon_id): """ Fetch a RelMon with given ID from the database """ - return self.relmons.find_one({'_id': relmon_id}) + return self.relmons.find_one({"_id": relmon_id}) def get_relmons(self, query_dict=None, page=0, page_size=PAGE_SIZE): """ @@ -111,7 +115,7 @@ def get_relmons(self, query_dict=None, page=0, page_size=PAGE_SIZE): if query_dict is None: query_dict = {} - relmons = self.relmons.find(query_dict).sort('_id', -1) + relmons = self.relmons.find(query_dict).sort("_id", -1) total_rows = relmons.count() relmons = relmons.skip(page * page_size).limit(page_size) return list(relmons), total_rows @@ -120,19 +124,19 @@ def get_relmons_with_status(self, status): """ Get list of RelMons with given status """ - relmons = self.relmons.find({'status': status}) + relmons = self.relmons.find({"status": status}) return list(relmons) def get_relmons_with_condor_status(self, status): """ Get list of RelMons with given HTCondor status """ - relmons = self.relmons.find({'condor_status': status}) + relmons = self.relmons.find({"condor_status": status}) return list(relmons) def get_relmons_with_name(self, relmon_name): """ Get list of (should be one) RelMons with given name """ - relmons = self.relmons.find({'name': relmon_name}) + relmons = self.relmons.find({"name": relmon_name}) return list(relmons) diff --git a/remote/cmswebwrapper.py b/remote/cmswebwrapper.py index fd90f8e..808112b 100644 --- a/remote/cmswebwrapper.py +++ b/remote/cmswebwrapper.py @@ -6,13 +6,14 @@ import json import os 
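# Illustrative sketch (not part of the patch): basic Database usage, assuming
# a MongoDB instance reachable with the settings from the environment module.
from mongodb_database import Database

database = Database()
done_relmons, total_rows = database.get_relmons(query_dict={"status": "done"})
for relmon in done_relmons:
    print(relmon["_id"], relmon["name"])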
diff --git a/remote/cmswebwrapper.py index fd90f8e..808112b 100644 --- a/remote/cmswebwrapper.py +++ b/remote/cmswebwrapper.py @@ -6,13 +6,14 @@ import json import os import time + try: from http.client import HTTPSConnection except ImportError: from httplib import HTTPSConnection -class CMSWebWrapper(): +class CMSWebWrapper: """ CMSWebWrapper handles all communication with cmsweb It requires paths to grid user certificate and grid user key files """ @@ -29,33 +30,37 @@ def __get_connection(self): """ Return a HTTPSConnection to cmsweb.cern.ch """ if self.cert_file is None or self.key_file is None: - raise Exception('Missing user certificate or user key') + raise RuntimeError("Missing user certificate or user key") - return HTTPSConnection('cmsweb.cern.ch', - port=443, - cert_file=self.cert_file, - key_file=self.key_file, - timeout=120) + return HTTPSConnection( + "cmsweb.cern.ch", + port=443, + cert_file=self.cert_file, + key_file=self.key_file, + timeout=120, + ) def get(self, path, cache=True): """ Make a simple GET request Add Accept: application/json headers """ - logging.info('Will try to GET %s', path) + logging.info("Will try to GET %s", path) if cache and path in self.__cache: - logging.info('Found %s response in cache', path) + logging.info("Found %s response in cache", path) return self.__cache[path] connection = self.__get_connection() - connection.request('GET', path, headers={'Accept': 'application/json'}) + connection.request("GET", path, headers={"Accept": "application/json"}) response = connection.getresponse() if response.status != 200: - logging.error('Problems (%d) with %s: %s', response.status, path, response.read()) + logging.error( + "Problems (%d) with %s: %s", response.status, path, response.read() + ) connection.close() return None - decoded_response = response.read().decode('utf-8') + decoded_response = response.read().decode("utf-8") if cache: self.__cache[path] = decoded_response @@ -66,20 +71,20 @@ def get_big_file(self, path, filename=None): """ Download files chunk by chunk """ - logging.info('Will try to download file %s', path) + logging.info("Will try to download file %s", path) if filename is None: - filename = path.split('/')[-1] - logging.info('Using file name %s for %s', filename, path) + filename = path.split("/")[-1] + logging.info("Using file name %s for %s", filename, path) if os.path.isfile(filename): - logging.info('File %s already exists', filename) + logging.info("File %s already exists", filename) return filename connection = self.__get_connection() - connection.request('GET', path) + connection.request("GET", path) response = connection.getresponse() chunk_size = 1024 * 1024 * 8 # 8 megabytes - with open(filename, 'wb') as output_file: + with open(filename, "wb") as output_file: total_chunk_size = 0 start_time = time.time() while True: @@ -93,10 +98,12 @@ end_time = time.time() speed = (total_chunk_size / (1024.0 * 1024.0)) / (end_time - start_time) - logging.info('Downloaded %.2fMB in %.2fs. Speed %.2fMB/s', - total_chunk_size / (1024.0 * 1024.0), - end_time - start_time, - speed) + logging.info( + "Downloaded %.2fMB in %.2fs. 
Speed %.2fMB/s", + total_chunk_size / (1024.0 * 1024.0), + end_time - start_time, + speed, + ) connection.close() return filename @@ -105,7 +112,7 @@ def get_workflow(self, workflow_name): """ Get a single workflow from ReqMgr2 """ - workflow_string = self.get('/reqmgr2/data/request?name=%s' % (workflow_name)) + workflow_string = self.get("/reqmgr2/data/request?name=%s" % (workflow_name)) if not workflow_string: return None @@ -113,10 +120,12 @@ def get_workflow(self, workflow_name): workflow = json.loads(workflow_string) # 'result' is a list of elements and each of them is # dictionary that has workflow name as key - return workflow.get('result', [{}])[0].get(workflow_name) + return workflow.get("result", [{}])[0].get(workflow_name) except ValueError as ex: - logging.error('Failed to parse workflow %s JSON %s. %s', - workflow_name, - workflow_string, - ex) + logging.error( + "Failed to parse workflow %s JSON %s. %s", + workflow_name, + workflow_string, + ex, + ) return None diff --git a/remote/events.py b/remote/events.py index 9b38ac0..864f5de 100644 --- a/remote/events.py +++ b/remote/events.py @@ -2,25 +2,27 @@ Module contains a function get_events that extracts number of events from DQMIO file """ import logging + try: - #pylint: disable=import-error + # pylint: disable=import-error import ROOT - #pylint: enable=import-error + + # pylint: enable=import-error except ImportError: - logging.error('Error importing ROOT') + logging.error("Error importing ROOT") def get_events(file_name): """ Get events from a given DQMIO file """ - logging.info('Getting events for %s', file_name) + logging.info("Getting events for %s", file_name) try: - root_file = ROOT.TFile.Open(file_name, 'READ') + root_file = ROOT.TFile.Open(file_name, "READ") tree = root_file.Get("DQMData") return walk(tree) except Exception as ex: - logging.error('Error getting events for %s file: %s', file_name, ex) + logging.error("Error getting events for %s file: %s", file_name, ex) return 0 @@ -30,16 +32,16 @@ def walk(directory): Go through directories in depth-first way """ keys = directory.GetListOfKeys() - elements_to_enter = ('DQM', 'Run summary', 'TimerService', 'Generator', 'Particles') + elements_to_enter = ("DQM", "Run summary", "TimerService", "Generator", "Particles") for elem in keys: elem_name = elem.GetName() item = directory.Get(elem_name) if item: if item.IsFolder(): - if elem_name in elements_to_enter or elem_name.startswith('Run '): + if elem_name in elements_to_enter or elem_name.startswith("Run "): return walk(item) else: - if elem_name in ('nEvt', 'event allocated'): + if elem_name in ("nEvt", "event allocated"): try: return int(item.GetEntries()) except Exception as ex: diff --git a/remote/remote_apparatus.py b/remote/remote_apparatus.py index 8edfe82..2e51517 100644 --- a/remote/remote_apparatus.py +++ b/remote/remote_apparatus.py @@ -15,12 +15,15 @@ import time import sys import traceback +import subprocess from subprocess import Popen from difflib import SequenceMatcher -#pylint: disable=import-error + +# pylint: disable=import-error from cmswebwrapper import CMSWebWrapper from events import get_events -#pylint: enable=import-error + +# pylint: enable=import-error def get_dqmio_dataset(workflow): @@ -28,9 +31,9 @@ def get_dqmio_dataset(workflow): Given a workflow dictionary, return first occurence of DQMIO string Return None if it could not be found """ - output_datasets = workflow.get('OutputDatasets', []) + output_datasets = workflow.get("OutputDatasets", []) for dataset in output_datasets: - if 
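# Illustrative sketch (not part of the patch): get_events usage. It requires
# a working PyROOT installation and an already downloaded DQMIO file; the
# file name below is hypothetical.
from events import get_events

events = get_events("DQM_V0001_R000000001__RelValExample__CMSSW_12_0_0__DQMIO.root")
print("Events in file:", events)  # 0 when the file cannot be read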
diff --git a/remote/remote_apparatus.py index 8edfe82..2e51517 100644 --- a/remote/remote_apparatus.py +++ b/remote/remote_apparatus.py @@ -15,12 +15,15 @@ import time import sys import traceback +import subprocess from subprocess import Popen from difflib import SequenceMatcher -#pylint: disable=import-error + +# pylint: disable=import-error from cmswebwrapper import CMSWebWrapper from events import get_events -#pylint: enable=import-error + +# pylint: enable=import-error def get_dqmio_dataset(workflow): """ Given a workflow dictionary, return first occurrence of DQMIO string Return None if it could not be found """ - output_datasets = workflow.get('OutputDatasets', []) + output_datasets = workflow.get("OutputDatasets", []) for dataset in output_datasets: - if '/DQMIO' in dataset: + if "/DQMIO" in dataset: return dataset return None @@ -40,61 +43,164 @@ """ Get list of URLs for given dataset """ - logging.info('Getting root file path for dataset %s. Category %s', - dqmio_dataset, - category_name) - cmssw = dqmio_dataset.split('/')[2].split('-')[0] - dataset_part = dqmio_dataset.replace('/', '__') - if category_name == 'Data': - cmsweb_dqm_dir_link = '/dqm/relval/data/browse/ROOT/RelValData/' + logging.info( + "Getting root file path for dataset %s. Category %s", + dqmio_dataset, + category_name, + ) + cmssw = dqmio_dataset.split("/")[2].split("-")[0] + dataset_part = dqmio_dataset.replace("/", "__") + if category_name == "Data": + cmsweb_dqm_dir_link = "/dqm/relval/data/browse/ROOT/RelValData/" else: - cmsweb_dqm_dir_link = '/dqm/relval/data/browse/ROOT/RelVal/' + cmsweb_dqm_dir_link = "/dqm/relval/data/browse/ROOT/RelVal/" - cmsweb_dqm_dir_link += '_'.join(cmssw.split('_')[:3]) + '_x/' + cmsweb_dqm_dir_link += "_".join(cmssw.split("_")[:3]) + "_x/" response = cmsweb.get(cmsweb_dqm_dir_link) hyperlink_regex = re.compile("href=['\"]([-\\._a-zA-Z/\\d]*)['\"]") hyperlinks = hyperlink_regex.findall(response)[1:] hyperlinks = list(hyperlinks) - logging.info('Substring to look for: %s. Total links in page: %s. Looking in %s', - dataset_part, - len(hyperlinks), - cmsweb_dqm_dir_link) + logging.info( + "Substring to look for: %s. Total links in page: %s. Looking in %s", + dataset_part, + len(hyperlinks), + cmsweb_dqm_dir_link, + ) hyperlinks = [x for x in hyperlinks if dataset_part in x] hyperlinks = sorted(hyperlinks) - logging.info('Selected hyperlinks %s', json.dumps(hyperlinks, indent=2, sort_keys=True)) + logging.info( + "Selected hyperlinks %s", json.dumps(hyperlinks, indent=2, sort_keys=True) + ) return hyperlinks +def get_client_credentials(): + """ + This function retrieves the client credentials provided + via environment variables + + Returns: + dict: Credentials required to request an access token via + client credential grant + + Raises: + RuntimeError: If there are environment variables that were not provided + """ + required_variables = [ + "CALLBACK_CLIENT_ID", + "CALLBACK_CLIENT_SECRET", + "APPLICATION_CLIENT_ID", + ] + credentials = {} + msg = ( + "Some required environment variables are not available " + "to send the callback notification. Please set them:\n" + ) + for var in required_variables: + value = os.getenv(var) + if not value: + msg += "%s\n" % var + continue + credentials[var] = value + + if len(credentials) == len(required_variables): + logging.info("Returning OAuth2 credentials for requesting a token") + return credentials + + logging.error(msg) + raise RuntimeError(msg)
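# Illustrative sketch (not part of the patch): the same client credential
# grant that the curl-based get_access_token below performs, expressed with
# the requests library.
import requests

def fetch_token_header(credentials):
    response = requests.post(
        "https://auth.cern.ch/auth/realms/cern/api-access/token",
        data={
            "grant_type": "client_credentials",
            "client_id": credentials["CALLBACK_CLIENT_ID"],
            "client_secret": credentials["CALLBACK_CLIENT_SECRET"],
            "audience": credentials["APPLICATION_CLIENT_ID"],
        },
        timeout=30,
    )
    response.raise_for_status()
    return "Bearer %s" % response.json()["access_token"]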
+ + +def get_access_token(credentials): + """ + Request an access token from Keycloak (CERN SSO) via a + client credential grant. + + Args: + credentials (dict): Credentials required to perform a client credential grant + Client ID, Client Secret and Target application (audience) + + Returns: + str: Authorization header including the captured access token + + Raises: + RuntimeError: If there is an issue requesting the access token + """ + cern_api_access_token = "https://auth.cern.ch/auth/realms/cern/api-access/token" + client_id = credentials["CALLBACK_CLIENT_ID"] + client_secret = credentials["CALLBACK_CLIENT_SECRET"] + audience = credentials["APPLICATION_CLIENT_ID"] + command = [ + "curl", + "-s", + "--location", + "--request", + "POST", + cern_api_access_token, + "--header", + "'Content-Type: application/x-www-form-urlencoded'", + "--data-urlencode 'grant_type=client_credentials'", + "--data-urlencode 'client_id=%s'" % client_id, + "--data-urlencode 'client_secret=%s'" % client_secret, + "--data-urlencode 'audience=%s'" % audience, + ] + command = " ".join(command) + logging.info("Requesting access token...") + proc = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + stdout = proc.communicate()[0] + stdout = stdout.decode("utf-8") + if proc.returncode != 0: + request_error = "Error requesting an access token: %s" % stdout + logging.error(request_error) + raise RuntimeError(request_error) + + token_content = json.loads(stdout) + token = token_content.get("access_token") + if not token: + token_error = "Invalid access token request. Details: %s" % token_content + logging.error(token_error) + raise RuntimeError(token_error) + + header = "Bearer %s" % token + return header + + def notify(relmon, callback_url): """ Send a notification about progress back to RelMon service """ - with open('notify_data.json', 'w') as json_file: + with open("notify_data.json", "w", encoding="utf-8") as json_file: json.dump(relmon, json_file, indent=2, sort_keys=True) - command = ['curl', - '-X', - 'POST', - '--cookie cookie.txt', - callback_url, - '-s', - '-k', - '-L', - '-m', - '60', - '-d', - '@notify_data.json', - '-H', - '\'Content-Type: application/json\'', - '-o', - '/dev/null'] - command = ' '.join(command) - logging.info('Notifying...') - proc = Popen(command, shell=True) - proc.wait() - - os.remove('notify_data.json') + credentials = get_client_credentials() + access_token = get_access_token(credentials) + + command = [ + "curl", + "-X", + "POST", + callback_url, + "-s", + "-k", + "-L", + "-m", + "60", + "-d", + "@notify_data.json", + "-H", + "'Content-Type: application/json'", + "-H", + "'Authorization: %s'" % access_token, + ] + command = " ".join(command) + logging.info("Notifying...") + proc = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + stdout = proc.communicate()[0] + stdout = stdout.decode("utf-8") + logging.info("Notification result: %s", stdout) + + os.remove("notify_data.json") time.sleep(0.05) @@ -102,70 +208,80 @@ """ Download all files needed for comparison and fill relmon dictionary """ - for category in relmon.get('categories', []): - if category['status'] != 'initial': + for category in relmon.get("categories", []): + if category["status"] != "initial": continue - category_name = category['name'] - reference_list = category.get('reference', []) - target_list = category.get('target', []) + category_name = category["name"] + reference_list = category.get("reference", []) + target_list = category.get("target", []) for item in reference_list + target_list: - name = item['name'] - if 
name.lower().startswith('/relval') and name.lower().endswith('/dqmio'): - logging.info('Name %s is dataset name', name) + name = item["name"] + if name.lower().startswith("/relval") and name.lower().endswith("/dqmio"): + logging.info("Name %s is dataset name", name) # Dataset name dqmio_dataset = name else: - logging.info('Name %s is workflow name', name) + logging.info("Name %s is workflow name", name) # Workflow name - workflow = cmsweb.get_workflow(item['name']) + workflow = cmsweb.get_workflow(item["name"]) if not workflow: - item['status'] = 'no_workflow' + item["status"] = "no_workflow" notify(relmon, callback_url) - logging.warning('Could not find workflow %s in ReqMgr2', item['name']) + logging.warning( + "Could not find workflow %s in ReqMgr2", item["name"] + ) continue dqmio_dataset = get_dqmio_dataset(workflow) if not dqmio_dataset: - item['status'] = 'no_dqmio' + item["status"] = "no_dqmio" notify(relmon, callback_url) - logging.warning('Could not find DQMIO dataset in %s. Datasets: %s', - item['name'], - ', '.join(workflow.get('OutputDatasets', []))) + logging.warning( + "Could not find DQMIO dataset in %s. Datasets: %s", + item["name"], + ", ".join(workflow.get("OutputDatasets", [])), + ) continue - file_urls = get_root_file_path_for_dataset(cmsweb, dqmio_dataset, category_name) + file_urls = get_root_file_path_for_dataset( + cmsweb, dqmio_dataset, category_name + ) if not file_urls: - item['status'] = 'no_root' + item["status"] = "no_root" notify(relmon, callback_url) - logging.warning('Could not get root file path for %s dataset of %s workflow', - dqmio_dataset, - item['name']) + logging.warning( + "Could not get root file path for %s dataset of %s workflow", + dqmio_dataset, + item["name"], + ) continue - item['versioned'] = len(file_urls) > 1 + item["versioned"] = len(file_urls) > 1 file_url = file_urls[-1] - logging.info('File URL for %s is %s', item['name'], file_url) - item['file_url'] = file_url - item['file_size'] = 0 - item['status'] = 'downloading' - item['file_name'] = item['file_url'].split('/')[-1] - item['events'] = 0 + logging.info("File URL for %s is %s", item["name"], file_url) + item["file_url"] = file_url + item["file_size"] = 0 + item["status"] = "downloading" + item["file_name"] = item["file_url"].split("/")[-1] + item["events"] = 0 notify(relmon, callback_url) try: - item['file_name'] = cmsweb.get_big_file(item['file_url']) - item['status'] = 'downloaded' - item['file_size'] = os.path.getsize(item['file_name']) - item['events'] = get_events(item['file_name']) - logging.info('Downloaded %s. Size %.2f MB. Events %s', - item['file_name'], - item.get('file_size', 0) / 1024.0 / 1024.0, - item['events']) + item["file_name"] = cmsweb.get_big_file(item["file_url"]) + item["status"] = "downloaded" + item["file_size"] = os.path.getsize(item["file_name"]) + item["events"] = get_events(item["file_name"]) + logging.info( + "Downloaded %s. Size %.2f MB. 
Events %s", + item["file_name"], + item.get("file_size", 0) / 1024.0 / 1024.0, + item["events"], + ) except Exception as ex: logging.error(ex) - logging.error('Error getting %s for %s', item['file_url'], item['name']) - item['status'] = 'failed' + logging.error("Error getting %s for %s", item["file_url"], item["name"]) + item["status"] = "failed" notify(relmon, callback_url) @@ -176,13 +292,13 @@ def get_local_subreport_path(category_name, hlt): Basically add Report and HLT to category name """ name = category_name - if 'PU' in category_name: - name = name.split('_')[0] + 'Report_PU' + if "PU" in category_name: + name = name.split("_")[0] + "Report_PU" else: - name += 'Report' + name += "Report" if hlt: - name += '_HLT' + name += "_HLT" return name @@ -191,7 +307,7 @@ def get_important_part(file_name): """ Return part of dataset file name that will be used for matching """ - return file_name.split('__')[1] + '_' + file_name.split("__")[2].split("-")[1] + return file_name.split("__")[1] + "_" + file_name.split("__")[2].split("-")[1] def make_file_tree(items, category): @@ -200,17 +316,17 @@ def make_file_tree(items, category): """ result_tree = {} for item in items: - filename = item['file_name'] - split_filename = filename.split('__') + filename = item["file_name"] + split_filename = filename.split("__") if len(split_filename) < 2: logging.error('Bad file name: "%s"', filename) continue - dataset = split_filename[1].split('_')[0] - if category == 'Data': - run_number = split_filename[0].split('_')[-1] + dataset = split_filename[1].split("_")[0] + if category == "Data": + run_number = split_filename[0].split("_")[-1] else: - run_number = 'all_runs' + run_number = "all_runs" if dataset not in result_tree: result_tree[dataset] = {} @@ -244,12 +360,12 @@ def calculate_similarities(references, targets): all_ratios = [] for reference in references: for target in targets: - reference_string = get_important_part(reference['file_name']) - target_string = get_important_part(target['file_name']) + reference_string = get_important_part(reference["file_name"]) + target_string = get_important_part(target["file_name"]) ratio = SequenceMatcher(a=reference_string, b=target_string).ratio() reference_target_ratio = (reference, target, ratio) all_ratios.append(reference_target_ratio) - logging.info('%s %s -> %s', reference_string, target_string, ratio) + logging.info("%s %s -> %s", reference_string, target_string, ratio) all_ratios.sort(key=lambda x: x[2], reverse=True) return all_ratios @@ -265,59 +381,68 @@ def pick_pairs(all_ratios): for reference_target_ratio in all_ratios: reference = reference_target_ratio[0] target = reference_target_ratio[1] - reference_name = reference['file_name'] - target_name = target['file_name'] + reference_name = reference["file_name"] + target_name = target["file_name"] ratio = reference_target_ratio[2] if reference_name not in used_references and target_name not in used_targets: - logging.info('Pair %s with %s. Similarity %.3f', reference_name, target_name, ratio) + logging.info( + "Pair %s with %s. 
Similarity %.3f", reference_name, target_name, ratio + ) used_references.add(reference_name) used_targets.add(target_name) selected_pairs.append((reference, target)) return selected_pairs + def pair_references_with_targets(category): """ Do automatic pairing based on dataset names, runs and similarities in names """ - logging.info('Will try to automatically find pairs in %s', category['name']) - references = category.get('reference', []) - targets = category.get('target', []) - reference_tree = make_file_tree(references, category['name']) - target_tree = make_file_tree(targets, category['name']) + logging.info("Will try to automatically find pairs in %s", category["name"]) + references = category.get("reference", []) + targets = category.get("target", []) + reference_tree = make_file_tree(references, category["name"]) + target_tree = make_file_tree(targets, category["name"]) - logging.info('References tree: %s', json.dumps(reference_tree, indent=2, sort_keys=True)) - logging.info('Targets tree: %s', json.dumps(target_tree, indent=2, sort_keys=True)) + logging.info( + "References tree: %s", json.dumps(reference_tree, indent=2, sort_keys=True) + ) + logging.info("Targets tree: %s", json.dumps(target_tree, indent=2, sort_keys=True)) selected_pairs = [] for reference_dataset, reference_runs in reference_tree.items(): for reference_run, references_in_run in reference_runs.items(): - targets_in_run = target_tree.get(reference_dataset, {}).get(reference_run, []) + targets_in_run = target_tree.get(reference_dataset, {}).get( + reference_run, [] + ) if len(references_in_run) == 1 and len(targets_in_run) == 1: reference = references_in_run.pop() target = targets_in_run.pop() - reference_name = reference['file_name'] - target_name = target['file_name'] - logging.info('Pair %s with %s', reference_name, target_name) - reference['match'] = target['name'] - target['match'] = reference['name'] + reference_name = reference["file_name"] + target_name = target["file_name"] + logging.info("Pair %s with %s", reference_name, target_name) + reference["match"] = target["name"] + target["match"] = reference["name"] selected_pairs.append((reference_name, target_name)) else: - logging.info('Dataset %s. Run %s. Will try to match %s\nwith\n%s', - reference_dataset, - reference_run, - json.dumps(references_in_run, indent=2, sort_keys=True), - json.dumps(targets_in_run, indent=2, sort_keys=True)) + logging.info( + "Dataset %s. Run %s. 
Will try to match %s\nwith\n%s", + reference_dataset, + reference_run, + json.dumps(references_in_run, indent=2, sort_keys=True), + json.dumps(targets_in_run, indent=2, sort_keys=True), + ) all_ratios = calculate_similarities(references_in_run, targets_in_run) pairs = pick_pairs(all_ratios) for reference, target in pairs: references_in_run.remove(reference) targets_in_run.remove(target) - selected_pairs.append((reference['file_name'], target['file_name'])) - reference['match'] = target['name'] - target['match'] = reference['name'] + selected_pairs.append((reference["file_name"], target["file_name"])) + reference["match"] = target["name"] + target["match"] = reference["name"] # Delete empty items so there would be less to print for tree in (reference_tree, target_tree): @@ -325,13 +450,16 @@ for _, runs in tree.items(): for _, items_in_run in runs.items(): for item in items_in_run: - if item['status'] == 'downloaded': - item['status'] = 'no_match' + if item["status"] == "downloaded": + item["status"] = "no_match" - logging.info('References leftovers tree: %s', - json.dumps(reference_tree, indent=2, sort_keys=True)) - logging.info('Targets leftovers tree: %s', - json.dumps(target_tree, indent=2, sort_keys=True)) + logging.info( + "References leftovers tree: %s", + json.dumps(reference_tree, indent=2, sort_keys=True), + ) + logging.info( + "Targets leftovers tree: %s", json.dumps(target_tree, indent=2, sort_keys=True) + ) sorted_references = [x[0] for x in selected_pairs] sorted_targets = [x[1] for x in selected_pairs] @@ -344,77 +472,94 @@ Return lists of files to compare Automatically paired if automatic pairing is enabled """ - reference_list = category.get('reference', {}) - target_list = category.get('target', {}) + reference_list = category.get("reference", {}) + target_list = category.get("target", {}) reference_dataset_list = [] target_dataset_list = [] - automatic_pairing = category['automatic_pairing'] + automatic_pairing = category["automatic_pairing"] if not reference_list and not target_list: return [], [] if automatic_pairing: - reference_dataset_list, target_dataset_list = pair_references_with_targets(category) + reference_dataset_list, target_dataset_list = pair_references_with_targets( + category + ) else: for i in range(min(len(reference_list), len(target_list))): - if reference_list[i]['file_name'] and target_list[i]['file_name']: - reference_dataset_list.append(reference_list[i]['file_name']) - target_dataset_list.append(target_list[i]['file_name']) - reference_list[i]['match'] = target_list[i]['name'] - target_list[i]['match'] = reference_list[i]['name'] - - if not reference_list[i]['file_name']: - logging.error('File name is missing for %s, will not compare this workflow', - reference_list[i]['name']) - - if not target_list[i]['file_name']: - logging.error('File name is missing for %s, will not compare this workflow', - target_list[i]['name']) + if reference_list[i]["file_name"] and target_list[i]["file_name"]: + reference_dataset_list.append(reference_list[i]["file_name"]) + target_dataset_list.append(target_list[i]["file_name"]) + reference_list[i]["match"] = target_list[i]["name"] + target_list[i]["match"] = reference_list[i]["name"] + + if not reference_list[i]["file_name"]: + logging.error( + "File name is missing for %s, will not compare this workflow", + reference_list[i]["name"], + ) + + if not target_list[i]["file_name"]: + logging.error( + "File name is missing for %s, will not compare 
@@ -344,77 +472,94 @@ def get_dataset_lists(category):
     """
     Return lists of files to compare
     Automatically paired if automatic pairing is enabled
     """
-    reference_list = category.get('reference', {})
-    target_list = category.get('target', {})
+    reference_list = category.get("reference", {})
+    target_list = category.get("target", {})
     reference_dataset_list = []
     target_dataset_list = []
-    automatic_pairing = category['automatic_pairing']
+    automatic_pairing = category["automatic_pairing"]
     if not reference_list and not target_list:
         return [], []
 
     if automatic_pairing:
-        reference_dataset_list, target_dataset_list = pair_references_with_targets(category)
+        reference_dataset_list, target_dataset_list = pair_references_with_targets(
+            category
+        )
     else:
         for i in range(min(len(reference_list), len(target_list))):
-            if reference_list[i]['file_name'] and target_list[i]['file_name']:
-                reference_dataset_list.append(reference_list[i]['file_name'])
-                target_dataset_list.append(target_list[i]['file_name'])
-                reference_list[i]['match'] = target_list[i]['name']
-                target_list[i]['match'] = reference_list[i]['name']
-
-            if not reference_list[i]['file_name']:
-                logging.error('File name is missing for %s, will not compare this workflow',
-                              reference_list[i]['name'])
-
-            if not target_list[i]['file_name']:
-                logging.error('File name is missing for %s, will not compare this workflow',
-                              target_list[i]['name'])
+            if reference_list[i]["file_name"] and target_list[i]["file_name"]:
+                reference_dataset_list.append(reference_list[i]["file_name"])
+                target_dataset_list.append(target_list[i]["file_name"])
+                reference_list[i]["match"] = target_list[i]["name"]
+                target_list[i]["match"] = reference_list[i]["name"]
+
+            if not reference_list[i]["file_name"]:
+                logging.error(
+                    "File name is missing for %s, will not compare this workflow",
+                    reference_list[i]["name"],
+                )
+
+            if not target_list[i]["file_name"]:
+                logging.error(
+                    "File name is missing for %s, will not compare this workflow",
+                    target_list[i]["name"],
+                )
 
     return reference_dataset_list, target_dataset_list
 
 
-def compare_compress_move(category_name, hlt, reference_list, target_list, cpus, log_file):
+def compare_compress_move(
+    category_name, hlt, reference_list, target_list, cpus, log_file
+):
     """
     The main function that compares, compresses and moves reports to Reports directory
     """
     subreport_path = get_local_subreport_path(category_name, hlt)
-    comparison_command = ' '.join(['ValidationMatrix.py',
-                                   '-R',
-                                   ','.join(reference_list),
-                                   '-T',
-                                   ','.join(target_list),
-                                   '-o',
-                                   subreport_path,
-                                   '-N %s' % (cpus),
-                                   '--hash_name',
-                                   '--HLT' if hlt else ''])
+    comparison_command = " ".join(
+        [
+            "ValidationMatrix.py",
+            "-R",
+            ",".join(reference_list),
+            "-T",
+            ",".join(target_list),
+            "-o",
+            subreport_path,
+            "-N %s" % (cpus),
+            "--hash_name",
+            "--HLT" if hlt else "",
+        ]
+    )
     # Remove all /cms-service-reldqm/style/blueprint/ from HTML files
-    path_fix_command = ("find %s/ -type f -name '*.html' |xargs -L1 "
-                        "sed -i -e 's#/cms-service-reldqm/style/blueprint/##g'" % (subreport_path))
-    img_src_fix_command = ("find %s/ -type f -name '*.html' |xargs -L1 "
-                           "sed -i -e 's#http://cmsweb.cern.ch//dqm#https://cmsweb.cern.ch/dqm#g'" % (subreport_path))
-    compression_command = ' '.join(['dir2webdir.py', subreport_path])
-    move_command = ' '.join(['mv', subreport_path, 'Reports/'])
-
-    logging.info('ValidationMatrix command: %s', comparison_command)
+    path_fix_command = (
+        "find %s/ -type f -name '*.html' |xargs -L1 "
+        "sed -i -e 's#/cms-service-reldqm/style/blueprint/##g'" % (subreport_path)
+    )
+    img_src_fix_command = (
+        "find %s/ -type f -name '*.html' |xargs -L1 "
+        "sed -i -e 's#http://cmsweb.cern.ch//dqm#https://cmsweb.cern.ch/dqm#g'"
+        % (subreport_path)
+    )
+    compression_command = " ".join(["dir2webdir.py", subreport_path])
+    move_command = " ".join(["mv", subreport_path, "Reports/"])
+
+    logging.info("ValidationMatrix command: %s", comparison_command)
     proc = Popen(comparison_command, stdout=log_file, stderr=log_file, shell=True)
     proc.communicate()
-    logging.info('Path fix command: %s', path_fix_command)
+    logging.info("Path fix command: %s", path_fix_command)
     proc = Popen(path_fix_command, stdout=log_file, stderr=log_file, shell=True)
     proc.communicate()
-    logging.info(' src fix command: %s', img_src_fix_command)
+    logging.info(" src fix command: %s", img_src_fix_command)
    proc = Popen(img_src_fix_command, stdout=log_file, stderr=log_file, shell=True)
     proc.communicate()
-    logging.info('Compression command: %s', compression_command)
+    logging.info("Compression command: %s", compression_command)
     proc = Popen(compression_command, stdout=log_file, stderr=log_file, shell=True)
     proc.communicate()
-    logging.info('Move command: %s', move_command)
+    logging.info("Move command: %s", move_command)
     proc = Popen(move_command, stdout=log_file, stderr=log_file, shell=True)
     proc.communicate()
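Each step in compare_compress_move launches a shell command through Popen and waits with communicate(), but the exit status is discarded. A small helper along these lines (a sketch, not part of this diff; run_step is a made-up name) would surface failures in validation_matrix.log instead of letting them pass silently:

import logging
from subprocess import Popen


def run_step(description, command, log_file):
    # Run one shell command, mirror its output into the shared log file,
    # and log a non-zero exit status instead of ignoring it
    logging.info("%s: %s", description, command)
    proc = Popen(command, stdout=log_file, stderr=log_file, shell=True)
    proc.communicate()
    if proc.returncode != 0:
        logging.error("%s failed with exit code %s", description, proc.returncode)
    return proc.returncode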
@@ -423,50 +568,49 @@ def run_validation_matrix(relmon, cpus, callback_url):
     """
     Iterate through categories and start comparison process
     """
-    with open("validation_matrix.log", "w") as log_file:
-        for category in relmon.get('categories', []):
-            if category['status'] != 'initial':
+    with open("validation_matrix.log", "w", encoding="utf-8") as log_file:
+        for category in relmon.get("categories", []):
+            if category["status"] != "initial":
                 continue
 
-            category_name = category['name']
+            category_name = category["name"]
             subreport_path_no_hlt = get_local_subreport_path(category_name, False)
-            logging.info('Creating directory %s', subreport_path_no_hlt)
-            os.makedirs('Reports/' + subreport_path_no_hlt)
-            if category_name.lower() != 'generator':
+            logging.info("Creating directory %s", subreport_path_no_hlt)
+            os.makedirs("Reports/" + subreport_path_no_hlt)
+            if category_name.lower() != "generator":
                 subreport_path_hlt = get_local_subreport_path(category_name, True)
-                logging.info('Creating directory %s', subreport_path_hlt)
-                os.makedirs('Reports/' + subreport_path_hlt)
+                logging.info("Creating directory %s", subreport_path_hlt)
+                os.makedirs("Reports/" + subreport_path_hlt)
 
-            hlt = category['hlt']
-            logging.info('Category: %s', category_name)
-            logging.info('HLT: %s', hlt)
+            hlt = category["hlt"]
+            logging.info("Category: %s", category_name)
+            logging.info("HLT: %s", hlt)
             reference_list, target_list = get_dataset_lists(category)
             if reference_list and target_list:
-                category['status'] = 'comparing'
+                category["status"] = "comparing"
                 notify(relmon, callback_url)
                 # Run Generator without HLT
                 # Do not run Generator with HLT
-                if hlt in ('only', 'both') and category_name.lower() != 'generator':
+                if hlt in ("only", "both") and category_name.lower() != "generator":
                     # Run with HLT
                     # Do not run generator with HLT
-                    compare_compress_move(category_name,
-                                          True,
-                                          reference_list,
-                                          target_list,
-                                          cpus,
-                                          log_file)
-
-                if hlt in ('no', 'both') or category_name.lower() == 'generator':
+                    compare_compress_move(
+                        category_name, True, reference_list, target_list, cpus, log_file
+                    )
+
+                if hlt in ("no", "both") or category_name.lower() == "generator":
                     # Run without HLT
                     # Run Generator without HLT
-                    compare_compress_move(category_name,
-                                          False,
-                                          reference_list,
-                                          target_list,
-                                          cpus,
-                                          log_file)
-
-            category['status'] = 'done'
+                    compare_compress_move(
+                        category_name,
+                        False,
+                        reference_list,
+                        target_list,
+                        cpus,
+                        log_file,
+                    )
+
+            category["status"] = "done"
             notify(relmon, callback_url)
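The HLT dispatch above is easy to misread, so here it is restated as a standalone predicate (a hypothetical helper, shown only to document the branch logic; it is not part of this diff):

def runs_for(category_name, hlt):
    # Mirrors the conditions in run_validation_matrix:
    #   "generator" categories never run with HLT but always run without it;
    #   hlt="only" -> HLT comparison, hlt="no" -> non-HLT, hlt="both" -> both
    is_generator = category_name.lower() == "generator"
    with_hlt = hlt in ("only", "both") and not is_generator
    without_hlt = hlt in ("no", "both") or is_generator
    return with_hlt, without_hlt

With such a helper, the two branches above reduce to checking the returned booleans.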
@@ -474,87 +618,81 @@ def main():
     """
     Main function
     """
-    parser = argparse.ArgumentParser(description='File downloader and ValidationMatrix runner')
-    parser.add_argument('--relmon',
-                        '-r',
-                        type=str,
-                        help='JSON file with RelMon')
-    parser.add_argument('--cert',
-                        '-c',
-                        type=str,
-                        help='File name for GRID certificate')
-    parser.add_argument('--key',
-                        '-k',
-                        type=str,
-                        help='File name for GRID key')
-    parser.add_argument('--proxy',
-                        '-p',
-                        type=str,
-                        help='File name for GRID proxy file')
-    parser.add_argument('--cpus',
-                        nargs='?',
-                        const=1,
-                        type=int,
-                        default=1,
-                        help='Number of CPU cores for ValidationMatrix')
-    parser.add_argument('--callback',
-                        type=str,
-                        help='URL for callbacks')
-    parser.add_argument('--notifydone',
-                        action='store_true',
-                        help='Just notify that job is completed')
+    parser = argparse.ArgumentParser(
+        description="File downloader and ValidationMatrix runner"
+    )
+    parser.add_argument("--relmon", "-r", type=str, help="JSON file with RelMon")
+    parser.add_argument("--cert", "-c", type=str, help="File name for GRID certificate")
+    parser.add_argument("--key", "-k", type=str, help="File name for GRID key")
+    parser.add_argument("--proxy", "-p", type=str, help="File name for GRID proxy file")
+    parser.add_argument(
+        "--cpus",
+        nargs="?",
+        const=1,
+        type=int,
+        default=1,
+        help="Number of CPU cores for ValidationMatrix",
+    )
+    parser.add_argument("--callback", type=str, help="URL for callbacks")
+    parser.add_argument(
+        "--notifydone", action="store_true", help="Just notify that job is completed"
+    )
     args = vars(parser.parse_args())
-    logging.basicConfig(stream=sys.stdout,
-                        format='[%(asctime)s][%(levelname)s] %(message)s',
-                        level=logging.INFO)
-
-    cert_file = args.get('cert')
-    key_file = args.get('key')
-    proxy_file = args.get('proxy')
-    relmon_filename = args.get('relmon')
-    cpus = args.get('cpus', 1)
-    callback_url = args.get('callback')
-    notify_done = bool(args.get('notifydone'))
-    logging.info('Arguments: %s; cert %s; key %s; proxy: %s; cpus %s; callback %s; notify %s',
-                 relmon_filename,
-                 cert_file,
-                 key_file,
-                 proxy_file,
-                 cpus,
-                 callback_url,
-                 'YES' if notify_done else 'NO')
-
-    with open(relmon_filename) as relmon_file:
+    logging.basicConfig(
+        stream=sys.stdout,
+        format="[%(asctime)s][%(levelname)s] %(message)s",
+        level=logging.INFO,
+    )
+
+    cert_file = args.get("cert")
+    key_file = args.get("key")
+    proxy_file = args.get("proxy")
+    relmon_filename = args.get("relmon")
+    cpus = args.get("cpus", 1)
+    callback_url = args.get("callback")
+    notify_done = bool(args.get("notifydone"))
+    logging.info(
+        "Arguments: %s; cert %s; key %s; proxy: %s; cpus %s; callback %s; notify %s",
+        relmon_filename,
+        cert_file,
+        key_file,
+        proxy_file,
+        cpus,
+        callback_url,
+        "YES" if notify_done else "NO",
+    )
+
+    with open(relmon_filename, encoding="utf-8") as relmon_file:
         relmon = json.load(relmon_file)
 
     try:
         if notify_done:
-            if relmon['status'] != 'failed':
-                relmon['status'] = 'done'
+            if relmon["status"] != "failed":
+                relmon["status"] = "done"
             else:
-                logging.info('Will notify about failure')
+                logging.info("Will notify about failure")
         else:
             if (not cert_file or not key_file) and proxy_file:
                 cert_file = proxy_file
                 key_file = proxy_file
 
             cmsweb = CMSWebWrapper(cert_file, key_file)
-            relmon['status'] = 'running'
+            relmon["status"] = "running"
             notify(relmon, callback_url)
             download_root_files(relmon, cmsweb, callback_url)
             run_validation_matrix(relmon, cpus, callback_url)
-            relmon['status'] = 'finishing'
+            relmon["status"] = "finishing"
     except Exception as ex:
         logging.error(ex)
         logging.error(traceback.format_exc())
-        relmon['status'] = 'failed'
+        relmon["status"] = "failed"
     finally:
-        with open(relmon_filename, 'w') as relmon_file:
+        with open(relmon_filename, "w", encoding="utf-8") as relmon_file:
             json.dump(relmon, relmon_file, indent=2, sort_keys=True)
 
         notify(relmon, callback_url)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
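notify() is defined elsewhere in this script and is not shown in this diff; given the --callback argument above, it is presumably an HTTP callback that pushes the RelMon state to the service. A minimal sketch under that assumption (the transport, payload, and error handling are guesses, not the actual implementation):

import logging

import requests  # pinned in requirements.txt below


def notify(relmon, callback_url):
    # Push the current RelMon state to the callback URL, if one was given
    if not callback_url:
        return
    try:
        requests.post(callback_url, json=relmon, timeout=30)
    except requests.RequestException as ex:
        logging.error("Callback to %s failed: %s", callback_url, ex)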
diff --git a/remote/sqltify.py b/remote/sqltify.py
index b18e257..cce274e 100644
--- a/remote/sqltify.py
+++ b/remote/sqltify.py
@@ -6,56 +6,64 @@
 import logging
 import sys
 
-logging.basicConfig(stream=sys.stdout,
-                    format='[%(asctime)s][%(levelname)s] %(message)s',
-                    level=logging.INFO)
+logging.basicConfig(
+    stream=sys.stdout,
+    format="[%(asctime)s][%(levelname)s] %(message)s",
+    level=logging.INFO,
+)
 
-categories = [entry for entry in os.listdir('.') if os.path.isdir(os.path.join('.', entry))]
-logging.info('Categories %s', categories)
-db_connection = sqlite3.connect('reports.sqlite')
+categories = [
+    entry for entry in os.listdir(".") if os.path.isdir(os.path.join(".", entry))
+]
+logging.info("Categories %s", categories)
+db_connection = sqlite3.connect("reports.sqlite")
 db_cursor = db_connection.cursor()
 indexes_to_create = []
 
 for category in categories:
     # Create tables
-    logging.info('Recreating table for %s', category)
-    db_cursor.execute('DROP TABLE IF EXISTS %s;' % (category))
-    db_cursor.execute('DROP INDEX IF EXISTS %sIndex;' % (category))
-    db_cursor.execute('CREATE TABLE %s (path text, htmlgz blob);' % (category))
+    logging.info("Recreating table for %s", category)
+    db_cursor.execute("DROP TABLE IF EXISTS %s;" % (category))
+    db_cursor.execute("DROP INDEX IF EXISTS %sIndex;" % (category))
+    db_cursor.execute("CREATE TABLE %s (path text, htmlgz blob);" % (category))
 
     # Walk through files and add to database
     files_inserted = 0
     for root, dirs, files in os.walk(category, topdown=False):
         for name in files:
             file_path = os.path.join(root, name)
-            with open(file_path, 'rb') as f:
+            with open(file_path, "rb") as f:
                 ablob = f.read()
-                db_cursor.execute("INSERT INTO %s VALUES (?, ?)" % (category), [file_path, ablob])
+                db_cursor.execute(
+                    "INSERT INTO %s VALUES (?, ?)" % (category), [file_path, ablob]
+                )
                 files_inserted += 1
                 if files_inserted % 1000 == 0:
-                    logging.info('Commit after %s inserted files for %s', files_inserted, category)
+                    logging.info(
+                        "Commit after %s inserted files for %s", files_inserted, category
+                    )
                     db_connection.commit()
 
-    logging.info('Commit after %s inserted files for %s', files_inserted, category)
+    logging.info("Commit after %s inserted files for %s", files_inserted, category)
     db_connection.commit()
     db_connection.commit()
     if files_inserted == 0:
-        logging.info('No files were inserted for %s, dropping empty table', category)
-        db_cursor.execute('DROP TABLE IF EXISTS %s;' % (category))
+        logging.info("No files were inserted for %s, dropping empty table", category)
+        db_cursor.execute("DROP TABLE IF EXISTS %s;" % (category))
     else:
         indexes_to_create.append(category)
 
 db_connection.commit()
 # Reclaim space from deleted entries
-db_cursor.execute('VACUUM;')
+db_cursor.execute("VACUUM;")
 db_connection.commit()
 
 # Create index
 for category in indexes_to_create:
-    logging.info('Creating index for %s', category)
-    db_cursor.execute('CREATE INDEX %sIndex ON %s(path)' % (category, category))
+    logging.info("Creating index for %s", category)
+    db_cursor.execute("CREATE INDEX %sIndex ON %s(path)" % (category, category))
 
 db_connection.commit()
 db_connection.close()
diff --git a/requirements.txt b/requirements.txt
index 1ca9be0..44f1e67 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,12 @@
-APScheduler==3.6.1
+APScheduler==3.10.1
 cryptography>=3.2
-Flask==1.0.2
-Flask-Cors==3.0.9
-Flask-RESTful==0.3.6
-Jinja2==2.11.3
-paramiko==2.6.0
+Flask==2.3.2
+Flask-Cors==3.0.10
+Flask-RESTful==0.3.10
+paramiko==3.3.1
 pymongo==3.10.1
-pylint==2.9.5
+pylint>=2.17.5
+mypy==1.4.1
+PyJWT>=2.6.0
+Authlib>=1.2.0
+requests>=2.31.0
\ No newline at end of file
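For reference, reading a page back out of the reports.sqlite file produced by sqltify.py is the inverse operation (a sketch; the "Data" table and the path are made-up examples, since tables are named after whatever category directories exist):

import sqlite3

connection = sqlite3.connect("reports.sqlite")
cursor = connection.cursor()
# Each category table maps a file path to its gzip-compressed HTML,
# which can be served as-is with "Content-Encoding: gzip"
cursor.execute("SELECT htmlgz FROM Data WHERE path = ?", ("Data/index.html",))
row = cursor.fetchone()
htmlgz = row[0] if row else None
connection.close()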