diff --git a/pype/plugins/global/publish/submit_publish_job.py b/pype/plugins/global/publish/submit_publish_job.py index 2fe6735e900..ab5d6cf9b2e 100644 --- a/pype/plugins/global/publish/submit_publish_job.py +++ b/pype/plugins/global/publish/submit_publish_job.py @@ -278,26 +278,14 @@ def _submit_deadline_post_job(self, instance, job, instances): # Mandatory for Deadline, may be empty "AuxFiles": [], } - """ - In this part we will add file dependencies instead of job dependencies. - This way we don't need to take care of tile assembly job, getting its - id or name. We expect it to produce specific file with specific name - and we are just waiting for them. - """ + + # add assembly jobs as dependencies if instance.data.get("tileRendering"): - self.log.info("Adding tile assembly results as dependencies...") - asset_index = 0 - for inst in instances: - for represenation in inst.get("representations", []): - if isinstance(represenation["files"], (list, tuple)): - for file in represenation["files"]: - dependency = os.path.join(output_dir, file) - payload["JobInfo"]["AssetDependency{}".format(asset_index)] = dependency # noqa: E501 - else: - dependency = os.path.join( - output_dir, represenation["files"]) - payload["JobInfo"]["AssetDependency{}".format(asset_index)] = dependency # noqa: E501 - asset_index += 1 + self.log.info("Adding tile assembly jobs as dependencies...") + job_index = 0 + for assembly_id in instance.data.get("assemblySubmissionJobs"): + payload["JobInfo"]["JobDependency{}".format(job_index)] = assembly_id # noqa: E501 + job_index += 1 else: payload["JobInfo"]["JobDependency0"] = job["_id"] @@ -729,7 +717,8 @@ def process(self, instance): "pixelAspect": data.get("pixelAspect", 1), "resolutionWidth": data.get("resolutionWidth", 1920), "resolutionHeight": data.get("resolutionHeight", 1080), - "multipartExr": data.get("multipartExr", False) + "multipartExr": data.get("multipartExr", False), + "jobBatchName": data.get("jobBatchName", "") } if "prerender" in instance.data["families"]: @@ -906,8 +895,13 @@ def process(self, instance): # We still use data from it so lets fake it. # # Batch name reflect original scene name - render_job["Props"]["Batch"] = os.path.splitext(os.path.basename( - context.data.get("currentFile")))[0] + + if instance.data.get("assemblySubmissionJobs"): + render_job["Props"]["Batch"] = instance.data.get( + "jobBatchName") + else: + render_job["Props"]["Batch"] = os.path.splitext( + os.path.basename(context.data.get("currentFile")))[0] # User is deadline user render_job["Props"]["User"] = context.data.get( "deadlineUser", getpass.getuser()) diff --git a/pype/plugins/maya/create/create_render.py b/pype/plugins/maya/create/create_render.py index 9f05226f69f..6826d33c580 100644 --- a/pype/plugins/maya/create/create_render.py +++ b/pype/plugins/maya/create/create_render.py @@ -186,6 +186,8 @@ def _create_render_settings(self): self.data["useMayaBatch"] = False self.data["vrayScene"] = False self.data["tileRendering"] = False + self.data["tilesX"] = 2 + self.data["tilesY"] = 2 # Disable for now as this feature is not working yet # self.data["assScene"] = False diff --git a/pype/plugins/maya/publish/collect_render.py b/pype/plugins/maya/publish/collect_render.py index dfae6ed0af2..91230fcc462 100644 --- a/pype/plugins/maya/publish/collect_render.py +++ b/pype/plugins/maya/publish/collect_render.py @@ -244,6 +244,8 @@ def process(self, context): "resolutionHeight": cmds.getAttr("defaultResolution.height"), "pixelAspect": cmds.getAttr("defaultResolution.pixelAspect"), "tileRendering": render_instance.data.get("tileRendering") or False, # noqa: E501 + "tilesX": render_instance.data.get("tilesX") or 2, + "tilesY": render_instance.data.get("tilesY") or 2, "priority": render_instance.data.get("priority") } diff --git a/pype/plugins/maya/publish/submit_maya_deadline.py b/pype/plugins/maya/publish/submit_maya_deadline.py index 7fe20c779d8..747d2727b7e 100644 --- a/pype/plugins/maya/publish/submit_maya_deadline.py +++ b/pype/plugins/maya/publish/submit_maya_deadline.py @@ -16,11 +16,15 @@ """ +from __future__ import print_function import os import json import getpass import copy import re +import hashlib +from datetime import datetime +import itertools import clique import requests @@ -61,6 +65,97 @@ } +def _format_tiles( + filename, index, tiles_x, tiles_y, + width, height, prefix, origin="blc"): + """Generate tile entries for Deadline tile job. + + Returns two dictionaries - one that can be directly used in Deadline + job, second that can be used for Deadline Assembly job configuration + file. + + This will format tile names: + + Example:: + { + "OutputFilename0Tile0": "_tile_1x1_4x4_Main_beauty.1001.exr", + "OutputFilename0Tile1": "_tile_2x1_4x4_Main_beauty.1001.exr" + } + + And add tile prefixes like: + + Example:: + Image prefix is: + `maya///_` + + Result for tile 0 for 4x4 will be: + `maya///_tile_1x1_4x4__` + + Calculating coordinates is tricky as in Job they are defined as top, + left, bottom, right with zero being in top-left corner. But Assembler + configuration file takes tile coordinates as X, Y, Width and Height and + zero is bottom left corner. + + Args: + filename (str): Filename to process as tiles. + index (int): Index of that file if it is sequence. + tiles_x (int): Number of tiles in X. + tiles_y (int): Number if tikes in Y. + width (int): Width resolution of final image. + height (int): Height resolution of final image. + prefix (str): Image prefix. + + Returns: + (dict, dict): Tuple of two dictionaires - first can be used to + extend JobInfo, second has tiles x, y, width and height + used for assembler configuration. + + """ + tile = 0 + out = {"JobInfo": {}, "PluginInfo": {}} + cfg = {} + w_space = width / tiles_x + h_space = height / tiles_y + + for tile_x in range(1, tiles_x + 1): + for tile_y in range(1, tiles_y + 1): + tile_prefix = "_tile_{}x{}_{}x{}_".format( + tile_x, tile_y, + tiles_x, + tiles_y + ) + out_tile_index = "OutputFilename{}Tile{}".format( + str(index), tile + ) + new_filename = "{}/{}{}".format( + os.path.dirname(filename), + tile_prefix, + os.path.basename(filename) + ) + out["JobInfo"][out_tile_index] = new_filename + out["PluginInfo"]["RegionPrefix{}".format(str(tile))] = \ + "/{}".format(tile_prefix).join(prefix.rsplit("/", 1)) + + out["PluginInfo"]["RegionTop{}".format(tile)] = int(height) - (tile_y * h_space) # noqa: E501 + out["PluginInfo"]["RegionBottom{}".format(tile)] = int(height) - ((tile_y - 1) * h_space) - 1 # noqa: E501 + out["PluginInfo"]["RegionLeft{}".format(tile)] = (tile_x - 1) * w_space # noqa: E501 + out["PluginInfo"]["RegionRight{}".format(tile)] = (tile_x * w_space) - 1 # noqa: E501 + + cfg["Tile{}".format(tile)] = new_filename + cfg["Tile{}Tile".format(tile)] = new_filename + cfg["Tile{}X".format(tile)] = (tile_x - 1) * w_space + if origin == "blc": + cfg["Tile{}Y".format(tile)] = (tile_y - 1) * h_space + else: + cfg["Tile{}Y".format(tile)] = int(height) - ((tile_y - 1) * h_space) # noqa: E501 + + cfg["Tile{}Width".format(tile)] = tile_x * w_space + cfg["Tile{}Height".format(tile)] = tile_y * h_space + + tile += 1 + return out, cfg + + def get_renderer_variables(renderlayer, root): """Retrieve the extension which has been set in the VRay settings. @@ -164,6 +259,7 @@ class MayaSubmitDeadline(pyblish.api.InstancePlugin): optional = True use_published = True + tile_assembler_plugin = "PypeTileAssembler" def process(self, instance): """Plugin entry point.""" @@ -309,7 +405,7 @@ def process(self, instance): # Optional, enable double-click to preview rendered # frames from Deadline Monitor payload_skeleton["JobInfo"]["OutputDirectory0"] = \ - os.path.dirname(output_filename_0) + os.path.dirname(output_filename_0).replace("\\", "/") payload_skeleton["JobInfo"]["OutputFilename0"] = \ output_filename_0.replace("\\", "/") @@ -376,9 +472,8 @@ def process(self, instance): # Add list of expected files to job --------------------------------- exp = instance.data.get("expectedFiles") - - output_filenames = {} exp_index = 0 + output_filenames = {} if isinstance(exp[0], dict): # we have aovs and we need to iterate over them @@ -390,33 +485,237 @@ def process(self, instance): assert len(rem) == 1, ("Found multiple non related files " "to render, don't know what to do " "with them.") - payload['JobInfo']['OutputFilename' + str(exp_index)] = rem[0] # noqa: E501 output_file = rem[0] + if not instance.data.get("tileRendering"): + payload['JobInfo']['OutputFilename' + str(exp_index)] = output_file # noqa: E501 else: output_file = col[0].format('{head}{padding}{tail}') - payload['JobInfo']['OutputFilename' + str(exp_index)] = output_file # noqa: E501 - output_filenames[exp_index] = output_file + if not instance.data.get("tileRendering"): + payload['JobInfo']['OutputFilename' + str(exp_index)] = output_file # noqa: E501 + + output_filenames['OutputFilename' + str(exp_index)] = output_file # noqa: E501 exp_index += 1 else: - col, rem = clique.assemble(files) + col, rem = clique.assemble(exp) if not col and rem: # we couldn't find any collections but have # individual files. assert len(rem) == 1, ("Found multiple non related files " "to render, don't know what to do " "with them.") - payload['JobInfo']['OutputFilename' + str(exp_index)] = rem[0] # noqa: E501 + + output_file = rem[0] + if not instance.data.get("tileRendering"): + payload['JobInfo']['OutputFilename' + str(exp_index)] = output_file # noqa: E501 else: output_file = col[0].format('{head}{padding}{tail}') - payload['JobInfo']['OutputFilename' + str(exp_index)] = output_file # noqa: E501 + if not instance.data.get("tileRendering"): + payload['JobInfo']['OutputFilename' + str(exp_index)] = output_file # noqa: E501 + + output_filenames['OutputFilename' + str(exp_index)] = output_file plugin = payload["JobInfo"]["Plugin"] self.log.info("using render plugin : {}".format(plugin)) + # Store output dir for unified publisher (filesequence) + instance.data["outputDir"] = os.path.dirname(output_filename_0) + self.preflight_check(instance) - # Submit job to farm ------------------------------------------------ - if not instance.data.get("tileRendering"): + # Prepare tiles data ------------------------------------------------ + if instance.data.get("tileRendering"): + # if we have sequence of files, we need to create tile job for + # every frame + + payload["JobInfo"]["TileJob"] = True + payload["JobInfo"]["TileJobTilesInX"] = instance.data.get("tilesX") + payload["JobInfo"]["TileJobTilesInY"] = instance.data.get("tilesY") + payload["PluginInfo"]["ImageHeight"] = instance.data.get("resolutionHeight") # noqa: E501 + payload["PluginInfo"]["ImageWidth"] = instance.data.get("resolutionWidth") # noqa: E501 + payload["PluginInfo"]["RegionRendering"] = True + + assembly_payload = { + "AuxFiles": [], + "JobInfo": { + "BatchName": payload["JobInfo"]["BatchName"], + "Frames": 0, + "Name": "{} - Tile Assembly Job".format( + payload["JobInfo"]["Name"]), + "OutputDirectory0": + payload["JobInfo"]["OutputDirectory0"].replace( + "\\", "/"), + "Plugin": self.tile_assembler_plugin, + "MachineLimit": 1 + }, + "PluginInfo": { + "CleanupTiles": 1, + "ErrorOnMissing": True + } + } + assembly_payload["JobInfo"].update(output_filenames) + assembly_payload["JobInfo"]["Priority"] = self._instance.data.get( + "priority", 50) + assembly_payload["JobInfo"]["UserName"] = deadline_user + + frame_payloads = [] + assembly_payloads = [] + + R_FRAME_NUMBER = re.compile(r".+\.(?P[0-9]+)\..+") # noqa: N806, E501 + REPL_FRAME_NUMBER = re.compile(r"(.+\.)([0-9]+)(\..+)") # noqa: N806, E501 + + if isinstance(exp[0], dict): + # we have aovs and we need to iterate over them + # get files from `beauty` + files = exp[0].get("beauty") + # assembly files are used for assembly jobs as we need to put + # together all AOVs + assembly_files = list( + itertools.chain.from_iterable( + [f for _, f in exp[0].items()])) + if not files: + # if beauty doesn't exists, use first aov we found + files = exp[0].get(list(exp[0].keys())[0]) + else: + files = exp + assembly_files = files + + frame_jobs = {} + + file_index = 1 + for file in files: + frame = re.search(R_FRAME_NUMBER, file).group("frame") + new_payload = copy.deepcopy(payload) + new_payload["JobInfo"]["Name"] = \ + "{} (Frame {} - {} tiles)".format( + payload["JobInfo"]["Name"], + frame, + instance.data.get("tilesX") * instance.data.get("tilesY") # noqa: E501 + ) + self.log.info( + "... preparing job {}".format( + new_payload["JobInfo"]["Name"])) + new_payload["JobInfo"]["TileJobFrame"] = frame + + tiles_data = _format_tiles( + file, 0, + instance.data.get("tilesX"), + instance.data.get("tilesY"), + instance.data.get("resolutionWidth"), + instance.data.get("resolutionHeight"), + payload["PluginInfo"]["OutputFilePrefix"] + )[0] + new_payload["JobInfo"].update(tiles_data["JobInfo"]) + new_payload["PluginInfo"].update(tiles_data["PluginInfo"]) + + job_hash = hashlib.sha256("{}_{}".format(file_index, file)) + frame_jobs[frame] = job_hash.hexdigest() + new_payload["JobInfo"]["ExtraInfo0"] = job_hash.hexdigest() + new_payload["JobInfo"]["ExtraInfo1"] = file + + frame_payloads.append(new_payload) + file_index += 1 + + file_index = 1 + for file in assembly_files: + frame = re.search(R_FRAME_NUMBER, file).group("frame") + new_assembly_payload = copy.deepcopy(assembly_payload) + new_assembly_payload["JobInfo"]["Name"] = \ + "{} (Frame {})".format( + assembly_payload["JobInfo"]["Name"], + frame) + new_assembly_payload["JobInfo"]["OutputFilename0"] = re.sub( + REPL_FRAME_NUMBER, + "\\1{}\\3".format("#" * len(frame)), file) + + new_assembly_payload["PluginInfo"]["Renderer"] = self._instance.data["renderer"] # noqa: E501 + new_assembly_payload["JobInfo"]["ExtraInfo0"] = frame_jobs[frame] # noqa: E501 + new_assembly_payload["JobInfo"]["ExtraInfo1"] = file + assembly_payloads.append(new_assembly_payload) + file_index += 1 + + self.log.info( + "Submitting tile job(s) [{}] ...".format(len(frame_payloads))) + + url = "{}/api/jobs".format(self._deadline_url) + tiles_count = instance.data.get("tilesX") * instance.data.get("tilesY") # noqa: E501 + + for tile_job in frame_payloads: + response = self._requests_post(url, json=tile_job) + if not response.ok: + raise Exception(response.text) + + job_id = response.json()["_id"] + hash = response.json()["Props"]["Ex0"] + + for assembly_job in assembly_payloads: + if assembly_job["JobInfo"]["ExtraInfo0"] == hash: + assembly_job["JobInfo"]["JobDependency0"] = job_id + + for assembly_job in assembly_payloads: + file = assembly_job["JobInfo"]["ExtraInfo1"] + # write assembly job config files + now = datetime.now() + + config_file = os.path.join( + os.path.dirname(output_filename_0), + "{}_config_{}.txt".format( + os.path.splitext(file)[0], + now.strftime("%Y_%m_%d_%H_%M_%S") + ) + ) + + try: + if not os.path.isdir(os.path.dirname(config_file)): + os.makedirs(os.path.dirname(config_file)) + except OSError: + # directory is not available + self.log.warning( + "Path is unreachable: `{}`".format( + os.path.dirname(config_file))) + + # add config file as job auxFile + assembly_job["AuxFiles"] = [config_file] + + with open(config_file, "w") as cf: + print("TileCount={}".format(tiles_count), file=cf) + print("ImageFileName={}".format(file), file=cf) + print("ImageWidth={}".format( + instance.data.get("resolutionWidth")), file=cf) + print("ImageHeight={}".format( + instance.data.get("resolutionHeight")), file=cf) + + tiles = _format_tiles( + file, 0, + instance.data.get("tilesX"), + instance.data.get("tilesY"), + instance.data.get("resolutionWidth"), + instance.data.get("resolutionHeight"), + payload["PluginInfo"]["OutputFilePrefix"] + )[1] + sorted(tiles) + for k, v in tiles.items(): + print("{}={}".format(k, v), file=cf) + + job_idx = 1 + instance.data["assemblySubmissionJobs"] = [] + for ass_job in assembly_payloads: + self.log.info("submitting assembly job {} of {}".format( + job_idx, len(assembly_payloads) + )) + self.log.debug(json.dumps(ass_job, indent=4, sort_keys=True)) + response = self._requests_post(url, json=ass_job) + if not response.ok: + raise Exception(response.text) + + instance.data["assemblySubmissionJobs"].append( + response.json()["_id"]) + job_idx += 1 + + instance.data["jobBatchName"] = payload["JobInfo"]["BatchName"] + self.log.info("Setting batch name on instance: {}".format( + instance.data["jobBatchName"])) + else: + # Submit job to farm -------------------------------------------- self.log.info("Submitting ...") self.log.debug(json.dumps(payload, indent=4, sort_keys=True)) @@ -426,11 +725,6 @@ def process(self, instance): if not response.ok: raise Exception(response.text) instance.data["deadlineSubmissionJob"] = response.json() - else: - self.log.info("Skipping submission, tile rendering enabled.") - - # Store output dir for unified publisher (filesequence) - instance.data["outputDir"] = os.path.dirname(output_filename_0) def _get_maya_payload(self, data): payload = copy.deepcopy(payload_skeleton) diff --git a/pype/plugins/maya/publish/validate_deadline_tile_submission.py b/pype/plugins/maya/publish/validate_deadline_tile_submission.py deleted file mode 100644 index b0b995de3e7..00000000000 --- a/pype/plugins/maya/publish/validate_deadline_tile_submission.py +++ /dev/null @@ -1,69 +0,0 @@ -# -*- coding: utf-8 -*- -"""Validate settings from Deadline Submitter. - -This is useful mainly for tile rendering, where jobs on farm are created by -submitter script from Maya. - -Unfortunately Deadline doesn't expose frame number for tiles job so that -cannot be validated, even if it is important setting. Also we cannot -determine if 'Region Rendering' (tile rendering) is enabled or not because -of the same thing. - -""" -import os - -from maya import mel -from maya import cmds - -import pyblish.api -from pype.hosts.maya import lib - - -class ValidateDeadlineTileSubmission(pyblish.api.InstancePlugin): - """Validate Deadline Submission settings are OK for tile rendering.""" - - label = "Validate Deadline Tile Submission" - order = pyblish.api.ValidatorOrder - hosts = ["maya"] - families = ["renderlayer"] - if not os.environ.get("DEADLINE_REST_URL"): - active = False - - def process(self, instance): - """Entry point.""" - # try if Deadline submitter was loaded - if mel.eval("exists SubmitJobToDeadline") == 0: - # if not, try to load it manually - try: - mel.eval("source DeadlineMayaClient;") - except RuntimeError: - raise AssertionError("Deadline Maya client cannot be loaded") - mel.eval("DeadlineMayaClient();") - assert mel.eval("exists SubmitJobToDeadline") == 1, ( - "Deadline Submission script cannot be initialized.") - if instance.data.get("tileRendering"): - job_name = cmds.getAttr("defaultRenderGlobals.deadlineJobName") - scene_name = os.path.splitext(os.path.basename( - instance.context.data.get("currentFile")))[0] - if job_name != scene_name: - self.log.warning(("Job submitted through Deadline submitter " - "has different name then current scene " - "{} / {}").format(job_name, scene_name)) - if cmds.getAttr("defaultRenderGlobals.deadlineTileSingleJob") == 1: - layer = instance.data['setMembers'] - anim_override = lib.get_attr_in_layer( - "defaultRenderGlobals.animation", layer=layer) - assert anim_override, ( - "Animation must be enabled in " - "Render Settings even when rendering single frame." - ) - - start_frame = cmds.getAttr("defaultRenderGlobals.startFrame") - end_frame = cmds.getAttr("defaultRenderGlobals.endFrame") - assert start_frame == end_frame, ( - "Start frame and end frame are not equals. When " - "'Submit All Tles As A Single Job' is selected, only " - "single frame is expected to be rendered. It must match " - "the one specified in Deadline Submitter under " - "'Region Rendering'" - )