diff --git a/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py b/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py index b48f58338f6f3..325808b7047b5 100644 --- a/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/ci_image_commands.py @@ -232,7 +232,9 @@ def build_image( """Build CI image. Include building multiple images for all python versions (sequentially).""" def run_build(ci_image_params: BuildCiParams) -> None: - return_code, info = build_ci_image(verbose=verbose, dry_run=dry_run, ci_image_params=ci_image_params) + return_code, info = build_ci_image( + verbose=verbose, dry_run=dry_run, ci_image_params=ci_image_params, parallel=False + ) if return_code != 0: get_console().print(f"[error]Error when building image! {info}") sys.exit(return_code) @@ -423,7 +425,9 @@ def should_we_run_the_build(build_ci_params: BuildCiParams) -> bool: sys.exit(1) -def build_ci_image(verbose: bool, dry_run: bool, ci_image_params: BuildCiParams) -> Tuple[int, str]: +def build_ci_image( + verbose: bool, dry_run: bool, ci_image_params: BuildCiParams, parallel: bool +) -> Tuple[int, str]: """ Builds CI image: @@ -440,6 +444,7 @@ def build_ci_image(verbose: bool, dry_run: bool, ci_image_params: BuildCiParams) :param verbose: print commands when running :param dry_run: do not execute "write" commands - just print what would happen :param ci_image_params: CI image parameters + :param parallel: whether the pull is run as part of parallel execution """ if ( ci_image_params.is_multi_platform() @@ -463,7 +468,9 @@ def build_ci_image(verbose: bool, dry_run: bool, ci_image_params: BuildCiParams) if ci_image_params.prepare_buildx_cache or ci_image_params.push_image: login_to_github_docker_registry(image_params=ci_image_params, dry_run=dry_run, verbose=verbose) if ci_image_params.prepare_buildx_cache: - build_command_result = build_cache(image_params=ci_image_params, dry_run=dry_run, verbose=verbose) + build_command_result = build_cache( + image_params=ci_image_params, dry_run=dry_run, verbose=verbose, parallel=parallel + ) else: if ci_image_params.empty_image: env = os.environ.copy() @@ -477,6 +484,7 @@ def build_ci_image(verbose: bool, dry_run: bool, ci_image_params: BuildCiParams) cwd=AIRFLOW_SOURCES_ROOT, text=True, env=env, + enabled_output_group=not parallel, ) else: get_console().print(f"\n[info]Building CI Image for Python {ci_image_params.python}\n") @@ -490,6 +498,7 @@ def build_ci_image(verbose: bool, dry_run: bool, ci_image_params: BuildCiParams) cwd=AIRFLOW_SOURCES_ROOT, text=True, check=False, + enabled_output_group=not parallel, ) if build_command_result.returncode == 0: if ci_image_params.tag_as_latest: @@ -540,4 +549,4 @@ def rebuild_ci_image_if_needed( 'Forcing build.[/]' ) ci_image_params.force_build = True - build_ci_image(verbose, dry_run=dry_run, ci_image_params=ci_image_params) + build_ci_image(verbose, dry_run=dry_run, ci_image_params=ci_image_params, parallel=False) diff --git a/dev/breeze/src/airflow_breeze/commands/production_image_commands.py b/dev/breeze/src/airflow_breeze/commands/production_image_commands.py index 544095f0b2af8..5418eac696a39 100644 --- a/dev/breeze/src/airflow_breeze/commands/production_image_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/production_image_commands.py @@ -362,6 +362,7 @@ def pull_prod_image( wait_for_image=wait_for_image, tag_as_latest=tag_as_latest, poll_time=10.0, + parallel=False, ) if return_code != 0: get_console().print(f"[error]There was an error when pulling PROD image: {info}[/]") @@ -503,7 +504,9 @@ def build_production_image( login_to_github_docker_registry(image_params=prod_image_params, dry_run=dry_run, verbose=verbose) get_console().print(f"\n[info]Building PROD Image for Python {prod_image_params.python}\n") if prod_image_params.prepare_buildx_cache: - build_command_result = build_cache(image_params=prod_image_params, dry_run=dry_run, verbose=verbose) + build_command_result = build_cache( + image_params=prod_image_params, dry_run=dry_run, verbose=verbose, parallel=False + ) else: if prod_image_params.empty_image: env = os.environ.copy() diff --git a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py index 1e5b8286ff5ba..ba40e373a8900 100644 --- a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py @@ -181,6 +181,7 @@ def run_with_debug( dry_run: bool, debug: bool, enable_input: bool = False, + enabled_output_group: bool = False, ) -> RunCommandResult: env_variables = get_env_variables_for_docker_commands(params) extra_docker_flags = get_extra_docker_flags(mount_sources=params.mount_sources) @@ -216,10 +217,18 @@ def run_with_debug( verbose=verbose, dry_run=dry_run, env=env_variables, + enabled_output_group=enabled_output_group, ) else: base_command.extend(command) - return run_command(base_command, verbose=verbose, dry_run=dry_run, env=env_variables, check=False) + return run_command( + base_command, + enabled_output_group=enabled_output_group, + verbose=verbose, + dry_run=dry_run, + env=env_variables, + check=False, + ) @main.command( @@ -354,7 +363,7 @@ def prepare_provider_packages( def run_generate_constraints( - shell_params: ShellParams, dry_run: bool, verbose: bool, debug: bool + shell_params: ShellParams, dry_run: bool, verbose: bool, debug: bool, parallel: bool = False ) -> Tuple[int, str]: cmd_to_run = [ "/opt/airflow/scripts/in_container/run_generate_constraints.sh", @@ -365,6 +374,7 @@ def run_generate_constraints( verbose=verbose, dry_run=dry_run, debug=debug, + enabled_output_group=not parallel, ) return ( generate_constraints_result.returncode, @@ -388,7 +398,7 @@ def run_generate_constraints_in_parallel( results = [ pool.apply_async( run_generate_constraints, - args=(shell_param, dry_run, verbose, False), + args=(shell_param, dry_run, verbose, False, True), ) for shell_param in shell_params_list ] @@ -486,6 +496,7 @@ def generate_constraints( dry_run=dry_run, verbose=verbose, debug=debug, + parallel=False, ) if return_code != 0: get_console().print(f"[error]There was an error when generating constraints: {info}[/]") diff --git a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py index e838e380f781d..05bcc11d6cb87 100644 --- a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py +++ b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py @@ -460,15 +460,19 @@ def prepare_docker_build_from_input( return ["docker", "build", "-t", image_params.airflow_image_name_with_tag, "-"] -def build_cache(image_params: CommonBuildParams, dry_run: bool, verbose: bool) -> RunCommandResult: +def build_cache( + image_params: CommonBuildParams, dry_run: bool, verbose: bool, parallel: bool +) -> RunCommandResult: build_command_result: Union[CompletedProcess, CalledProcessError] = CompletedProcess( args=[], returncode=0 ) cmd = ['docker', 'buildx', 'inspect', 'airflow_cache'] - buildx_command_result = run_command(cmd, verbose=verbose, dry_run=dry_run, text=True, check=False) + buildx_command_result = run_command( + cmd, verbose=verbose, dry_run=dry_run, text=True, check=False, enabled_output_group=not parallel + ) if buildx_command_result and buildx_command_result.returncode != 0: next_cmd = ['docker', 'buildx', 'create', '--name', 'airflow_cache'] - run_command(next_cmd, verbose=verbose, text=True, check=False) + run_command(next_cmd, verbose=verbose, text=True, check=False, enabled_output_group=not parallel) for platform in image_params.platforms: platform_image_params = deepcopy(image_params) # override the platform in the copied params to only be single platform per run @@ -476,7 +480,13 @@ def build_cache(image_params: CommonBuildParams, dry_run: bool, verbose: bool) - platform_image_params.platform = platform cmd = prepare_docker_build_cache_command(image_params=platform_image_params) build_command_result = run_command( - cmd, verbose=verbose, dry_run=dry_run, cwd=AIRFLOW_SOURCES_ROOT, check=False, text=True + cmd, + verbose=verbose, + dry_run=dry_run, + cwd=AIRFLOW_SOURCES_ROOT, + check=False, + text=True, + enabled_output_group=not parallel, ) if build_command_result.returncode != 0: break diff --git a/dev/breeze/src/airflow_breeze/utils/image.py b/dev/breeze/src/airflow_breeze/utils/image.py index 80da7f75c5273..5030bd669ab37 100644 --- a/dev/breeze/src/airflow_breeze/utils/image.py +++ b/dev/breeze/src/airflow_breeze/utils/image.py @@ -57,7 +57,8 @@ def run_pull_in_parallel( if not verify_image: results = [ pool.apply_async( - run_pull_image, args=(image_param, dry_run, verbose, wait_for_image, tag_as_latest, poll_time) + run_pull_image, + args=(image_param, dry_run, verbose, wait_for_image, tag_as_latest, poll_time, True), ) for image_param in image_params_list ] @@ -88,6 +89,7 @@ def run_pull_image( wait_for_image: bool, tag_as_latest: bool, poll_time: float, + parallel: bool = False, ) -> Tuple[int, str]: """ Pull image specified. @@ -97,6 +99,7 @@ def run_pull_image( :param wait_for_image: whether we should wait for the image to be available :param tag_as_latest: tag the image as latest :param poll_time: what's the polling time between checks if images are there + :param parallel: whether the pull is run as part of parallel execution :return: Tuple of return code and description of the image pulled """ get_console().print( @@ -112,6 +115,7 @@ def run_pull_image( verbose=verbose, dry_run=dry_run, check=False, + enabled_output_group=not parallel, ) if command_result.returncode == 0: command_result = run_command( @@ -121,6 +125,7 @@ def run_pull_image( dry_run=dry_run, text=True, check=False, + enabled_output_group=not parallel, ) if not dry_run: if command_result.returncode == 0: diff --git a/dev/breeze/src/airflow_breeze/utils/run_utils.py b/dev/breeze/src/airflow_breeze/utils/run_utils.py index f228cc657cf60..4c2f5b57f3f23 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_utils.py +++ b/dev/breeze/src/airflow_breeze/utils/run_utils.py @@ -46,6 +46,7 @@ def run_command( env: Optional[Mapping[str, str]] = None, cwd: Optional[Path] = None, input: Optional[str] = None, + enabled_output_group: bool = False, **kwargs, ) -> RunCommandResult: """ @@ -68,25 +69,26 @@ def run_command( :param env: mapping of environment variables to set for the run command :param cwd: working directory to set for the command :param input: input string to pass to stdin of the process + :param enabled_output_group: if set to true, in CI the logs will be placed in separate, foldable group. :param kwargs: kwargs passed to POpen """ + if not title: + # Heuristics to get a short but explanatory title showing what the command does + # If title is not provided explicitly + title = ' '.join( + shlex.quote(c) + for c in cmd + if not c.startswith('-') # exclude options + and len(c) > 0 + and (c[0] != "/" or c.endswith(".sh")) # exclude volumes + and not c == "never" # exclude --pull never + and not match(r"^[A-Z_]*=.*$", c) + ) workdir: str = str(cwd) if cwd else os.getcwd() if verbose or dry_run: command_to_print = ' '.join(shlex.quote(c) for c in cmd) - if not title: - # Heuristics to get a short but explanatory title showing what the command does - # If title is not provided explicitly - title = ' '.join( - shlex.quote(c) - for c in cmd - if not c.startswith('-') # exclude options - and len(c) > 0 - and (c[0] != "/" or c.endswith(".sh")) # exclude volumes - and not c == "never" # exclude --pull never - and not match(r"^[A-Z_]*=.*$", c) - ) env_to_print = get_environments_to_print(env) - with ci_group(title=f"Running {title}"): + with ci_group(title=f"Running {title}", enabled=enabled_output_group): get_console().print(f"\n[info]Working directory {workdir} [/]\n") # Soft wrap allows to copy&paste and run resulting output as it has no hard EOL get_console().print(f"\n[info]{env_to_print}{command_to_print}[/]\n", soft_wrap=True) @@ -96,7 +98,8 @@ def run_command( cmd_env = os.environ.copy() if env: cmd_env.update(env) - return subprocess.run(cmd, input=input, check=check, env=cmd_env, cwd=workdir, **kwargs) + with ci_group(title=f"Output of {title}", enabled=enabled_output_group): + return subprocess.run(cmd, input=input, check=check, env=cmd_env, cwd=workdir, **kwargs) except subprocess.CalledProcessError as ex: if not no_output_dump_on_exception: if ex.stdout: