From 789ef634022f39ee2126fcf541eaf99edfd806b7 Mon Sep 17 00:00:00 2001 From: Xincun Li <147451452+xincunli-sonic@users.noreply.github.com> Date: Tue, 9 Jul 2024 13:03:04 -0700 Subject: [PATCH] Add Parallel option for apply-patch (#3373) * Add Parallel option for apply-patch * fix format * fix format * Add UT to check if parallel option ran as expected. * fix format. * Remove Dry Run. * add parallel run checker * Modify to number of asics * Modify UT. --- config/main.py | 47 +++++++-- tests/config_test.py | 244 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 276 insertions(+), 15 deletions(-) diff --git a/config/main.py b/config/main.py index 83637c1421..709c96402a 100644 --- a/config/main.py +++ b/config/main.py @@ -1,6 +1,8 @@ #!/usr/sbin/env python +import threading import click +import concurrent.futures import datetime import ipaddress import json @@ -1212,6 +1214,11 @@ def multiasic_save_to_singlefile(db, filename): with open(filename, 'w') as file: json.dump(all_current_config, file, indent=4) + +def apply_patch_wrapper(args): + return apply_patch_for_scope(*args) + + # Function to apply patch for a single ASIC. def apply_patch_for_scope(scope_changes, results, config_format, verbose, dry_run, ignore_non_yang_tables, ignore_path): scope, changes = scope_changes @@ -1220,16 +1227,19 @@ def apply_patch_for_scope(scope_changes, results, config_format, verbose, dry_ru scope = multi_asic.DEFAULT_NAMESPACE scope_for_log = scope if scope else HOST_NAMESPACE + thread_id = threading.get_ident() + log.log_notice(f"apply_patch_for_scope started for {scope_for_log} by {changes} in thread:{thread_id}") + try: # Call apply_patch with the ASIC-specific changes and predefined parameters GenericUpdater(scope=scope).apply_patch(jsonpatch.JsonPatch(changes), - config_format, - verbose, - dry_run, - ignore_non_yang_tables, - ignore_path) + config_format, + verbose, + dry_run, + ignore_non_yang_tables, + ignore_path) results[scope_for_log] = {"success": True, "message": "Success"} - log.log_notice(f"'apply-patch' executed successfully for {scope_for_log} by {changes}") + log.log_notice(f"'apply-patch' executed successfully for {scope_for_log} by {changes} in thread:{thread_id}") except Exception as e: results[scope_for_log] = {"success": False, "message": str(e)} log.log_error(f"'apply-patch' executed failed for {scope_for_log} by {changes} due to {str(e)}") @@ -1549,11 +1559,12 @@ def print_dry_run_message(dry_run): help='format of config of the patch is either ConfigDb(ABNF) or SonicYang', show_default=True) @click.option('-d', '--dry-run', is_flag=True, default=False, help='test out the command without affecting config state') +@click.option('-p', '--parallel', is_flag=True, default=False, help='applying the change to all ASICs parallelly') @click.option('-n', '--ignore-non-yang-tables', is_flag=True, default=False, help='ignore validation for tables without YANG models', hidden=True) @click.option('-i', '--ignore-path', multiple=True, help='ignore validation for config specified by given path which is a JsonPointer', hidden=True) @click.option('-v', '--verbose', is_flag=True, default=False, help='print additional details of what the operation is doing') @click.pass_context -def apply_patch(ctx, patch_file_path, format, dry_run, ignore_non_yang_tables, ignore_path, verbose): +def apply_patch(ctx, patch_file_path, format, dry_run, parallel, ignore_non_yang_tables, ignore_path, verbose): """Apply given patch of updates to Config. A patch is a JsonPatch which follows rfc6902. This command can be used do partial updates to the config with minimum disruption to running processes. It allows addition as well as deletion of configs. The patch file represents a diff of ConfigDb(ABNF) @@ -1599,8 +1610,26 @@ def apply_patch(ctx, patch_file_path, format, dry_run, ignore_non_yang_tables, i changes_by_scope[asic] = [] # Apply changes for each scope - for scope_changes in changes_by_scope.items(): - apply_patch_for_scope(scope_changes, results, config_format, verbose, dry_run, ignore_non_yang_tables, ignore_path) + if parallel: + with concurrent.futures.ThreadPoolExecutor() as executor: + # Prepare the argument tuples + arguments = [(scope_changes, results, config_format, + verbose, dry_run, ignore_non_yang_tables, ignore_path) + for scope_changes in changes_by_scope.items()] + + # Submit all tasks and wait for them to complete + futures = [executor.submit(apply_patch_wrapper, args) for args in arguments] + + # Wait for all tasks to complete + concurrent.futures.wait(futures) + else: + for scope_changes in changes_by_scope.items(): + apply_patch_for_scope(scope_changes, + results, + config_format, + verbose, dry_run, + ignore_non_yang_tables, + ignore_path) # Check if any updates failed failures = [scope for scope, result in results.items() if not result['success']] diff --git a/tests/config_test.py b/tests/config_test.py index db62bf3249..748d434fc2 100644 --- a/tests/config_test.py +++ b/tests/config_test.py @@ -3229,6 +3229,199 @@ def test_apply_patch_multiasic(self): @patch('config.main.validate_patch', mock.Mock(return_value=True)) def test_apply_patch_dryrun_multiasic(self): + # Mock open to simulate file reading + with patch('builtins.open', mock_open(read_data=json.dumps(self.patch_content)), create=True) as mocked_open: + # Mock GenericUpdater to avoid actual patch application + with patch('config.main.GenericUpdater') as mock_generic_updater: + mock_generic_updater.return_value.apply_patch = MagicMock() + + # Mock ConfigDBConnector to ensure it's not called during dry-run + with patch('config.main.ConfigDBConnector') as mock_config_db_connector: + + print("Multi ASIC: {}".format(multi_asic.is_multi_asic())) + # Invocation of the command with the CliRunner + result = self.runner.invoke(config.config.commands["apply-patch"], + [self.patch_file_path, + "--format", ConfigFormat.SONICYANG.name, + "--dry-run", + "--ignore-non-yang-tables", + "--ignore-path", "/ANY_TABLE", + "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", + "--ignore-path", "", + "--verbose"], + catch_exceptions=False) + + print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) + # Assertions and verifications + self.assertEqual(result.exit_code, 0, "Command should succeed") + self.assertIn("Patch applied successfully.", result.output) + + # Verify mocked_open was called as expected + mocked_open.assert_called_with(self.patch_file_path, 'r') + + # Ensure ConfigDBConnector was never instantiated or called + mock_config_db_connector.assert_not_called() + + @patch('config.main.validate_patch', mock.Mock(return_value=True)) + @patch('config.main.concurrent.futures.wait', autospec=True) + def test_apply_patch_dryrun_parallel_multiasic(self, MockThreadPoolWait): + # Mock open to simulate file reading + with patch('builtins.open', mock_open(read_data=json.dumps(self.patch_content)), create=True) as mocked_open: + # Mock GenericUpdater to avoid actual patch application + with patch('config.main.GenericUpdater') as mock_generic_updater: + mock_generic_updater.return_value.apply_patch = MagicMock() + + # Mock ConfigDBConnector to ensure it's not called during dry-run + with patch('config.main.ConfigDBConnector') as mock_config_db_connector: + + print("Multi ASIC: {}".format(multi_asic.is_multi_asic())) + # Invocation of the command with the CliRunner + result = self.runner.invoke(config.config.commands["apply-patch"], + [self.patch_file_path, + "--format", ConfigFormat.SONICYANG.name, + "--dry-run", + "--parallel", + "--ignore-non-yang-tables", + "--ignore-path", "/ANY_TABLE", + "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", + "--ignore-path", "", + "--verbose"], + catch_exceptions=False) + + print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) + # Assertions and verifications + self.assertEqual(result.exit_code, 0, "Command should succeed") + self.assertIn("Patch applied successfully.", result.output) + + # Assertions to check if ThreadPoolExecutor was used correctly + MockThreadPoolWait.assert_called_once() + + # Verify mocked_open was called as expected + mocked_open.assert_called_with(self.patch_file_path, 'r') + + # Ensure ConfigDBConnector was never instantiated or called + mock_config_db_connector.assert_not_called() + + @patch('config.main.validate_patch', mock.Mock(return_value=True)) + @patch('config.main.concurrent.futures.wait', autospec=True) + def test_apply_patch_check_running_in_parallel_multiasic(self, MockThreadPoolWait): + # Mock open to simulate file reading + with patch('builtins.open', mock_open(read_data=json.dumps(self.patch_content)), create=True) as mocked_open: + # Mock GenericUpdater to avoid actual patch application + with patch('config.main.GenericUpdater') as mock_generic_updater: + mock_generic_updater.return_value.apply_patch = MagicMock() + + # Mock ConfigDBConnector to ensure it's not called during dry-run + with patch('config.main.ConfigDBConnector') as mock_config_db_connector: + + print("Multi ASIC: {}".format(multi_asic.is_multi_asic())) + # Invocation of the command with the CliRunner + result = self.runner.invoke(config.config.commands["apply-patch"], + [self.patch_file_path, + "--format", ConfigFormat.SONICYANG.name, + "--parallel", + "--ignore-non-yang-tables", + "--ignore-path", "/ANY_TABLE", + "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", + "--ignore-path", "", + "--verbose"], + catch_exceptions=False) + + print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) + # Assertions and verifications + self.assertEqual(result.exit_code, 0, "Command should succeed") + self.assertIn("Patch applied successfully.", result.output) + + # Assertions to check if ThreadPoolExecutor was used correctly + MockThreadPoolWait.assert_called_once() + + # Verify mocked_open was called as expected + mocked_open.assert_called_with(self.patch_file_path, 'r') + + # Ensure ConfigDBConnector was never instantiated or called + mock_config_db_connector.assert_not_called() + + @patch('config.main.validate_patch', mock.Mock(return_value=True)) + @patch('config.main.apply_patch_wrapper') + def test_apply_patch_check_apply_call_parallel_multiasic(self, mock_apply_patch): + # Mock open to simulate file reading + with patch('builtins.open', mock_open(read_data=json.dumps(self.patch_content)), create=True) as mocked_open: + # Mock GenericUpdater to avoid actual patch application + with patch('config.main.GenericUpdater') as mock_generic_updater: + mock_generic_updater.return_value.apply_patch = MagicMock() + + # Mock ConfigDBConnector to ensure it's not called during dry-run + with patch('config.main.ConfigDBConnector') as mock_config_db_connector: + + print("Multi ASIC: {}".format(multi_asic.is_multi_asic())) + # Invocation of the command with the CliRunner + result = self.runner.invoke(config.config.commands["apply-patch"], + [self.patch_file_path, + "--format", ConfigFormat.SONICYANG.name, + "--parallel", + "--ignore-non-yang-tables", + "--ignore-path", "/ANY_TABLE", + "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", + "--ignore-path", "", + "--verbose"], + catch_exceptions=False) + + print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) + # Assertions and verifications + self.assertEqual(result.exit_code, 0, "Command should succeed") + self.assertIn("Patch applied successfully.", result.output) + + # Assertions to check if ThreadPoolExecutor was used correctly + self.assertEqual(mock_apply_patch.call_count, + multi_asic.get_num_asics() + 1, + "apply_patch_wrapper function should be called number of ASICs plus host times") + + # Verify mocked_open was called as expected + mocked_open.assert_called_with(self.patch_file_path, 'r') + + # Ensure ConfigDBConnector was never instantiated or called + mock_config_db_connector.assert_not_called() + + @patch('config.main.validate_patch', mock.Mock(return_value=True)) + @patch('config.main.concurrent.futures.wait', autospec=True) + def test_apply_patch_check_running_in_not_parallel_multiasic(self, MockThreadPoolWait): + # Mock open to simulate file reading + with patch('builtins.open', mock_open(read_data=json.dumps(self.patch_content)), create=True) as mocked_open: + # Mock GenericUpdater to avoid actual patch application + with patch('config.main.GenericUpdater') as mock_generic_updater: + mock_generic_updater.return_value.apply_patch = MagicMock() + + # Mock ConfigDBConnector to ensure it's not called during dry-run + with patch('config.main.ConfigDBConnector') as mock_config_db_connector: + + print("Multi ASIC: {}".format(multi_asic.is_multi_asic())) + # Invocation of the command with the CliRunner + result = self.runner.invoke(config.config.commands["apply-patch"], + [self.patch_file_path, + "--format", ConfigFormat.SONICYANG.name, + "--ignore-non-yang-tables", + "--ignore-path", "/ANY_TABLE", + "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", + "--ignore-path", "", + "--verbose"], + catch_exceptions=False) + + print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) + # Assertions and verifications + self.assertEqual(result.exit_code, 0, "Command should succeed") + self.assertIn("Patch applied successfully.", result.output) + + # Assertions to check if ThreadPoolExecutor was used correctly + MockThreadPoolWait.assert_not_called() + + # Verify mocked_open was called as expected + mocked_open.assert_called_with(self.patch_file_path, 'r') + + # Ensure ConfigDBConnector was never instantiated or called + mock_config_db_connector.assert_not_called() + + @patch('config.main.validate_patch', mock.Mock(return_value=True)) + def test_apply_patch_parallel_with_error_multiasic(self): # Mock open to simulate file reading with patch('builtins.open', mock_open(read_data=json.dumps(self.patch_content)), create=True) as mocked_open: # Mock GenericUpdater to avoid actual patch application @@ -3243,12 +3436,13 @@ def test_apply_patch_dryrun_multiasic(self): result = self.runner.invoke(config.config.commands["apply-patch"], [self.patch_file_path, "--format", ConfigFormat.SONICYANG.name, - "--dry-run", - "--ignore-non-yang-tables", - "--ignore-path", "/ANY_TABLE", - "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", - "--ignore-path", "", - "--verbose"], + "--dry-run", + "--parallel", + "--ignore-non-yang-tables", + "--ignore-path", "/ANY_TABLE", + "--ignore-path", "/ANY_OTHER_TABLE/ANY_FIELD", + "--ignore-path", "", + "--verbose"], catch_exceptions=False) print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) @@ -3326,6 +3520,44 @@ def test_apply_patch_validate_patch_with_badpath_multiasic(self, mock_subprocess # Verify mocked_open was called as expected mocked_open.assert_called_with(self.patch_file_path, 'r') + @patch('config.main.subprocess.Popen') + @patch('config.main.SonicYangCfgDbGenerator.validate_config_db_json', mock.Mock(return_value=True)) + def test_apply_patch_parallel_badpath_multiasic(self, mock_subprocess_popen): + mock_instance = MagicMock() + mock_instance.communicate.return_value = (json.dumps(self.all_config), 0) + mock_subprocess_popen.return_value = mock_instance + + bad_patch = copy.deepcopy(self.patch_content) + bad_patch.append({ + "value": { + "policy_desc": "New ACL Table", + "ports": ["Ethernet3", "Ethernet4"], + "stage": "ingress", + "type": "L3" + } + }) + + # Mock open to simulate file reading + with patch('builtins.open', mock_open(read_data=json.dumps(bad_patch)), create=True) as mocked_open: + # Mock GenericUpdater to avoid actual patch application + with patch('config.main.GenericUpdater') as mock_generic_updater: + mock_generic_updater.return_value.apply_patch = MagicMock() + + print("Multi ASIC: {}".format(multi_asic.is_multi_asic())) + # Invocation of the command with the CliRunner + result = self.runner.invoke(config.config.commands["apply-patch"], + [self.patch_file_path, + "--parallel"], + catch_exceptions=True) + + print("Exit Code: {}, output: {}".format(result.exit_code, result.output)) + # Assertions and verifications + self.assertNotEqual(result.exit_code, 0, "Command should failed.") + self.assertIn("Failed to apply patch", result.output) + + # Verify mocked_open was called as expected + mocked_open.assert_called_with(self.patch_file_path, 'r') + @patch('config.main.subprocess.Popen') @patch('config.main.SonicYangCfgDbGenerator.validate_config_db_json', mock.Mock(return_value=True)) def test_apply_patch_validate_patch_with_wrong_fetch_config(self, mock_subprocess_popen):