-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
create_test_branch.py
410 lines (324 loc) · 15 KB
/
create_test_branch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
import argparse
import functools
import json
import os
import shutil
import subprocess
import time
from pathlib import Path
from zipfile import ZipFile

from demisto_sdk.commands.common.constants import MarketplaceVersions
from git import GitCommandError, Head, Repo
from packaging.version import Version

from Tests.scripts.utils import logging_wrapper as logging
from Tests.scripts.utils.log_util import install_logging
from Utils.github_workflow_scripts.utils import get_env_var
# Module-level state collected while making the test changes; populated by the
# `add_changed_pack` decorator and written to the artifacts directory in `main`.
versions_dict = {}  # pack name -> expected pack version after the change
pack_items_dict = {}  # pack name -> expected content item paths in the bucket
changed_packs = set()  # Path of every pack modified on the test branch
GITLAB_SERVER_HOST = get_env_var('CI_SERVER_HOST', 'gitlab.xdr.pan.local')  # disable-secrets-detection
GITLAB_PROJECT_NAMESPACE = get_env_var('CI_PROJECT_NAMESPACE', 'xdr/cortex-content')  # disable-secrets-detection
# HELPER FUNCTIONS
def json_write(file_path: str, data: list | dict):
""" Writes given data to a json file
Args:
file_path: The file path
data: The data to write
"""
with open(file_path, "w") as f:
f.write(json.dumps(data, indent=4))
def get_pack_content_paths(pack_path: Path, marketplace=MarketplaceVersions.XSOAR.value):
    """
    Build a mapping of the pack's content item paths as they appear in the bucket.

    Runs the `demisto-sdk prepare-content` command for the pack, extracts the
    produced zip artifact and records the relative path of every created item,
    grouped by content item type. The result is later saved to `packs_items.json`.

    Args:
        pack_path (Path): The pack path.
        marketplace: The marketplace value to prepare the content for.

    Returns:
        dict: Mapping of content item type to the list of item paths.
    """
    command = ['demisto-sdk', 'prepare-content', '-i', f'Packs/{pack_path.name}', '-o', '.']
    if marketplace != 'xsoar':
        command += ['-mp', f'{marketplace}']
    try:
        logging.debug(f"Running the SDK prepare-content command for pack {pack_path.name} - "
                      f"Command: `{' '.join(command)}`")
        result = subprocess.run(command, capture_output=True, check=True)
        logging.debug(f"Result from prepare-content - stdout: [{str(result.stdout)}] stderr: [{str(result.stderr)}]")
    except subprocess.CalledProcessError as se:
        logging.error(f'Subprocess exception: {se}. stderr: [{se.stderr}] stdout: [{se.stdout}]')
        raise

    extracted_dir = f'./{pack_path.name}'
    with ZipFile(f'{pack_path.name}.zip') as artifact_zip:
        artifact_zip.extractall(extracted_dir)
    os.remove(f'{pack_path.name}.zip')

    paths_by_type = {}
    for entry in os.listdir(extracted_dir):
        entry_name = str(entry)
        # Skip files (names containing a dot) and item types not stored in the bucket.
        if '.' in entry_name or entry_name in ('ReleaseNotes', 'TestPlaybooks'):
            continue
        item_paths = Path(os.path.join(str(extracted_dir), entry_name)).glob('*')
        paths_by_type[entry_name] = ['/'.join(p.parts[1:]) for p in item_paths]
    shutil.rmtree(extracted_dir)
    return paths_by_type
def modify_item_path(item: Path, new_name: str):
    """
    Rename an item within its directory, so the pack registers as changed
    and gets uploaded again.
    """
    item.rename(item.with_name(new_name))
def get_current_version(pack: Path):
    """
    Return the `currentVersion` value from the pack's metadata file.
    """
    metadata = json.loads((pack / 'pack_metadata.json').read_text())
    return metadata['currentVersion']
def create_new_branch(repo: Repo, new_branch_name: str) -> Head:
    """
    Create a new branch on the given repository and check it out.
    """
    new_head = repo.create_head(new_branch_name)
    new_head.checkout()
    logging.info(f"Created new branch {repo.active_branch}")
    return new_head
# TEST CHANGES FUNCTIONS
def add_changed_pack(func):
    """
    Decorator for test-change functions.

    Each wrapped function must return a `(pack_path, version, pack_items)` tuple.
    The decorator records the pack in the module-level `changed_packs` set and the
    expected version / content items in `versions_dict` / `pack_items_dict`, which
    are written to the artifacts directory at the end of the run.
    """
    @functools.wraps(func)  # preserve the wrapped function's name for the log lines
    def wrapper(*args, **kwargs):
        logging.info(f'Running {func.__name__}')
        pack, version, pack_items = func(*args, **kwargs)
        # The module-level collections are only mutated in place (never rebound),
        # so no `global` statements are needed.
        changed_packs.add(pack)
        versions_dict[str(pack.name)] = version
        if pack_items:
            pack_items_dict[str(pack.name)] = pack_items
        logging.info(f"Done running {func.__name__} on pack {pack.name}")
        return pack, version, pack_items
    return wrapper
@add_changed_pack
def create_new_pack(pack_id: str):
    """
    Copy the bundled test pack named `pack_id` into the content Packs directory,
    replacing any previous copy.
    """
    content_path = Path(__file__).parent.parent.parent
    source = Path(__file__).parent / pack_id
    destination = content_path / 'Packs' / pack_id
    if destination.exists():
        shutil.rmtree(destination)
    shutil.copytree(source, destination)
    # A brand new pack always starts at version 1.0.0.
    return destination, '1.0.0', get_pack_content_paths(destination, marketplace=MarketplaceVersions.XSOAR_SAAS.value)
@add_changed_pack
def add_dependency(base_pack: Path, new_depndency_pack: Path, mandatory: bool = True):
    """
    Add the given pack as a dependency of `base_pack` and bump its version.
    """
    metadata_path = base_pack / 'pack_metadata.json'
    with metadata_path.open('r') as metadata_file:
        metadata = json.load(metadata_file)
    dependency_name = new_depndency_pack.name
    dependencies = metadata.setdefault('dependencies', {})
    dependencies[dependency_name] = {
        "mandatory": mandatory,
        "display_name": dependency_name,
    }
    json_write(str(metadata_path), metadata)
    # A dependency change requires a version bump with release notes.
    enhance_release_notes(base_pack)
    return base_pack, metadata['currentVersion'], None
@add_changed_pack
def enhance_release_notes(pack: Path):
    """
    Bump the pack to a new version by generating release notes with the SDK.
    """
    update_rn_command = ['demisto-sdk', 'update-release-notes', '-i', f'{pack}',
                         "--force", '--text', 'testing adding new RN']
    # Best-effort: the return code is intentionally not checked.
    subprocess.call(update_rn_command, stdout=subprocess.DEVNULL)
    return pack, get_current_version(pack), None
@add_changed_pack
def change_image(pack: Path):
    """
    Overwrite every PNG image in the pack with the TestUploadFlow integration image.
    """
    replacement = pack.parent / 'TestUploadFlow' / 'Integrations' / 'TestUploadFlow' / 'TestUploadFlow_image.png'
    for png_path in Path(pack).glob('**/*.png'):
        shutil.copy(replacement, png_path)
    return pack, get_current_version(pack), None
@add_changed_pack
def update_existing_release_notes(pack: Path):
    """
    Overwrite the pack's latest release notes file without bumping the version.

    The latest version is determined by parsing the `X_Y_Z.md` release notes
    file names and taking the highest one.

    Raises:
        Exception: If the expected release notes file does not exist.
    """
    # max() over a generator - no need to materialize the intermediate list.
    latest_pack_version = str(max(Version(file_name.name.replace('.md', '').replace('_', '.'))
                                  for file_name in (pack / 'ReleaseNotes').glob('*_*_*.md')))
    version_rn = latest_pack_version.replace('.', '_')
    path = pack / 'ReleaseNotes' / f'{version_rn}.md'
    if not path.exists():
        raise Exception("path is not valid release note")
    with path.open('w') as f:
        f.write('testing modifying existing RN')
    return pack, latest_pack_version, None
@add_changed_pack
def set_pack_hidden(pack: Path):
    """
    Mark the pack as hidden in its metadata file.
    """
    metadata_path = pack / 'pack_metadata.json'
    metadata = json.loads(metadata_path.read_text())
    metadata['hidden'] = True
    metadata_path.write_text(json.dumps(metadata))
    return pack, metadata['currentVersion'], None
@add_changed_pack
def update_readme(pack: Path):
    """
    Append a marker line to every README file in the pack.
    """
    for readme in pack.glob('**/*README.md'):
        with readme.open('a') as f:
            f.write("readme test upload flow")
    return pack, get_current_version(pack), None
@add_changed_pack
def create_failing_pack(pack: Path):
    """
    Bump the pack's patch version without adding release notes, so that the
    upload flow fails on this pack.
    """
    metadata_path = pack / 'pack_metadata.json'
    with metadata_path.open('r') as metadata_file:
        metadata = json.load(metadata_file)
    version_parts = metadata['currentVersion'].rsplit('.', 1)
    version_parts[1] = str(int(version_parts[1]) + 1)
    metadata['currentVersion'] = '.'.join(version_parts)
    json_write(str(metadata_path), metadata)
    return pack, metadata['currentVersion'], None
@add_changed_pack
def modify_pack(pack: Path, integration: str):
    """
    Make a regular code change in the pack and bump its version, to check
    that all pack items are uploaded correctly.
    """
    integration_path = pack / integration
    with integration_path.open('a') as f:
        f.write('\n# CHANGE IN PACK')
    enhance_release_notes(pack)
    return pack, get_current_version(pack), get_pack_content_paths(pack)
@add_changed_pack
def modify_pack_metadata(pack: Path):
    """
    Change a metadata-only field (`keywords`) without a version bump, to check
    that only the permitted fields are changed in metadata.json (soft upload).
    """
    metadata_path = pack / 'pack_metadata.json'
    metadata = json.loads(metadata_path.read_text())
    metadata['keywords'] = ["Mobile"]
    metadata_path.write_text(json.dumps(metadata))
    return pack, metadata['currentVersion'], None
@add_changed_pack
def modify_modeling_rules_path(modeling_rule: Path, old_name: str, new_name: str):
    """
    Rename a modeling rule's files and directory, to verify that the pack is
    uploaded correctly with the changed paths.
    """
    for suffix in ('.xif', '.yml', '_schema.json'):
        modify_item_path(modeling_rule / f'{old_name}{suffix}', f'{new_name}{suffix}')
    pack_path = modeling_rule.parent.parent
    modeling_rule.rename(modeling_rule.parent.joinpath(new_name))
    return pack_path, get_current_version(pack_path), get_pack_content_paths(
        pack_path, marketplace=MarketplaceVersions.MarketplaceV2.value)
@add_changed_pack
def modify_script_path(script: Path, old_name: str, new_name: str):
    """
    Rename a script's files and directory, to verify that the pack is uploaded
    correctly with the changed paths.
    """
    for suffix in ('.py', '.yml', '_test.py'):
        modify_item_path(script / f'{old_name}{suffix}', f'{new_name}{suffix}')
    pack_path = script.parent.parent
    script.rename(script.parent.joinpath(new_name))
    return pack_path, get_current_version(pack_path), get_pack_content_paths(pack_path)
def do_changes_on_branch(packs_path: Path):
    """
    Makes the test changes on the created branch.

    Each case below exercises one upload-flow scenario. The helper functions are
    decorated with `add_changed_pack`, so every call also records the pack and
    its expected version/items in the module-level dicts that are written to the
    artifacts at the end of the run.

    Args:
        packs_path (Path): Path to the content repository's `Packs` directory.
    """
    # Case 1: Verify new pack - TestUploadFlow
    new_pack_path, _, _ = create_new_pack(pack_id='TestUploadFlow')
    # Case 2: Verify modified pack - Armorblox
    modify_pack(packs_path / 'Armorblox', 'Integrations/Armorblox/Armorblox.py')
    # Case 3: Verify dependencies handling - Armis (depends on the pack from case 1)
    add_dependency(packs_path / 'Armis', new_pack_path)
    # Case 4: Verify new version - ZeroFox
    enhance_release_notes(packs_path / 'ZeroFox')
    # Case 5: Verify modified existing release notes - Box
    update_existing_release_notes(packs_path / 'Box')
    # Case 6: Verify pack is set to hidden - Microsoft365Defender
    set_pack_hidden(packs_path / 'Microsoft365Defender')
    # Case 7: Verify changed readme - Maltiverse
    update_readme(packs_path / 'Maltiverse')
    # TODO: need to cause this pack to fail in another way because the current way cause validation to fail
    # Case 8: Verify failing pack - Absolute
    # create_failing_pack(packs_path / 'Absolute')
    # Case 9: Verify changed image - Armis
    change_image(packs_path / 'Armis')
    # Case 10: Verify modified modeling rule path - AlibabaActionTrail
    modify_modeling_rules_path(packs_path / 'AlibabaActionTrail/ModelingRules/AlibabaModelingRules',
                               'AlibabaModelingRules', 'Alibaba')
    # Case 11: Verify script path - CortexXDR
    modify_script_path(packs_path / 'CortexXDR/Scripts/XDRSyncScript',
                       'XDRSyncScript', 'XDRSyncScript_new_name')
    # case 12: Verify setting hidden dependency does not add this dependency to the metadata - MicrosoftAdvancedThreatAnalytics
    add_dependency(packs_path / 'MicrosoftAdvancedThreatAnalytics', packs_path / 'Microsoft365Defender',
                   mandatory=False)
    # case 13: Verify new only-XSOAR pack uploaded only to XSOAR's bucket - TestUploadFlowXSOAR
    create_new_pack(pack_id='TestUploadFlowXSOAR')
    # case 14: Verify new only-XSOAR-SaaS pack uploaded only to XSOAR SAAS bucket - TestUploadFlowXsoarSaaS
    create_new_pack(pack_id='TestUploadFlowXsoarSaaS')
    # case 15: metadata changes (soft upload) - verify that only the permitted fields have been changed in metadata.json
    modify_pack_metadata(packs_path / 'Zoom')
    logging.info("Finished making test changes on the branch")
# MAIN FUNCTION
def parse_arguments() -> argparse.Namespace:
    """
    Parse the command line arguments of the script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--path", nargs="?", default='.',
                        help="Content directory path, default is current directory.")
    parser.add_argument("-cb", "--content-branch", nargs="?",
                        help="The content branch name, if empty will run on current branch.")
    parser.add_argument("-tb", "--test-branch", nargs="?",
                        help="The content test branch name to create and test on.")
    parser.add_argument("-a", "--artifacts_path", default=".",
                        help="Path to store the script's output")
    parser.add_argument("-g", "--gitlab-mirror-token",
                        help="Gitlab mirror token for pushing commits "
                             "directly to gitlab repo")
    return parser.parse_args()
def main():
    """
    Create a test branch with upload-flow test changes and push it to GitLab.

    Checks out the requested content branch (or stays on the current one),
    creates a uniquely named test branch, applies all test changes, commits and
    pushes them to the GitLab content mirror, and finally writes the expected
    pack versions and content items to the artifacts directory.
    """
    install_logging('create_test_branch.log', logger=logging)
    args = parse_arguments()
    repo = Repo(args.path)
    if args.content_branch:
        original_branch = args.content_branch
        repo.git.checkout(original_branch)
    else:
        original_branch = repo.active_branch
    try:
        # Unique branch name per run unless an explicit test branch was requested.
        new_branch_name = args.test_branch if args.test_branch else f"{original_branch}_upload_test_branch_{time.time()}"
        content_path = Path(__file__).parent.parent.parent
        packs_path = content_path / 'Packs'
        branch = create_new_branch(repo, new_branch_name)
        logging.info(f"Starts doing test changes on branch '{branch.name}'")
        do_changes_on_branch(packs_path)
        for p in changed_packs:
            repo.git.add(f"{p}/*")
        repo.git.commit(m="Added Test file", no_verify=True)
        # Push with ci.skip so the mirror push does not trigger a new pipeline.
        repo.git.push('--set-upstream',
                      f'https://GITLAB_PUSH_TOKEN:{args.gitlab_mirror_token}@'  # disable-secrets-detection
                      f'{GITLAB_SERVER_HOST}/{GITLAB_PROJECT_NAMESPACE}/content.git',  # disable-secrets-detection
                      branch, push_option="ci.skip")  # disable-secrets-detection
        logging.info("Successfully pushed the branch to GitLab content repo")
    except GitCommandError as e:
        logging.error(e)
    finally:
        # Always restore the original branch and write the expected results,
        # even if the push failed.
        repo.git.checkout(original_branch)
        json_write(os.path.join(args.artifacts_path, 'packs_items.json'), pack_items_dict)
        json_write(os.path.join(args.artifacts_path, 'versions_dict.json'), versions_dict)


if __name__ == "__main__":
    main()