"""Plausible metadata and data generator.
author: Marion Shadbolt @mshadbolt
Takes as input a set of simulated structured metadata json files and a table of plausible distributions for numerical
metadata and replaces random simulated metadata values with more realistic values from each distribution.
Optionally copies dummy files of each type and makes naming consistent with the relevant file objects.
Assumes you have already simulated a set of structured metadata using Gen3 data-simulator tool or umccr-dictionary tool.
Intended as a proof of concept demonstrator and would need to be customised to your own data dictionary to generate
relevant metadata and data files suitable to your data commons.
Will overwrite any files that have already been generated to the same locations in edited_jsons/dict_name and
dummy_files/dict_name.
Typical usage:
python plausible_data_gen.py \
--path /path/to/simulated/files
--gurl https://docs.google.com/spreadsheets/d/1G5mVh0KGR4PvXEr1Q-Mg68bEkv8N_Usl92dCmj1yeAk/edit#gid=417452728
Assumptions:
Within the method `` there is an attempt to make the month and year of birth consistent with the age. This will not
work unless exact same field names are used, i.e. baseline_age, month_birth, year_birth
It also attempts to make cigarettes_per_day consistent with smoking_status, this also relies on exact field name
matching.
It does not do any other consistency checking or checking between object consistency.
File types and linked assays are hard-coded so if objects have different names or different structure, file
generation won't work.
"""
import argparse
import glob
import json
import logging
import os
import random
import shutil
import string
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from typing_extensions import Literal
def parse_arguments():
"""Parse all command line arguments.
"""
parser = argparse.ArgumentParser("Replace random numbers with plausible values in gen3 simulated metadata files.")
parser.add_argument('--path', type=str, action='store',
help="Path to the directory where the pre-simulated structured metadata is located. Generated by "
"umccr-dictionary or Gen3's data-simulator")
parser.add_argument('--values', type=str, action='store',
help="Path to table defining distribution of plausible values. Either this or gurl should be "
"specified")
parser.add_argument('--name', type=str, action='store', required=False,
help="Name of the dictionary you are generating data for. If not specified will guess "
"from the path (optional).")
parser.add_argument('--gurl', type=str, action='store', required=False,
help="The url of the google sheet with plausible values for variables. Either this or values "
"arg should be specified.")
parser.add_argument('--generate-files', action='store_true', default=False, required=False,
help="If specified, dummy text files will be generated for data_files. False by default")
parser.add_argument('--file-types', action='store', nargs="*",
default=["aligned_reads", "variant", "metabolomics", "proteomics", "lipidomics", "serum_marker"],
help="Space separated list of file types, must be one or more of aligned_reads, variant, "
"metabolomics, proteomics, lipidomics, serum_marker")
    parser.add_argument('--num-files', action="store", type=int,
                        help="Specify a limit on the number of dummy files to generate per file type object. For files "
                             "with indices it is assumed you want a set, so a file plus its index file counts as 1.")
parser.add_argument('--enum_sheet', action="store", type=str,
default="https://docs.google.com/spreadsheets/d/1AX9HLzIV6wtkVylLkwOr3kdKDaZf4ukeYACTJ7lYngk/edit#gid=1170119639",
help="Google sheet with enum values to enable choices from an enum category.")
args = parser.parse_args()
if not (args.values or args.gurl):
parser.error('\n\nAt least one of --values or --gurl must be specified, use -h for details')
return args
def parse_json(json_path: str) -> dict:
"""Parses the jsons from the provided path and stores in a dictionary with keys derived from the filenames.
Args:
json_path: Path to pre-simulated json files with values to replace.
Returns:
A dict mapping schema object names to the list of simulated metadata objects.
"""
sim_data = {}
json_files = glob.glob(json_path + "/*.json")
for file in json_files:
object_filename = os.path.basename(os.path.normpath(file))
object_name = object_filename.split(".json")[0]
with open(file, "r") as f:
metadata = json.load(f)
sim_data[object_name] = metadata
return sim_data
def parse_values(values_path: str) -> pd.DataFrame:
"""Reads the table containing distribution information for values that need to be updated and returns a DataFrame.
Args:
values_path: path or google sheet url containing distribution and type information
Returns:
Pandas DataFrame representation of the table.
"""
if values_path.startswith("https"):
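        # A Google Sheets ".../edit#gid=NNN" URL is rewritten to ".../export?format=csv&gid=NNN" so pandas can read
        # the sheet directly as CSV.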
values_table = pd.read_csv(values_path.replace("edit#", "export?format=csv&"))
else:
values_table = pd.read_csv(values_path)
return values_table
def generate_random_string(length: int) -> str:
"""Generates a random string according to the given length
"""
result = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
return result
def generate_mean_number(mean: float, sd: float,
schema_type: Literal["number", "boolean", "integer", "datetime", "string"]):
"""Generates a random number from a normal distribution based on mean and standard deviation and returns a string or
integer depending on the type required by the schema.
"""
if schema_type == "number":
return round(np.random.normal(mean, sd), 1)
else:
return int(round(np.random.normal(mean, sd), 0))
def generate_median_number(median: float, first_q: float, third_q: float,
schema_type: Literal["number", "boolean", "integer", "datetime", "string"]):
"""Generates a random number from a normal distribution based on median, first and third quartiles and returns a string
or int depending on the type required by the schema.
"""
estimated_sd = (third_q - first_q) / 1.35
if schema_type == "number":
return round(np.random.normal(median, estimated_sd), 1)
else:
return int(round(np.random.normal(median, estimated_sd), 0))
def generate_range_number(range_min: float, range_max: float,
schema_type: Literal["number", "boolean", "integer", "datetime", "string"]):
"""Generates a random number between min and max which is parsed as a string or int depending on required the type
required by the schema.
"""
if schema_type in ["string", "datetime"]:
return str(random.randint(range_min, range_max))
else:
return random.randint(range_min, range_max)
def generate_date(date_min: str, date_max: str) -> str:
"""Generates date between a given minimum and maximum.
Args:
date_min: string representing a date in YYYY-mm-dd format.
date_max: string representing a date in YYYY-mm-dd format.
    Returns:
        String representing a date in dd/mm/YYYY format (as consumed by `calculate_age`).
    """
date_start = datetime.strptime(date_min, '%Y-%m-%d')
date_end = datetime.strptime(date_max, '%Y-%m-%d')
delta = date_end - date_start
random_day = random.randrange(delta.days)
rand_date = date_start + timedelta(days=random_day)
return rand_date.strftime('%d/%m/%Y')
def calculate_age(birth_year: int, birth_month: int, baseline_year: int):
"""
Method to calculate age from given year, month and baseline so that age is consistent with given month and year.
"""
baseline_date = generate_date(f"{baseline_year}-01-01", f"{baseline_year}-12-31")
timepoint = datetime.strptime(baseline_date, '%d/%m/%Y')
if birth_month in [11, 4, 6, 9]:
max_day = 30
elif birth_month == 2:
max_day = 28
else:
max_day = 31
    birth_day = random.randint(1, max_day)  # randint is inclusive, so the last day of the month can be chosen
str_birthdate = f"{birth_year}-{birth_month}-{birth_day}"
birthdate = datetime.strptime(str_birthdate, '%Y-%m-%d')
age = timepoint - birthdate
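    # 0.002738 is approximately 1 / 365.25, converting the age in days to decimal years.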
age_years = round(age.days * 0.002738, 1)
return age_years
def get_enums(sheet_url: str, num: int, enum_name: str) -> list:
"""Randomly choose enum options from the enum category"""
enum_sheet = pd.read_csv(sheet_url.replace("edit#", "export?format=csv&"),
keep_default_na=False, na_values=['_'])
    enum_list = list(enum_sheet.loc[enum_sheet['type_name'] == enum_name]['enum'])
    random_enums = sorted(random.sample(enum_list, num))
    return random_enums
def replace_values(sim_data: dict, table: pd.DataFrame) -> None:
""" Replace the existing simulated values with values calculated from provided distributions in place.
Args:
sim_data: A dictionary with schema names as keys and a list of simulated data for each object type
table: a plausible values table with the given parameters to generate plausible values. Should follow format as
described in the documentation.
"""
for index, row in table.iterrows():
if row['property'] == "baseline_age":
for item in sim_data[row['object']]:
calculated_age = calculate_age(item['year_birth'], item['month_birth'], 2000)
item['baseline_age'] = calculated_age
elif row['property'] == "cigarettes_per_day":
for item in sim_data[row['object']]:
if item['smoking_status'] in ['never', 'not collected', None]:
item['cigarettes_per_day'] = None
else:
item['cigarettes_per_day'] = generate_mean_number(row['mean'], row['sd'], row['schema_type'])
elif row['data_type'] == "mean":
for item in sim_data[row['object']]:
new_value = generate_mean_number(row['mean'], row['sd'], row['schema_type'])
item[row['property']] = new_value
elif row['data_type'] == "range":
if row['schema_type'] == "date":
for item in sim_data[row['object']]:
new_value = generate_date(row['range_start'], row['range_end'])
item[row['property']] = new_value
else:
for item in sim_data[row['object']]:
new_value = generate_range_number(int(row['range_start']), int(row['range_end']), row['schema_type'])
item[row['property']] = new_value
elif row['data_type'] == "median":
for item in sim_data[row['object']]:
new_value = generate_median_number(row['median'], row['first_quart'], row['third_quart'],
row['schema_type'])
item[row['property']] = new_value
def create_output_dir(dict_name: str) -> tuple:
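    """Create (if it does not already exist) the cwd/dummy_files/dict_name directory for dummy data files.

    Returns:
        Tuple of (directory to write dummy files to, directory containing this script and its file_type_templates).
    """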
cwd = os.getcwd()
write_dir = os.path.join(cwd, "dummy_files", dict_name)
script_path = os.path.abspath(os.path.dirname(__file__))
if not os.path.exists(write_dir):
os.makedirs(write_dir)
return write_dir, script_path
def create_cmc(project_code: str, file_type: str) -> dict:
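    """Build a minimal core_metadata_collection record linked to the given project, using random placeholder strings
    for the descriptive fields.
    """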
submitter_id = f"core_metadata_collection_{project_code}_{file_type}_{generate_random_string(10)}"
cmc = {
"contributor": generate_random_string(10),
"coverage": generate_random_string(10),
"creator": generate_random_string(10),
"projects": {
"code": project_code
},
"source": generate_random_string(10),
"submitter_id": submitter_id,
"title": generate_random_string(10),
"type": "core_metadata_collection"
}
return cmc
def write_dummy_reads_files(sim_data: dict, copy_files: bool, num_files: int, paths: tuple) -> None:
"""Generate dummy aligned_reads (BAM/BAI/CRAM/CRAI) files with appropriate template and other metadata fields.
All files are copies of the relevant file in `file_type_templates` and each file is linked to the sample object.
Args:
        sim_data: dictionary of objects with pre-simulated data
        copy_files: boolean indicating whether file generation is switched on
        num_files: limit on the number of files to generate
        paths: tuple of directories: [0] where to write output files, [1] the location of the file templates
"""
cmc = create_cmc(sim_data['project']['code'], 'reads')
sim_data['core_metadata_collection'].append(cmc)
file_count = 0
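    # Each aligned_reads_file is paired one-to-one with an index file and a sample; the three pre-simulated lists are
    # assumed to be the same length (zip stops at the shortest).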
for reads_file, index_file, sample in zip(sim_data['aligned_reads_file'], sim_data['aligned_reads_index_file'], sim_data['sample']):
dummy_index_file_name = None
dummy_file_name = None
if 'samples' not in reads_file.keys():
reads_file['samples'] = {'submitter_id': sample['submitter_id']}
reads_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
if copy_files:
if reads_file['data_format'] == 'cram':
dummy_file_name = "dummy_cram.cram"
reads_file['file_name'] = f"{reads_file['file_name']}.cram"
reads_file['data_type'] = "aligned reads"
reads_file['data_category'] = "sequencing reads"
reads_file['reference_genome_build'] = "GRCh37"
index_file['data_format'] = "crai"
index_file['data_type'] = "aligned reads"
index_file['data_category'] = "sequencing reads"
dummy_index_file_name = f"{dummy_file_name}.crai"
index_file['file_name'] = f"{reads_file['file_name']}.crai"
index_file['aligned_reads_files'] = {"submitter_id": reads_file['submitter_id']}
elif reads_file['data_format'] == 'bam':
dummy_file_name = "dummy_bam.bam"
reads_file['file_name'] = f"{reads_file['file_name']}.bam"
reads_file['data_type'] = "aligned reads"
reads_file['data_category'] = "sequencing reads"
reads_file['reference_genome_build'] = "GRCh37"
index_file['data_format'] = "bai"
index_file['data_category'] = "sequencing reads"
index_file['data_type'] = "aligned reads"
dummy_index_file_name = "dummy_bam.bam.bai"
index_file['file_name'] = f"{reads_file['file_name']}.bai"
if dummy_file_name and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_file_name),
os.path.join(paths[0], reads_file["file_name"]))
if dummy_index_file_name and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_index_file_name),
os.path.join(paths[0], index_file['file_name']))
index_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
file_count += 1
# get a list of all ids for all aligned reads
all_aligned_reads = [{"submitter_id": x['submitter_id']} for x in sim_data['aligned_reads_file']]
# create a genomics assay object that links to all aligned reads files
genomics_assay = {
"aligned_reads_files": all_aligned_reads,
"assay_description": "This is an example description. Ideally you would detail the methods and any useful information that someone would want to know when analysing the data files that are linked to this assay.",
"assay_instrument": "5dbe5b48b8",
"assay_type": "WES",
"submitter_id": "genomics_assay_4ed12374e5",
"type": "genomics_assay"
}
sim_data['genomics_assay'].append(genomics_assay)
def write_dummy_variant_files(sim_data: dict, copy_files: bool, num_files: int, paths: tuple) -> None:
"""Generate dummy variant (vcf.gz) files with appropriate template and other metadata fields.
See `write_dummy_reads_files` for full description of args.
"""
cmc = create_cmc(sim_data['project']['code'], 'variants')
sim_data['core_metadata_collection'].append(cmc)
file_count = 0
for variant_file, sample in zip(sim_data['variant_file'], sim_data['sample']):
if 'samples' not in variant_file.keys():
variant_file['samples'] = {'submitter_id': sample['submitter_id']}
dummy_file_name = "dummy_vcf.vcf.gz"
variant_file['data_format'] = "VCF"
variant_file['file_name'] = f"{variant_file['file_name']}.vcf.gz"
variant_file['data_type'] = "variants annotation"
variant_file['data_category'] = "single nucleotide variation"
variant_file['reference_genome_build'] = "GRCh37"
variant_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
if 'aligned_reads_files' in variant_file.keys():
del variant_file['aligned_reads_files']
if copy_files and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_file_name),
os.path.join(paths[0], variant_file['file_name']))
file_count += 1
all_variants = [{"submitter_id": x['submitter_id']} for x in sim_data['variant_file']]
genomics_assay = {
"variant_files": all_variants,
"assay_description": "This is an example description. Ideally you would detail the methods and any useful information that someone would want to know when analysing the data files that are linked to this assay.",
"assay_instrument": "Infinium CytoSNP-850K BeadChip",
"assay_type": "SNP Chip",
"submitter_id": f"genomics_assay_{generate_random_string(10)}",
"type": "genomics_assay"
}
sim_data['genomics_assay'].append(genomics_assay)
def write_dummy_metabolomics_files(sim_data: dict, copy_files: bool, num_files: int, paths: tuple, gurl: str) -> None:
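    """Generate dummy metabolomics (WIFF) files and a metabolomics_assay object linking them.

    See `write_dummy_reads_files` for a description of the shared args; `gurl` is the enum sheet used to choose
    metabolite names for the assay.
    """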
cmc = create_cmc(sim_data['project']['code'], 'metab')
sim_data['core_metadata_collection'].append(cmc)
file_count = 0
for metab_file, sample in zip(sim_data['metabolomics_file'], sim_data['sample']):
if 'samples' not in metab_file.keys():
metab_file['samples'] = {'submitter_id': sample['submitter_id']}
dummy_file_name = "dummy_metab.wiff"
metab_file['data_format'] = "WIFF"
metab_file['file_name'] = f"{metab_file['file_name']}.wiff"
metab_file['data_type'] = "MS"
metab_file['data_category'] = "mass spec analysed"
metab_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
if copy_files and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_file_name),
os.path.join(paths[0], metab_file['file_name']))
file_count += 1
all_metabolomics_files = [{"submitter_id": x['submitter_id']} for x in sim_data['metabolomics_file']]
metabolomics_assay = {
"metabolomics_files": all_metabolomics_files,
"assay_description": "This is an example description. Ideally you would detail the methods and any useful information that someone would want to know when analysing the data files that are linked to this assay.",
"metabolite_names": get_enums(gurl, 30, "enum_metab"),
"submitter_id": f"metabolomics_assay_{generate_random_string(10)}",
"type": "metabolomics_assay"
}
sim_data['metabolomics_assay'] = [metabolomics_assay]
def write_dummy_proteomics_files(sim_data: dict, copy_files: bool, num_files: int, paths: tuple, gurl: str) -> None:
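    """Generate dummy proteomics (MGF) files and a proteomics_assay object linking them.

    See `write_dummy_reads_files` for a description of the shared args; `gurl` is the enum sheet used to choose
    protein names for the assay.
    """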
cmc = create_cmc(sim_data['project']['code'], 'prot')
sim_data['core_metadata_collection'].append(cmc)
file_count = 0
for prot_file, sample in zip(sim_data['proteomics_file'], sim_data['sample']):
if 'samples' not in prot_file.keys():
prot_file['samples'] = {'submitter_id': sample['submitter_id']}
dummy_file_name = "dummy_prot.mgf"
prot_file['data_format'] = "MGF"
prot_file['file_name'] = f"{prot_file['file_name']}.mgf"
prot_file['data_type'] = "MS"
prot_file['data_category'] = "mass spec analysed"
prot_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
if copy_files and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_file_name),
os.path.join(paths[0], prot_file['file_name']))
file_count += 1
all_proteomics_files = [{"submitter_id": x['submitter_id']} for x in sim_data['proteomics_file']]
proteomics_assay = {
"proteomics_files": all_proteomics_files,
"assay_description": "This is an example description. Ideally you would detail the methods and any useful information that someone would want to know when analysing the data files that are linked to this assay.",
"protein_names": get_enums(gurl, 100, "enum_proteins"),
"submitter_id": f"proteomics_assay_{generate_random_string(10)}",
"type": "proteomics_assay"
}
sim_data['proteomics_assay'] = [proteomics_assay]
def write_dummy_serum_marker_files(sim_data: dict, copy_files: bool, num_files: int, paths: tuple, gurl: str) -> None:
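    """Generate dummy serum marker (csv) files and a serum_marker_assay object linking them.

    See `write_dummy_reads_files` for a description of the shared args; `gurl` is the enum sheet used to choose
    serum marker names for the assay.
    """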
cmc = create_cmc(sim_data['project']['code'], 'serum')
sim_data['core_metadata_collection'].append(cmc)
file_count = 0
for serum_file, sample in zip(sim_data['serum_marker_file'], sim_data['sample']):
if 'samples' not in serum_file.keys():
serum_file['samples'] = {'submitter_id': sample['submitter_id']}
dummy_file_name = "dummy_serum.csv"
serum_file['data_format'] = "csv"
serum_file['file_name'] = f"{serum_file['file_name']}.csv"
serum_file['data_type'] = "MS"
serum_file['data_category'] = "mass spec analysed"
serum_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
if copy_files and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_file_name),
os.path.join(paths[0], serum_file['file_name']))
file_count += 1
all_serum_files = [{"submitter_id": x['submitter_id']} for x in sim_data['serum_marker_file']]
serum_marker_assay = {
"serum_marker_files": all_serum_files,
"assay_description": "This is an example description. Ideally you would detail the methods and any useful information that someone would want to know when analysing the data files that are linked to this assay.",
"serum_markers": get_enums(gurl, 8, "enum_serum"),
"submitter_id": f"serum_marker_assay_{generate_random_string(10)}",
"type": "serum_marker_assay"
}
sim_data['serum_marker_assay'] = [serum_marker_assay]
def write_dummy_lipid_files(sim_data: dict, copy_files: bool, num_files: int, paths, gurl) -> None:
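    """Generate dummy lipidomics (csv) files and a lipidomics_assay object linking them.

    See `write_dummy_reads_files` for a description of the shared args; `gurl` is the enum sheet used to choose
    lipid names for the assay.
    """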
dummy_file_name = "dummy_lipids.csv"
cmc = create_cmc(sim_data['project']['code'], 'lipids')
sim_data['core_metadata_collection'].append(cmc)
file_count = 0 # keep track of the number of data files being generated
# Create lipid_file metadata and link to each sample
for lipid_file, sample in zip(sim_data['lipidomics_file'], sim_data['sample']):
if 'samples' not in lipid_file.keys():
lipid_file['samples'] = {'submitter_id': sample['submitter_id']}
lipid_file['core_metadata_collections'] = {'submitter_id': cmc['submitter_id']}
lipid_file['data_format'] = "csv"
lipid_file['data_type'] = "MS"
lipid_file['data_category'] = "summarised results"
lipid_file['file_name'] = f"{lipid_file['file_name']}.csv"
if copy_files and file_count < num_files:
shutil.copyfile(os.path.join(paths[1], "file_type_templates", dummy_file_name),
os.path.join(paths[0], lipid_file['file_name']))
file_count += 1
all_lipidomics_files = [{"submitter_id": x['submitter_id']} for x in sim_data['lipidomics_file']]
# Create a lipidomics_assay object to describe the assay used to create the data
lipidomics_assay = {
"lipidomics_files": all_lipidomics_files,
"assay_description": "This is an example description. Ideally you would detail the methods and any useful information that someone would want to know when analysing the data files that are linked to this assay.",
"lipid_names": get_enums(gurl, 250, "enum_lipids"),
"submitter_id": f"lipidomics_assay_{generate_random_string(10)}",
"type": "lipidomics_assay"
}
sim_data['lipidomics_assay'] = [lipidomics_assay]
def _write_files(sim_data: dict, dict_name: str) -> None:
""" Write structured metadata json files to cwd/edited_jsons/dict_name/
"""
cwd = os.getcwd()
write_dir = os.path.join(cwd, "edited_jsons", dict_name)
if not os.path.exists(write_dir):
os.makedirs(write_dir)
for key, item in sim_data.items():
file_name = f"{key}.json"
with open(os.path.join(write_dir, file_name), "w+") as f:
json.dump(item, f, indent=4, sort_keys=True)
def main():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
args = parse_arguments()
logging.info(f"Parsing simulated jsons from {args.path}")
simulated_data = parse_json(args.path)
logging.info("Parsing distribution values from sheet/csv")
if args.gurl:
values_table = parse_values(args.gurl)
else:
values_table = parse_values(args.values)
replace_values(simulated_data, values_table)
if not args.name:
name = os.path.basename(os.path.split(args.path)[0])
else:
name = args.name
if args.generate_files:
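        # Discard the pre-simulated core_metadata_collection records; the write_dummy_* helpers below append fresh
        # ones linked to each generated file type.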
del simulated_data['core_metadata_collection']
simulated_data['core_metadata_collection'] = []
paths = create_output_dir(name)
if "aligned_reads" in args.file_types:
logging.info("generating aligned reads files")
simulated_data['genomics_assay'] = []
write_dummy_reads_files(simulated_data, args.generate_files,
args.num_files if args.num_files else len(simulated_data['aligned_reads_file']),
paths)
if "variant" in args.file_types:
logging.info("generating variant files")
if len(simulated_data['genomics_assay']) > 1:
simulated_data['genomics_assay'] = []
        write_dummy_variant_files(simulated_data, args.generate_files,
                                  args.num_files if args.num_files else len(simulated_data['variant_file']),
                                  paths)
if "lipidomics" in args.file_types:
logging.info("generating lipid files")
write_dummy_lipid_files(simulated_data, args.generate_files,
args.num_files if args.num_files else len(simulated_data["lipidomics_file"]),
paths, args.enum_sheet)
if "metabolomics" in args.file_types:
logging.info("generating metabolomics files")
write_dummy_metabolomics_files(simulated_data, args.generate_files,
args.num_files if args.num_files else len(simulated_data["metabolomics_file"]),
paths, args.enum_sheet)
if "proteomics" in args.file_types:
logging.info("generating proteomics files")
write_dummy_proteomics_files(simulated_data, args.generate_files,
args.num_files if args.num_files else len(simulated_data["proteomics_file"]),
paths, args.enum_sheet)
if "serum_marker" in args.file_types:
logging.info("generating serum marker files")
write_dummy_serum_marker_files(simulated_data, args.generate_files,
args.num_files if args.num_files else len(simulated_data["serum_marker_file"]),
paths, args.enum_sheet)
full_file_set = {"aligned_reads", "variant", "metabolomics", "proteomics", "lipidomics", "serum_marker"}
ungenerated_files = full_file_set.difference(set(args.file_types))
for file_type in ungenerated_files:
del simulated_data[f"{file_type}_file"]
if f"{file_type}_assay" in simulated_data.keys():
del simulated_data[f"{file_type}_assay"]
if "aligned_reads" in ungenerated_files and "variant" in ungenerated_files:
del simulated_data['genomics_assay']
if "aligned_reads" in ungenerated_files:
del simulated_data['aligned_reads_index_file']
logging.info("Writing metadata jsons to file...")
del simulated_data['acknowledgement']
del simulated_data['publication']
_write_files(simulated_data, name)
logging.info("Metadata jsons written to: ./edited_jsons")
if args.generate_files:
logging.info("Data files written to: ./dummy_files")
if __name__ == '__main__':
main()