From 4bb57e6723b7a5c2eb685536e5a293aea87bd3a1 Mon Sep 17 00:00:00 2001 From: Connor McCarthy Date: Tue, 22 Nov 2022 19:38:48 -0500 Subject: [PATCH] fix(sdk): fix multiple workers writing with gcsfuse bug [KFP SDK v2] (#8455) * fix(sdk): fix multiple workers writing with gcsfuse bug [KFP SDK v2] * add comment for context --- sdk/python/kfp/components/executor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/python/kfp/components/executor.py b/sdk/python/kfp/components/executor.py index bf443ad5c72..6f37e832aff 100644 --- a/sdk/python/kfp/components/executor.py +++ b/sdk/python/kfp/components/executor.py @@ -264,12 +264,12 @@ def _write_executor_output(self, func_output: Optional[Any] = None): ' subclass of `Artifact`, or a NamedTuple collection of these types.' .format(self._return_annotation)) - import os - os.makedirs( - os.path.dirname(self._input['outputs']['outputFile']), - exist_ok=True) - with open(self._input['outputs']['outputFile'], 'w') as f: - f.write(json.dumps(self._executor_output)) + executor_output_path = self._input['outputs']['outputFile'] + # This check is to reduce the likelihood that two or more workers (in a distributed training/compute strategy) attempt to write to the same executor output file at the same time using gcsfuse. Do not remove until fixed by gcsfuse. + if not os.path.exists(executor_output_path): + os.makedirs(os.path.dirname(executor_output_path), exist_ok=True) + with open(executor_output_path, 'w') as f: + f.write(json.dumps(self._executor_output)) def execute(self): annotations = inspect.getfullargspec(self._func).annotations